2 This file is part of GNUnet.
3 Copyright (C) 2010-2016, 2018, 2019 GNUnet e.V.
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
18 SPDX-License-Identifier: AGPL3.0-or-later
21 * @file transport/gnunet-service-tng.c
22 * @brief main for gnunet-service-tng
23 * @author Christian Grothoff
26 * - figure out how to transmit (selective) ACKs in case of uni-directional
27 * communicators (with/without core? DV-only?) When do we use ACKs?
28 * => communicators use selective ACKs for flow control
29 * => transport uses message-level ACKs for RTT, fragment confirmation
30 * => integrate DV into transport, use neither core nor communicators
31 * but rather give communicators transport-encapsulated messages
32 * (which could be core-data, background-channel traffic, or
33 * transport-to-transport traffic)
36 * - address validation: what is our plan here?
37 * #1 Peerstore only gets 'validated' addresses
38 * #2 transport needs another API to "trigger" validation!
39 * API may be used by core/application or communicators;
40 * => use yet another lib/MQ/connection?
41 * #3 transport should use validation to also establish
42 * effective flow control (for uni-directional transports!)
43 * #4 UDP broadcasting logic must be extended to use the new API
44 * #5 only validated addresses go to ATS for scheduling; that
45 * also ensures we know the RTT
46 * #6 to ensure flow control and RTT are OK, we always do the
47 * 'validation', even if address comes from PEERSTORE
49 * - ACK handling / retransmission
50 * - track RTT, distance, loss, etc.
51 * - DV data structures:
55 * - routing of messages (using DV data structures!)
56 * - handling of DV-boxed messages that need to be forwarded
57 * - backchannel message encryption & decryption
61 * - use ATS bandwidth allocation callback and schedule transmissions!
64 * - inform ATS about RTT, goodput/loss, overheads, etc. (GNUNET_ATS_session_update())
67 * - change transport-core API to provide proper flow control in both
68 * directions, allow multiple messages per peer simultaneously (tag
69 * confirmations with unique message ID), and replace quota-out with
70 * proper flow control;
71 * - if messages are below MTU, consider adding ACKs and other stuff
72 * (requires planning at receiver, and additional MST-style demultiplex
74 * - could avoid copying body of message into each fragment and keep
75 * fragments as just pointers into the original message and only
76 * fully build fragments just before transmission (optimization, should
77 * reduce CPU and memory use)
79 * Design realizations / discussion:
80 * - communicators do flow control by calling MQ "notify sent"
81 * when 'ready'. They determine flow implicitly (i.e. TCP blocking)
82 * or explicitly via background channel FC ACKs. As long as the
83 * channel is not full, they may 'notify sent' even if the other
84 * peer has not yet confirmed receipt. The other peer confirming
85 * is _only_ for FC, not for more reliable transmission; reliable
86 * transmission (i.e. of fragments) is left to _transport_.
87 * - ACKs sent back in uni-directional communicators are done via
88 * the background channel API; here transport _may_ initially
89 * broadcast (with bounded # hops) if no path is known;
90 * - transport should _integrate_ DV-routing and build a view of
91 * the network; then background channel traffic can be
92 * routed via DV as well as explicit "DV" traffic.
93 * - background channel is also used for ACKs and NAT traversal support
94 * - transport service is responsible for AEAD'ing the background
95 * channel, timestamps and monotonic time are used against replay
96 * of old messages -> peerstore needs to be supplied with
97 * "latest timestamps seen" data
98 * - if transport implements DV, we likely need a 3rd peermap
99 * in addition to ephemerals and (direct) neighbours
100 * => in this data structure, we should track ATS metrics (distance, RTT, etc.)
101 * as well as latest timestamps seen, goodput, fragments for transmission, etc.
102 * ==> check if stuff needs to be moved out of "Neighbour"
103 * - transport should encapsulate core-level messages and do its
104 * own ACKing for RTT/goodput/loss measurements _and_ fragment
107 #include "platform.h"
108 #include "gnunet_util_lib.h"
109 #include "gnunet_statistics_service.h"
110 #include "gnunet_transport_monitor_service.h"
111 #include "gnunet_peerstore_service.h"
112 #include "gnunet_hello_lib.h"
113 #include "gnunet_ats_transport_service.h"
114 #include "gnunet_signatures.h"
115 #include "transport.h"
/**
 * What is the size we assume for a read operation in the
 * absence of an MTU for the purpose of flow control?
 */
#define IN_PACKET_SIZE_WITHOUT_MTU 128

/**
 * If a queue delays the next message by more than this number
 * of seconds we log a warning. Note: this is for testing,
 * the value chosen here might be too aggressively low!
 */
#define DELAY_WARN_THRESHOLD \
  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)

/**
 * How long are ephemeral keys valid?
 */
#define EPHEMERAL_VALIDITY \
  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 4)

/**
 * How long do we keep partially reassembled messages around before
 * giving up?
 */
#define REASSEMBLY_EXPIRATION \
  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 4)

/**
 * How many messages can we have pending for a given communicator
 * process before we start to throttle that communicator?
 *
 * Used if a communicator might be CPU-bound and cannot handle the traffic.
 */
#define COMMUNICATOR_TOTAL_QUEUE_LIMIT 512

/**
 * How many messages can we have pending for a given session (queue to
 * a particular peer via a communicator) process before we start to
 * throttle that queue?
 *
 * Used if ATS assigns more bandwidth to a particular transmission
 * method than that transmission method can right now handle. (Yes,
 * ATS should eventually notice utilization below allocation and
 * adjust, but we don't want to queue up tons of messages in the
 * meantime). Must be significantly below
 * #COMMUNICATOR_TOTAL_QUEUE_LIMIT.
 */
#define SESSION_QUEUE_LIMIT 32
164 GNUNET_NETWORK_STRUCT_BEGIN
167 * Outer layer of an encapsulated backchannel message.
169 struct TransportBackchannelEncapsulationMessage
172 * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION.
174 struct GNUNET_MessageHeader header;
177 * Distance the backchannel message has traveled, to be updated at
178 * each hop. Used to bound the number of hops in case a backchannel
179 * message is broadcast and thus travels without routing
180 * information (during initial backchannel discovery).
185 * Target's peer identity (as backchannels may be transmitted
186 * indirectly, or even be broadcast).
188 struct GNUNET_PeerIdentity target;
191 * Ephemeral key setup by the sender for @e target, used
192 * to encrypt the payload.
194 struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;
196 // FIXME: probably should add random IV here as well,
197 // especially if we re-use ephemeral keys!
200 * HMAC over the ciphertext of the encrypted, variable-size
201 * body that follows. Verified via DH of @e target and
204 struct GNUNET_HashCode hmac;
206 /* Followed by encrypted, variable-size payload */
211 * Body by which a peer confirms that it is using an ephemeral key.
213 struct EphemeralConfirmation
217 * Purpose is #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL
219 struct GNUNET_CRYPTO_EccSignaturePurpose purpose;
222 * How long is this signature over the ephemeral key valid?
223 * Note that the receiver MUST IGNORE the absolute time, and
224 * only interpret the value as a mononic time and reject
225 * "older" values than the last one observed. Even with this,
226 * there is no real guarantee against replay achieved here,
227 * as the latest timestamp is not persisted. This is
228 * necessary as we do not want to require synchronized
229 * clocks and may not have a bidirectional communication
230 * channel. Communicators must protect against replay
231 * attacks when using backchannel communication!
233 struct GNUNET_TIME_AbsoluteNBO ephemeral_validity;
236 * Target's peer identity.
238 struct GNUNET_PeerIdentity target;
241 * Ephemeral key setup by the sender for @e target, used
242 * to encrypt the payload.
244 struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;
250 * Plaintext of the variable-size payload that is encrypted
251 * within a `struct TransportBackchannelEncapsulationMessage`
253 struct TransportBackchannelRequestPayload
257 * Sender's peer identity.
259 struct GNUNET_PeerIdentity sender;
262 * Signature of the sender over an
263 * #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL.
265 struct GNUNET_CRYPTO_EddsaSignature sender_sig;
268 * How long is this signature over the ephemeral key
271 struct GNUNET_TIME_AbsoluteNBO ephemeral_validity;
274 * Current monotonic time of the sending transport service. Used to
275 * detect replayed messages. Note that the receiver should remember
276 * a list of the recently seen timestamps and only reject messages
277 * if the timestamp is in the list, or the list is "full" and the
278 * timestamp is smaller than the lowest in the list. This list of
279 * timestamps per peer should be persisted to guard against replays
282 struct GNUNET_TIME_AbsoluteNBO monotonic_time;
284 /* Followed by a `struct GNUNET_MessageHeader` with a message
285 for a communicator */
287 /* Followed by a 0-termianted string specifying the name of
288 the communicator which is to receive the message */
294 * Outer layer of an encapsulated unfragmented application message sent
295 * over an unreliable channel.
297 struct TransportReliabilityBox
300 * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX
302 struct GNUNET_MessageHeader header;
305 * Number of messages still to be sent before a commulative
306 * ACK is requested. Zero if an ACK is requested immediately.
307 * In NBO. Note that the receiver may send the ACK faster
308 * if it believes that is reasonable.
310 uint32_t ack_countdown GNUNET_PACKED;
313 * Unique ID of the message used for signalling receipt of
314 * messages sent over possibly unreliable channels. Should
317 struct GNUNET_ShortHashCode msg_uuid;
322 * Confirmation that the receiver got a
323 * #GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX. Note that the
324 * confirmation may be transmitted over a completely different queue,
325 * so ACKs are identified by a combination of PID of sender and
326 * message UUID, without the queue playing any role!
328 struct TransportReliabilityAckMessage
331 * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK
333 struct GNUNET_MessageHeader header;
338 uint32_t reserved GNUNET_PACKED;
341 * How long was the ACK delayed relative to the average time of
342 * receipt of the messages being acknowledged? Used to calculate
343 * the average RTT by taking the receipt time of the ack minus the
344 * average transmission time of the sender minus this value.
346 struct GNUNET_TIME_RelativeNBO avg_ack_delay;
348 /* followed by any number of `struct GNUNET_ShortHashCode`
349 messages providing ACKs */
354 * Outer layer of an encapsulated fragmented application message.
356 struct TransportFragmentBox
359 * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT
361 struct GNUNET_MessageHeader header;
364 * Unique ID of this fragment (and fragment transmission!). Will
365 * change even if a fragement is retransmitted to make each
366 * transmission attempt unique! Should be incremented by one for
367 * each fragment transmission. If a client receives a duplicate
368 * fragment (same @e frag_off), it must send
369 * #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK immediately.
371 uint32_t frag_uuid GNUNET_PACKED;
374 * Original message ID for of the message that all the1
375 * fragments belong to. Must be the same for all fragments.
377 struct GNUNET_ShortHashCode msg_uuid;
380 * Offset of this fragment in the overall message.
382 uint16_t frag_off GNUNET_PACKED;
385 * Total size of the message that is being fragmented.
387 uint16_t msg_size GNUNET_PACKED;
393 * Outer layer of an fragmented application message sent over a queue
394 * with finite MTU. When a #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT is
395 * received, the receiver has two RTTs or 64 further fragments with
396 * the same basic message time to send an acknowledgement, possibly
397 * acknowledging up to 65 fragments in one ACK. ACKs must also be
398 * sent immediately once all fragments were sent.
400 struct TransportFragmentAckMessage
403 * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK
405 struct GNUNET_MessageHeader header;
408 * Unique ID of the lowest fragment UUID being acknowledged.
410 uint32_t frag_uuid GNUNET_PACKED;
413 * Bitfield of up to 64 additional fragments following the
414 * @e msg_uuid being acknowledged by this message.
416 uint64_t extra_acks GNUNET_PACKED;
419 * Original message ID for of the message that all the
420 * fragments belong to.
422 struct GNUNET_ShortHashCode msg_uuid;
425 * How long was the ACK delayed relative to the average time of
426 * receipt of the fragments being acknowledged? Used to calculate
427 * the average RTT by taking the receipt time of the ack minus the
428 * average transmission time of the sender minus this value.
430 struct GNUNET_TIME_RelativeNBO avg_ack_delay;
433 * How long until the receiver will stop trying reassembly
436 struct GNUNET_TIME_RelativeNBO reassembly_timeout;
441 * Internal message used by transport for distance vector learning.
442 * If @e num_hops does not exceed the threshold, peers should append
443 * themselves to the peer list and flood the message (possibly only
444 * to a subset of their neighbours to limit discoverability of the
445 * network topology). To the extend that the @e bidirectional bits
446 * are set, peers may learn the inverse paths even if they did not
449 * Unless received on a bidirectional queue and @e num_hops just
450 * zero, peers that can forward to the initator should always try to
451 * forward to the initiator.
453 struct TransportDVLearn
456 * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN
458 struct GNUNET_MessageHeader header;
461 * Number of hops this messages has travelled, in NBO. Zero if
464 uint16_t num_hops GNUNET_PACKED;
467 * Bitmask of the last 16 hops indicating whether they are confirmed
468 * available (without DV) in both directions or not, in NBO. Used
469 * to possibly instantly learn a path in both directions. Each peer
470 * should shift this value by one to the left, and then set the
471 * lowest bit IF the current sender can be reached from it (without
474 uint16_t bidirectional GNUNET_PACKED;
477 * Peers receiving this message and delaying forwarding to other
478 * peers for any reason should increment this value such as to
479 * enable the origin to determine the actual network-only delay
480 * in addition to the real-time delay (assuming the message loops
481 * back to the origin).
483 struct GNUNET_TIME_Relative cummulative_non_network_delay;
486 * Identity of the peer that started this learning activity.
488 struct GNUNET_PeerIdentity initiator;
490 /* Followed by @e num_hops `struct GNUNET_PeerIdentity` values,
491 excluding the initiator of the DV trace; the last entry is the
492 current sender; the current peer must not be included. */
498 * Outer layer of an encapsulated message send over multiple hops.
499 * The path given only includes the identities of the subsequent
500 * peers, i.e. it will be empty if we are the receiver. Each
501 * forwarding peer should scan the list from the end, and if it can,
502 * forward to the respective peer. The list should then be shortened
503 * by all the entries up to and including that peer. Each hop should
504 * also increment @e total_hops to allow the receiver to get a precise
505 * estimate on the number of hops the message travelled. Senders must
506 * provide a learned path that thus should work, but intermediaries
507 * know of a shortcut, they are allowed to send the message via that
510 * If a peer finds itself still on the list, it must drop the message.
512 struct TransportDVBox
515 * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX
517 struct GNUNET_MessageHeader header;
520 * Number of total hops this messages travelled. In NBO.
521 * @e origin sets this to zero, to be incremented at
524 uint16_t total_hops GNUNET_PACKED;
527 * Number of hops this messages includes. In NBO.
529 uint16_t num_hops GNUNET_PACKED;
532 * Identity of the peer that originated the message.
534 struct GNUNET_PeerIdentity origin;
536 /* Followed by @e num_hops `struct GNUNET_PeerIdentity` values;
537 excluding the @e origin and the current peer, the last must be
538 the ultimate target; if @e num_hops is zero, the receiver of this
539 message is the ultimate target. */
541 /* Followed by the actual message, which itself may be
542 another box, but not a DV_LEARN or DV_BOX message! */
546 GNUNET_NETWORK_STRUCT_END
551 * What type of client is the `struct TransportClient` about?
556 * We do not know yet (client is fresh).
561 * Is the CORE service, we need to forward traffic to it.
566 * It is a monitor, forward monitor data.
571 * It is a communicator, use for communication.
578 * Entry in our cache of ephemeral keys we currently use.
579 * This way, we only sign an ephemeral once per @e target,
580 * and then can re-use it over multiple
581 * #GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION
582 * messages (as signing is expensive).
584 struct EphemeralCacheEntry
588 * Target's peer identity (we don't re-use ephemerals
589 * to limit linkability of messages).
591 struct GNUNET_PeerIdentity target;
594 * Signature affirming @e ephemeral_key of type
595 * #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL
597 struct GNUNET_CRYPTO_EddsaSignature sender_sig;
600 * How long is @e sender_sig valid
602 struct GNUNET_TIME_Absolute ephemeral_validity;
607 struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;
610 * Our private ephemeral key.
612 struct GNUNET_CRYPTO_EcdhePrivateKey private_key;
615 * Node in the ephemeral cache for this entry.
616 * Used for expiration.
618 struct GNUNET_CONTAINER_HeapNode *hn;
/**
 * Client connected to the transport service.
 */
struct TransportClient;
629 * A neighbour that at least one communicator is connected to.
/**
 * Entry in our #dv_routes table, representing a (set of) distance
 * vector routes to a particular peer.
 */
struct DistanceVector;
641 * One possible hop towards a DV target.
643 struct DistanceVectorHop
647 * Kept in a MDLL, sorted by @e timeout.
649 struct DistanceVectorHop *next_dv;
652 * Kept in a MDLL, sorted by @e timeout.
654 struct DistanceVectorHop *prev_dv;
659 struct DistanceVectorHop *next_neighbour;
664 struct DistanceVectorHop *prev_neighbour;
667 * What would be the next hop to @e target?
669 struct Neighbour *next_hop;
672 * Distance vector entry this hop belongs with.
674 struct DistanceVector *dv;
677 * Array of @e distance hops to the target, excluding @e next_hop.
678 * NULL if the entire path is us to @e next_hop to `target`. Allocated
679 * at the end of this struct.
681 const struct GNUNET_PeerIdentity *path;
684 * At what time do we forget about this path unless we see it again
687 struct GNUNET_TIME_Absolute timeout;
690 * How many hops in total to the `target` (excluding @e next_hop and `target` itself),
691 * thus 0 still means a distance of 2 hops (to @e next_hop and then to `target`)?
693 unsigned int distance;
698 * Entry in our #dv_routes table, representing a (set of) distance
699 * vector routes to a particular peer.
701 struct DistanceVector
705 * To which peer is this a route?
707 struct GNUNET_PeerIdentity target;
710 * Known paths to @e target.
712 struct DistanceVectorHop *dv_head;
715 * Known paths to @e target.
717 struct DistanceVectorHop *dv_tail;
720 * Task scheduled to purge expired paths from @e dv_head MDLL.
722 struct GNUNET_SCHEDULER_Task *timeout_task;
727 * Entry identifying transmission in one of our `struct
728 * GNUNET_ATS_Sessions` which still awaits an ACK. This is used to
729 * ensure we do not overwhelm a communicator and limit the number of
730 * messages outstanding per communicator (say in case communicator is
731 * CPU bound) and per queue (in case ATS bandwidth allocation exceeds
732 * what the communicator can actually provide towards a particular
741 struct QueueEntry *next;
746 struct QueueEntry *prev;
749 * ATS session this entry is queued with.
751 struct GNUNET_ATS_Session *session;
754 * Message ID used for this message with the queue used for transmission.
761 * An ATS session is a message queue provided by a communicator
762 * via which we can reach a particular neighbour.
764 struct GNUNET_ATS_Session
769 struct GNUNET_ATS_Session *next_neighbour;
774 struct GNUNET_ATS_Session *prev_neighbour;
779 struct GNUNET_ATS_Session *prev_client;
784 struct GNUNET_ATS_Session *next_client;
787 * Head of DLL of unacked transmission requests.
789 struct QueueEntry *queue_head;
792 * End of DLL of unacked transmission requests.
794 struct QueueEntry *queue_tail;
797 * Which neighbour is this ATS session for?
799 struct Neighbour *neighbour;
802 * Which communicator offers this ATS session?
804 struct TransportClient *tc;
807 * Address served by the ATS session.
812 * Handle by which we inform ATS about this queue.
814 struct GNUNET_ATS_SessionRecord *sr;
817 * Task scheduled for the time when this queue can (likely) transmit the
818 * next message. Still needs to check with the @e tracker_out to be sure.
820 struct GNUNET_SCHEDULER_Task *transmit_task;
823 * Our current RTT estimate for this ATS session.
825 struct GNUNET_TIME_Relative rtt;
828 * Message ID generator for transmissions on this queue.
833 * Unique identifier of this ATS session with the communicator.
838 * Maximum transmission unit supported by this ATS session.
843 * Distance to the target of this ATS session.
850 uint32_t num_msg_pending;
855 uint32_t num_bytes_pending;
858 * Length of the DLL starting at @e queue_head.
860 unsigned int queue_length;
863 * Network type offered by this ATS session.
865 enum GNUNET_NetworkType nt;
868 * Connection status for this ATS session.
870 enum GNUNET_TRANSPORT_ConnectionStatus cs;
873 * How much outbound bandwidth do we have available for this session?
875 struct GNUNET_BANDWIDTH_Tracker tracker_out;
878 * How much inbound bandwidth do we have available for this session?
880 struct GNUNET_BANDWIDTH_Tracker tracker_in;
885 * Information we keep for a message that we are reassembling.
887 struct ReassemblyContext
891 * Original message ID of the message that all the
892 * fragments belong to.
894 struct GNUNET_ShortHashCode msg_uuid;
897 * Which neighbour is this context for?
899 struct Neighbour *neighbour;
902 * Entry in the reassembly heap (sorted by expiration).
904 struct GNUNET_CONTAINER_HeapNode *hn;
907 * Bitfield with @e msg_size bits representing the positions
908 * where we have received fragments. When we receive a fragment,
909 * we check the bits in @e bitfield before decrementing @e msg_missing.
911 * Allocated after the reassembled message.
916 * Task for sending ACK. We may send ACKs either because of hitting
917 * the @e extra_acks limit, or based on time and @e num_acks. This
918 * task is for the latter case.
920 struct GNUNET_SCHEDULER_Task *ack_task;
923 * At what time will we give up reassembly of this message?
925 struct GNUNET_TIME_Absolute reassembly_timeout;
928 * Average delay of all acks in @e extra_acks and @e frag_uuid.
929 * Should be reset to zero when @e num_acks is set to 0.
931 struct GNUNET_TIME_Relative avg_ack_delay;
934 * Time we received the last fragment. @e avg_ack_delay must be
935 * incremented by now - @e last_frag multiplied by @e num_acks.
937 struct GNUNET_TIME_Absolute last_frag;
940 * Bitfield of up to 64 additional fragments following @e frag_uuid
941 * to be acknowledged in the next cumulative ACK.
946 * Unique ID of the lowest fragment UUID to be acknowledged in the
947 * next cumulative ACK. Only valid if @e num_acks > 0.
952 * Number of ACKs we have accumulated so far. Reset to 0
953 * whenever we send a #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK.
955 unsigned int num_acks;
958 * How big is the message we are reassembling in total?
963 * How many bytes of the message are still missing? Defragmentation
964 * is complete when @e msg_missing == 0.
966 uint16_t msg_missing;
968 /* Followed by @e msg_size bytes of the (partially) defragmented original message */
970 /* Followed by @e bitfield data */
975 * A neighbour that at least one communicator is connected to.
981 * Which peer is this about?
983 struct GNUNET_PeerIdentity pid;
986 * Map with `struct ReassemblyContext` structs for fragments under
987 * reassembly. May be NULL if we currently have no fragments from
988 * this @e pid (lazy initialization).
990 struct GNUNET_CONTAINER_MultiShortmap *reassembly_map;
993 * Heap with `struct ReassemblyContext` structs for fragments under
994 * reassembly. May be NULL if we currently have no fragments from
995 * this @e pid (lazy initialization).
997 struct GNUNET_CONTAINER_Heap *reassembly_heap;
1000 * Task to free old entries from the @e reassembly_heap and @e reassembly_map.
1002 struct GNUNET_SCHEDULER_Task *reassembly_timeout_task;
1005 * Head of list of messages pending for this neighbour.
1007 struct PendingMessage *pending_msg_head;
1010 * Tail of list of messages pending for this neighbour.
1012 struct PendingMessage *pending_msg_tail;
1015 * Head of MDLL of DV hops that have this neighbour as next hop. Must be
1016 * purged if this neighbour goes down.
1018 struct DistanceVectorHop *dv_head;
1021 * Tail of MDLL of DV hops that have this neighbour as next hop. Must be
1022 * purged if this neighbour goes down.
1024 struct DistanceVectorHop *dv_tail;
1027 * Head of DLL of ATS sessions to this peer.
1029 struct GNUNET_ATS_Session *session_head;
1032 * Tail of DLL of ATS sessions to this peer.
1034 struct GNUNET_ATS_Session *session_tail;
1037 * Task run to cleanup pending messages that have exceeded their timeout.
1039 struct GNUNET_SCHEDULER_Task *timeout_task;
1042 * Quota at which CORE is allowed to transmit to this peer
1045 * FIXME: not yet used, tricky to get right given multiple queues!
1046 * (=> Idea: let ATS set a quota per queue and we add them up here?)
1047 * FIXME: how do we set this value initially when we tell CORE?
1048 * Options: start at a minimum value or at literally zero (before ATS?)
1049 * (=> Current thought: clean would be zero!)
1051 struct GNUNET_BANDWIDTH_Value32NBO quota_out;
1054 * What is the earliest timeout of any message in @e pending_msg_tail?
1056 struct GNUNET_TIME_Absolute earliest_timeout;
1062 * Types of different pending messages.
1064 enum PendingMessageType
1068 * Ordinary message received from the CORE service.
1075 PMT_FRAGMENT_BOX = 1,
1080 PMT_RELIABILITY_BOX = 2,
1083 * Any type of acknowledgement.
1085 PMT_ACKNOWLEDGEMENT = 3
1092 * Transmission request that is awaiting delivery. The original
1093 * transmission requests from CORE may be too big for some queues.
1094 * In this case, a *tree* of fragments is created. At each
1095 * level of the tree, fragments are kept in a DLL ordered by which
1096 * fragment should be sent next (at the head). The tree is searched
1097 * top-down, with the original message at the root.
1099 * To select a node for transmission, first it is checked if the
1100 * current node's message fits with the MTU. If it does not, we
1101 * either calculate the next fragment (based on @e frag_off) from the
1102 * current node, or, if all fragments have already been created,
1103 * descend to the @e head_frag. Even though the node was already
1104 * fragmented, the fragment may be too big if the fragment was
1105 * generated for a queue with a larger MTU. In this case, the node
1106 * may be fragmented again, thus creating a tree.
1108 * When acknowledgements for fragments are received, the tree
1109 * must be pruned, removing those parts that were already
1110 * acknowledged. When fragments are sent over a reliable
1111 * channel, they can be immediately removed.
1113 * If a message is ever fragmented, then the original "full" message
1114 * is never again transmitted (even if it fits below the MTU), and
1115 * only (remaining) fragments are sent.
1117 struct PendingMessage
1120 * Kept in a MDLL of messages for this @a target.
1122 struct PendingMessage *next_neighbour;
1125 * Kept in a MDLL of messages for this @a target.
1127 struct PendingMessage *prev_neighbour;
1130 * Kept in a MDLL of messages from this @a client (if @e pmt is #PMT_CORE)
1132 struct PendingMessage *next_client;
1135 * Kept in a MDLL of messages from this @a client (if @e pmt is #PMT_CORE)
1137 struct PendingMessage *prev_client;
1140 * Kept in a MDLL of messages from this @a cpm (if @e pmt is #PMT_FRAGMENT_BOX)
1142 struct PendingMessage *next_frag;
1145 * Kept in a MDLL of messages from this @a cpm (if @e pmt is #PMT_FRAGMENT_BOX)
1147 struct PendingMessage *prev_frag;
1150 * This message, reliability boxed. Only possibly available if @e pmt is #PMT_CORE.
1152 struct PendingMessage *bpm;
1155 * Target of the request.
1157 struct Neighbour *target;
1160 * Client that issued the transmission request, if @e pmt is #PMT_CORE.
1162 struct TransportClient *client;
1165 * Head of a MDLL of fragments created for this core message.
1167 struct PendingMessage *head_frag;
1170 * Tail of a MDLL of fragments created for this core message.
1172 struct PendingMessage *tail_frag;
1175 * Our parent in the fragmentation tree.
1177 struct PendingMessage *frag_parent;
1180 * At what time should we give up on the transmission (and no longer retry)?
1182 struct GNUNET_TIME_Absolute timeout;
1185 * What is the earliest time for us to retry transmission of this message?
1187 struct GNUNET_TIME_Absolute next_attempt;
1190 * UUID to use for this message (used for reassembly of fragments, only
1191 * initialized if @e msg_uuid_set is #GNUNET_YES).
1193 struct GNUNET_ShortHashCode msg_uuid;
1196 * Counter incremented per generated fragment.
1198 uint32_t frag_uuidgen;
1201 * Type of the pending message.
1203 enum PendingMessageType pmt;
1206 * Size of the original message.
1211 * Offset at which we should generate the next fragment.
1216 * #GNUNET_YES once @e msg_uuid was initialized
1218 int16_t msg_uuid_set;
1220 /* Followed by @e bytes_msg to transmit */
1225 * One of the addresses of this peer.
1227 struct AddressListEntry
1233 struct AddressListEntry *next;
1238 struct AddressListEntry *prev;
1241 * Which communicator provides this address?
1243 struct TransportClient *tc;
1246 * The actual address.
1248 const char *address;
1251 * Current context for storing this address in the peerstore.
1253 struct GNUNET_PEERSTORE_StoreContext *sc;
1256 * Task to periodically do @e st operation.
1258 struct GNUNET_SCHEDULER_Task *st;
1261 * What is a typical lifetime the communicator expects this
1262 * address to have? (Always from now.)
1264 struct GNUNET_TIME_Relative expiration;
1267 * Address identifier used by the communicator.
1272 * Network type offered by this address.
1274 enum GNUNET_NetworkType nt;
1280 * Client connected to the transport service.
1282 struct TransportClient
1288 struct TransportClient *next;
1293 struct TransportClient *prev;
1296 * Handle to the client.
1298 struct GNUNET_SERVICE_Client *client;
1301 * Message queue to the client.
1303 struct GNUNET_MQ_Handle *mq;
1306 * What type of client is this?
1308 enum ClientType type;
1314 * Information for @e type #CT_CORE.
1319 * Head of list of messages pending for this client, sorted by
1320 * transmission time ("next_attempt" + possibly internal prioritization).
1322 struct PendingMessage *pending_msg_head;
1325 * Tail of list of messages pending for this client.
1327 struct PendingMessage *pending_msg_tail;
1332 * Information for @e type #CT_MONITOR.
1337 * Peer identity to monitor the addresses of.
1338 * Zero to monitor all neighbours. Valid if
1339 * @e type is #CT_MONITOR.
1341 struct GNUNET_PeerIdentity peer;
1344 * Is this a one-shot monitor?
1352 * Information for @e type #CT_COMMUNICATOR.
1356 * If @e type is #CT_COMMUNICATOR, this communicator
1357 * supports communicating using these addresses.
1359 char *address_prefix;
1362 * Head of DLL of queues offered by this communicator.
1364 struct GNUNET_ATS_Session *session_head;
1367 * Tail of DLL of queues offered by this communicator.
1369 struct GNUNET_ATS_Session *session_tail;
1372 * Head of list of the addresses of this peer offered by this communicator.
1374 struct AddressListEntry *addr_head;
1377 * Tail of list of the addresses of this peer offered by this communicator.
1379 struct AddressListEntry *addr_tail;
1382 * Number of queue entries in all queues to this communicator. Used
1383 * to throttle sending to a communicator if we see that the communicator
1384 * is globally unable to keep up.
1386 unsigned int total_queue_length;
1389 * Characteristics of this communicator.
1391 enum GNUNET_TRANSPORT_CommunicatorCharacteristics cc;
1401 * Head of linked list of all clients to this service.
1403 static struct TransportClient *clients_head;
1406 * Tail of linked list of all clients to this service.
1408 static struct TransportClient *clients_tail;
1411 * Statistics handle.
1413 static struct GNUNET_STATISTICS_Handle *GST_stats;
1416 * Configuration handle.
1418 static const struct GNUNET_CONFIGURATION_Handle *GST_cfg;
1423 static struct GNUNET_PeerIdentity GST_my_identity;
1428 static struct GNUNET_CRYPTO_EddsaPrivateKey *GST_my_private_key;
1431 * Map from PIDs to `struct Neighbour` entries. A peer is
1432 * a neighbour if we have an MQ to it from some communicator.
1434 static struct GNUNET_CONTAINER_MultiPeerMap *neighbours;
1437 * Map from PIDs to `struct DistanceVector` entries describing
1438 * known paths to the peer.
1440 static struct GNUNET_CONTAINER_MultiPeerMap *dv_routes;
1443 * Database for peer's HELLOs.
1445 static struct GNUNET_PEERSTORE_Handle *peerstore;
1448 * Heap sorting `struct EphemeralCacheEntry` by their
1449 * key/signature validity.
1451 static struct GNUNET_CONTAINER_Heap *ephemeral_heap;
1454 * Hash map for looking up `struct EphemeralCacheEntry`s
1455 * by peer identity. (We may have ephemerals in our
1456 * cache for which we do not have a neighbour entry,
1457 * and similarly many neighbours may not need ephemerals,
1458 * so we use a second map.)
1460 static struct GNUNET_CONTAINER_MultiPeerMap *ephemeral_map;
1463 * Task to free expired ephemerals.
1465 static struct GNUNET_SCHEDULER_Task *ephemeral_task;
1468 * Our connection to ATS for allocation and bootstrapping.
1470 static struct GNUNET_ATS_TransportHandle *ats;
1474 * Free cached ephemeral key.
1476 * @param ece cached signature to free
1479 free_ephemeral (struct EphemeralCacheEntry *ece)
1481 GNUNET_CONTAINER_multipeermap_remove (ephemeral_map,
1484 GNUNET_CONTAINER_heap_remove_node (ece->hn);
1490 * Lookup neighbour record for peer @a pid.
1492 * @param pid neighbour to look for
1493 * @return NULL if we do not have this peer as a neighbour
1495 static struct Neighbour *
1496 lookup_neighbour (const struct GNUNET_PeerIdentity *pid)
1498 return GNUNET_CONTAINER_multipeermap_get (neighbours,
1504 * Details about what to notify monitors about.
1509 * @deprecated To be discussed if we keep these...
1511 struct GNUNET_TIME_Absolute last_validation;
1512 struct GNUNET_TIME_Absolute valid_until;
1513 struct GNUNET_TIME_Absolute next_validation;
1516 * Current round-trip time estimate.
1518 struct GNUNET_TIME_Relative rtt;
1521 * Connection status.
1523 enum GNUNET_TRANSPORT_ConnectionStatus cs;
1528 uint32_t num_msg_pending;
1533 uint32_t num_bytes_pending;
1540 * Free @a dvh, and if it is the last path to the `target`, also
1541 * free the associated DV entry in #dv_routes.
1543 * @param dvh hop to free
1546 free_distance_vector_hop (struct DistanceVectorHop *dvh)
1548 struct Neighbour *n = dvh->next_hop;
1549 struct DistanceVector *dv = dvh->dv;
1551 GNUNET_CONTAINER_MDLL_remove (neighbour,
1555 GNUNET_CONTAINER_MDLL_remove (dv,
1560 if (NULL == dv->dv_head)
1562 GNUNET_assert (GNUNET_YES ==
1563 GNUNET_CONTAINER_multipeermap_remove (dv_routes,
1566 if (NULL != dv->timeout_task)
1567 GNUNET_SCHEDULER_cancel (dv->timeout_task);
1574 * Free entry in #dv_routes. First frees all hops to the target, and
1575 * the last target will implicitly free @a dv as well.
1577 * @param dv route to free
1580 free_dv_route (struct DistanceVector *dv)
1582 struct DistanceVectorHop *dvh;
1584 while (NULL != (dvh = dv->dv_head))
1585 free_distance_vector_hop (dvh);
1590 * Notify monitor @a tc about an event. That @a tc
1591 * cares about the event has already been checked.
1593 * Send @a tc information in @a me about a @a peer's status with
1594 * respect to some @a address to all monitors that care.
1596 * @param tc monitor to inform
1597 * @param peer peer the information is about
1598 * @param address address the information is about
1599 * @param nt network type associated with @a address
1600 * @param me detailed information to transmit
1603 notify_monitor (struct TransportClient *tc,
1604 const struct GNUNET_PeerIdentity *peer,
1605 const char *address,
1606 enum GNUNET_NetworkType nt,
1607 const struct MonitorEvent *me)
1609 struct GNUNET_MQ_Envelope *env;
1610 struct GNUNET_TRANSPORT_MonitorData *md;
1611 size_t addr_len = strlen (address) + 1;
1613 env = GNUNET_MQ_msg_extra (md,
1615 GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_DATA);
1616 md->nt = htonl ((uint32_t) nt);
1618 md->last_validation = GNUNET_TIME_absolute_hton (me->last_validation);
1619 md->valid_until = GNUNET_TIME_absolute_hton (me->valid_until);
1620 md->next_validation = GNUNET_TIME_absolute_hton (me->next_validation);
1621 md->rtt = GNUNET_TIME_relative_hton (me->rtt);
1622 md->cs = htonl ((uint32_t) me->cs);
1623 md->num_msg_pending = htonl (me->num_msg_pending);
1624 md->num_bytes_pending = htonl (me->num_bytes_pending);
1628 GNUNET_MQ_send (tc->mq,
1634 * Send information in @a me about a @a peer's status with respect
1635 * to some @a address to all monitors that care.
1637 * @param peer peer the information is about
1638 * @param address address the information is about
1639 * @param nt network type associated with @a address
1640 * @param me detailed information to transmit
1643 notify_monitors (const struct GNUNET_PeerIdentity *peer,
1644 const char *address,
1645 enum GNUNET_NetworkType nt,
1646 const struct MonitorEvent *me)
1648 static struct GNUNET_PeerIdentity zero;
1650 for (struct TransportClient *tc = clients_head;
1654 if (CT_MONITOR != tc->type)
1656 if (tc->details.monitor.one_shot)
1658 if ( (0 != memcmp (&tc->details.monitor.peer,
1661 (0 != memcmp (&tc->details.monitor.peer,
1675 * Called whenever a client connects. Allocates our
1676 * data structures associated with that client.
1678 * @param cls closure, NULL
1679 * @param client identification of the client
1680 * @param mq message queue for the client
1681 * @return our `struct TransportClient`
1684 client_connect_cb (void *cls,
1685 struct GNUNET_SERVICE_Client *client,
1686 struct GNUNET_MQ_Handle *mq)
1688 struct TransportClient *tc;
1690 tc = GNUNET_new (struct TransportClient);
1691 tc->client = client;
1693 GNUNET_CONTAINER_DLL_insert (clients_head,
1696 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1697 "Client %p connected\n",
1706 * @param rc data structure to free
1709 free_reassembly_context (struct ReassemblyContext *rc)
1711 struct Neighbour *n = rc->neighbour;
1713 GNUNET_assert (rc ==
1714 GNUNET_CONTAINER_heap_remove_node (rc->hn));
1715 GNUNET_assert (GNUNET_OK ==
1716 GNUNET_CONTAINER_multishortmap_remove (n->reassembly_map,
1724 * Task run to clean up reassembly contexts of a neighbour that have expired.
1726 * @param cls a `struct Neighbour`
1729 reassembly_cleanup_task (void *cls)
1731 struct Neighbour *n = cls;
1732 struct ReassemblyContext *rc;
1734 n->reassembly_timeout_task = NULL;
1735 while (NULL != (rc = GNUNET_CONTAINER_heap_peek (n->reassembly_heap)))
1737 if (0 == GNUNET_TIME_absolute_get_remaining (rc->reassembly_timeout).rel_value_us)
1739 free_reassembly_context (rc);
1742 GNUNET_assert (NULL == n->reassembly_timeout_task);
1743 n->reassembly_timeout_task = GNUNET_SCHEDULER_add_at (rc->reassembly_timeout,
1744 &reassembly_cleanup_task,
1752 * function called to #free_reassembly_context().
1756 * @param value a `struct ReassemblyContext` to free
1757 * @return #GNUNET_OK (continue iteration)
1760 free_reassembly_cb (void *cls,
1761 const struct GNUNET_ShortHashCode *key,
1764 struct ReassemblyContext *rc = value;
1768 free_reassembly_context (rc);
1774 * Release memory used by @a neighbour.
1776 * @param neighbour neighbour entry to free
1779 free_neighbour (struct Neighbour *neighbour)
1781 struct DistanceVectorHop *dvh;
1783 GNUNET_assert (NULL == neighbour->session_head);
1784 GNUNET_assert (GNUNET_YES ==
1785 GNUNET_CONTAINER_multipeermap_remove (neighbours,
1788 if (NULL != neighbour->timeout_task)
1789 GNUNET_SCHEDULER_cancel (neighbour->timeout_task);
1790 if (NULL != neighbour->reassembly_map)
1792 GNUNET_CONTAINER_multishortmap_iterate (neighbour->reassembly_map,
1793 &free_reassembly_cb,
1795 GNUNET_CONTAINER_multishortmap_destroy (neighbour->reassembly_map);
1796 neighbour->reassembly_map = NULL;
1797 GNUNET_CONTAINER_heap_destroy (neighbour->reassembly_heap);
1798 neighbour->reassembly_heap = NULL;
1800 while (NULL != (dvh = neighbour->dv_head))
1801 free_distance_vector_hop (dvh);
1802 if (NULL != neighbour->reassembly_timeout_task)
1803 GNUNET_SCHEDULER_cancel (neighbour->reassembly_timeout_task);
1804 GNUNET_free (neighbour);
1809 * Send message to a CORE client that we gained a connection.
1811 * @param tc client to inform (must be CORE client)
1812 * @param pid peer the connection is for
1813 * @param quota_out current quota for the peer
1816 core_send_connect_info (struct TransportClient *tc,
1817 const struct GNUNET_PeerIdentity *pid,
1818 struct GNUNET_BANDWIDTH_Value32NBO quota_out)
1820 struct GNUNET_MQ_Envelope *env;
1821 struct ConnectInfoMessage *cim;
1823 GNUNET_assert (CT_CORE == tc->type);
1824 env = GNUNET_MQ_msg (cim,
1825 GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
1826 cim->quota_out = quota_out;
1828 GNUNET_MQ_send (tc->mq,
1834 * Send message to CORE clients that we gained a connection
1836 * @param pid peer the queue was for
1837 * @param quota_out current quota for the peer
1840 cores_send_connect_info (const struct GNUNET_PeerIdentity *pid,
1841 struct GNUNET_BANDWIDTH_Value32NBO quota_out)
1843 for (struct TransportClient *tc = clients_head;
1847 if (CT_CORE != tc->type)
1849 core_send_connect_info (tc,
1857 * Send message to CORE clients that we lost a connection.
1859 * @param pid peer the connection was for
1862 cores_send_disconnect_info (const struct GNUNET_PeerIdentity *pid)
1864 for (struct TransportClient *tc = clients_head;
1868 struct GNUNET_MQ_Envelope *env;
1869 struct DisconnectInfoMessage *dim;
1871 if (CT_CORE != tc->type)
1873 env = GNUNET_MQ_msg (dim,
1874 GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT);
1876 GNUNET_MQ_send (tc->mq,
1883 * We believe we are ready to transmit a message on a queue. Double-checks
1884 * with the queue's "tracker_out" and then gives the message to the
1885 * communicator for transmission (updating the tracker, and re-scheduling
1886 * itself if applicable).
1888 * @param cls the `struct GNUNET_ATS_Session` to process transmissions for
1891 transmit_on_queue (void *cls);
1895 * Schedule next run of #transmit_on_queue(). Does NOTHING if
1896 * we should run immediately or if the message queue is empty.
1897 * Test for no task being added AND queue not being empty to
1898 * transmit immediately afterwards! This function must only
1899 * be called if the message queue is non-empty!
1901 * @param queue the queue to do scheduling for
1904 schedule_transmit_on_queue (struct GNUNET_ATS_Session *queue)
1906 struct Neighbour *n = queue->neighbour;
1907 struct PendingMessage *pm = n->pending_msg_head;
1908 struct GNUNET_TIME_Relative out_delay;
1911 GNUNET_assert (NULL != pm);
1912 if (queue->tc->details.communicator.total_queue_length >= COMMUNICATOR_TOTAL_QUEUE_LIMIT)
1914 GNUNET_STATISTICS_update (GST_stats,
1915 "# Transmission throttled due to communicator queue limit",
1920 if (queue->queue_length >= SESSION_QUEUE_LIMIT)
1922 GNUNET_STATISTICS_update (GST_stats,
1923 "# Transmission throttled due to session queue limit",
1929 wsize = (0 == queue->mtu)
1930 ? pm->bytes_msg /* FIXME: add overheads? */
1932 out_delay = GNUNET_BANDWIDTH_tracker_get_delay (&queue->tracker_out,
1934 out_delay = GNUNET_TIME_relative_max (GNUNET_TIME_absolute_get_remaining (pm->next_attempt),
1936 if (0 == out_delay.rel_value_us)
1937 return; /* we should run immediately! */
1938 /* queue has changed since we were scheduled, reschedule again */
1939 queue->transmit_task = GNUNET_SCHEDULER_add_delayed (out_delay,
1942 if (out_delay.rel_value_us > DELAY_WARN_THRESHOLD.rel_value_us)
1943 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1944 "Next transmission on queue `%s' in %s (high delay)\n",
1946 GNUNET_STRINGS_relative_time_to_string (out_delay,
1949 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1950 "Next transmission on queue `%s' in %s\n",
1952 GNUNET_STRINGS_relative_time_to_string (out_delay,
1960 * @param session the session to free
1963 free_session (struct GNUNET_ATS_Session *session)
1965 struct Neighbour *neighbour = session->neighbour;
1966 struct TransportClient *tc = session->tc;
1967 struct MonitorEvent me = {
1968 .cs = GNUNET_TRANSPORT_CS_DOWN,
1969 .rtt = GNUNET_TIME_UNIT_FOREVER_REL
1971 struct QueueEntry *qe;
1974 if (NULL != session->transmit_task)
1976 GNUNET_SCHEDULER_cancel (session->transmit_task);
1977 session->transmit_task = NULL;
1979 GNUNET_CONTAINER_MDLL_remove (neighbour,
1980 neighbour->session_head,
1981 neighbour->session_tail,
1983 GNUNET_CONTAINER_MDLL_remove (client,
1984 tc->details.communicator.session_head,
1985 tc->details.communicator.session_tail,
1987 maxxed = (COMMUNICATOR_TOTAL_QUEUE_LIMIT >= tc->details.communicator.total_queue_length);
1988 while (NULL != (qe = session->queue_head))
1990 GNUNET_CONTAINER_DLL_remove (session->queue_head,
1991 session->queue_tail,
1993 session->queue_length--;
1994 tc->details.communicator.total_queue_length--;
1997 GNUNET_assert (0 == session->queue_length);
1999 (COMMUNICATOR_TOTAL_QUEUE_LIMIT < tc->details.communicator.total_queue_length) )
2001 /* Communicator dropped below threshold, resume all queues */
2002 GNUNET_STATISTICS_update (GST_stats,
2003 "# Transmission throttled due to communicator queue limit",
2006 for (struct GNUNET_ATS_Session *s = tc->details.communicator.session_head;
2009 schedule_transmit_on_queue (s);
2011 notify_monitors (&neighbour->pid,
2015 GNUNET_ATS_session_del (session->sr);
2016 GNUNET_BANDWIDTH_tracker_notification_stop (&session->tracker_in);
2017 GNUNET_BANDWIDTH_tracker_notification_stop (&session->tracker_out);
2018 GNUNET_free (session);
2019 if (NULL == neighbour->session_head)
2021 cores_send_disconnect_info (&neighbour->pid);
2022 free_neighbour (neighbour);
2030 * @param ale address list entry to free
2033 free_address_list_entry (struct AddressListEntry *ale)
2035 struct TransportClient *tc = ale->tc;
2037 GNUNET_CONTAINER_DLL_remove (tc->details.communicator.addr_head,
2038 tc->details.communicator.addr_tail,
2040 if (NULL != ale->sc)
2042 GNUNET_PEERSTORE_store_cancel (ale->sc);
2045 if (NULL != ale->st)
2047 GNUNET_SCHEDULER_cancel (ale->st);
2055 * Called whenever a client is disconnected. Frees our
2056 * resources associated with that client.
2058 * @param cls closure, NULL
2059 * @param client identification of the client
2060 * @param app_ctx our `struct TransportClient`
2063 client_disconnect_cb (void *cls,
2064 struct GNUNET_SERVICE_Client *client,
2067 struct TransportClient *tc = app_ctx;
2069 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2070 "Client %p disconnected, cleaning up.\n",
2072 GNUNET_CONTAINER_DLL_remove (clients_head,
2081 struct PendingMessage *pm;
2083 while (NULL != (pm = tc->details.core.pending_msg_head))
2085 GNUNET_CONTAINER_MDLL_remove (client,
2086 tc->details.core.pending_msg_head,
2087 tc->details.core.pending_msg_tail,
2095 case CT_COMMUNICATOR:
2097 struct GNUNET_ATS_Session *q;
2098 struct AddressListEntry *ale;
2100 while (NULL != (q = tc->details.communicator.session_head))
2102 while (NULL != (ale = tc->details.communicator.addr_head))
2103 free_address_list_entry (ale);
2104 GNUNET_free (tc->details.communicator.address_prefix);
2113 * Iterator telling new CORE client about all existing
2114 * connections to peers.
2116 * @param cls the new `struct TransportClient`
2117 * @param pid a connected peer
2118 * @param value the `struct Neighbour` with more information
2119 * @return #GNUNET_OK (continue to iterate)
2122 notify_client_connect_info (void *cls,
2123 const struct GNUNET_PeerIdentity *pid,
2126 struct TransportClient *tc = cls;
2127 struct Neighbour *neighbour = value;
2129 core_send_connect_info (tc,
2131 neighbour->quota_out);
2137 * Initialize a "CORE" client. We got a start message from this
2138 * client, so add it to the list of clients for broadcasting of
2141 * @param cls the client
2142 * @param start the start message that was sent
2145 handle_client_start (void *cls,
2146 const struct StartMessage *start)
2148 struct TransportClient *tc = cls;
2151 options = ntohl (start->options);
2152 if ( (0 != (1 & options)) &&
2154 memcmp (&start->self,
2156 sizeof (struct GNUNET_PeerIdentity)) ) )
2158 /* client thinks this is a different peer, reject */
2160 GNUNET_SERVICE_client_drop (tc->client);
2163 if (CT_NONE != tc->type)
2166 GNUNET_SERVICE_client_drop (tc->client);
2170 GNUNET_CONTAINER_multipeermap_iterate (neighbours,
2171 ¬ify_client_connect_info,
2173 GNUNET_SERVICE_client_continue (tc->client);
2178 * Client asked for transmission to a peer. Check that the request is well-formed.
2180 * @param cls the client
2181 * @param obm the send message that was sent
2184 check_client_send (void *cls,
2185 const struct OutboundMessage *obm)
2187 struct TransportClient *tc = cls;
2189 const struct GNUNET_MessageHeader *obmm;
2191 if (CT_CORE != tc->type)
2194 return GNUNET_SYSERR;
2196 size = ntohs (obm->header.size) - sizeof (struct OutboundMessage);
2197 if (size < sizeof (struct GNUNET_MessageHeader))
2200 return GNUNET_SYSERR;
2202 obmm = (const struct GNUNET_MessageHeader *) &obm[1];
2203 if (size != ntohs (obmm->size))
2206 return GNUNET_SYSERR;
2213 * Free fragment tree below @e root, excluding @e root itself.
2215 * @param root root of the tree to free
2218 free_fragment_tree (struct PendingMessage *root)
2220 struct PendingMessage *frag;
2222 while (NULL != (frag = root->head_frag))
2224 free_fragment_tree (frag);
2225 GNUNET_CONTAINER_MDLL_remove (frag,
2235 * Release memory associated with @a pm and remove @a pm from associated
2236 * data structures. @a pm must be a top-level pending message and not
2237 * a fragment in the tree. The entire tree is freed (if applicable).
2239 * @param pm the pending message to free
2242 free_pending_message (struct PendingMessage *pm)
2244 struct TransportClient *tc = pm->client;
2245 struct Neighbour *target = pm->target;
2249 GNUNET_CONTAINER_MDLL_remove (client,
2250 tc->details.core.pending_msg_head,
2251 tc->details.core.pending_msg_tail,
2254 GNUNET_CONTAINER_MDLL_remove (neighbour,
2255 target->pending_msg_head,
2256 target->pending_msg_tail,
2258 free_fragment_tree (pm);
2259 GNUNET_free_non_null (pm->bpm);
2265 * Send a response to the @a pm that we have processed a
2266 * "send" request with status @a success. We
2267 * transmitted @a bytes_physical on the actual wire.
2268 * Sends a confirmation to the "core" client responsible
2269 * for the original request and frees @a pm.
2271 * @param pm handle to the original pending message
2272 * @param success status code, #GNUNET_OK on success, #GNUNET_SYSERR
2273 * for transmission failure
2274 * @param bytes_physical amount of bandwidth consumed
2277 client_send_response (struct PendingMessage *pm,
2279 uint32_t bytes_physical)
2281 struct TransportClient *tc = pm->client;
2282 struct Neighbour *target = pm->target;
2283 struct GNUNET_MQ_Envelope *env;
2284 struct SendOkMessage *som;
2288 env = GNUNET_MQ_msg (som,
2289 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
2290 som->success = htonl ((uint32_t) success);
2291 som->bytes_msg = htons (pm->bytes_msg);
2292 som->bytes_physical = htonl (bytes_physical);
2293 som->peer = target->pid;
2294 GNUNET_MQ_send (tc->mq,
2297 free_pending_message (pm);
2302 * Checks the message queue for a neighbour for messages that have timed
2303 * out and purges them.
2305 * @param cls a `struct Neighbour`
2308 check_queue_timeouts (void *cls)
2310 struct Neighbour *n = cls;
2311 struct PendingMessage *pm;
2312 struct GNUNET_TIME_Absolute now;
2313 struct GNUNET_TIME_Absolute earliest_timeout;
2315 n->timeout_task = NULL;
2316 earliest_timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
2317 now = GNUNET_TIME_absolute_get ();
2318 for (struct PendingMessage *pos = n->pending_msg_head;
2322 pm = pos->next_neighbour;
2323 if (pos->timeout.abs_value_us <= now.abs_value_us)
2325 GNUNET_STATISTICS_update (GST_stats,
2326 "# messages dropped (timeout before confirmation)",
2329 client_send_response (pm,
2334 earliest_timeout = GNUNET_TIME_absolute_min (earliest_timeout,
2337 n->earliest_timeout = earliest_timeout;
2338 if (NULL != n->pending_msg_head)
2339 n->timeout_task = GNUNET_SCHEDULER_add_at (earliest_timeout,
2340 &check_queue_timeouts,
/* Handler for a CORE client's request to send a message to a peer.  If the
   peer is not (or no longer) a neighbour, a failure SEND_OK is returned to
   the client right away.  Otherwise a PendingMessage is allocated, linked
   into both the neighbour's and the client's pending lists, and the
   neighbour's timeout task is moved forward if this message expires
   earlier than anything queued so far. */
2346 * Client asked for transmission to a peer. Process the request.
2348 * @param cls the client
2349 * @param obm the send message that was sent
2352 handle_client_send (void *cls,
2353 const struct OutboundMessage *obm)
2355 struct TransportClient *tc = cls;
2356 struct PendingMessage *pm;
2357 const struct GNUNET_MessageHeader *obmm;
2358 struct Neighbour *target;
2361 GNUNET_assert (CT_CORE == tc->type);
2362 obmm = (const struct GNUNET_MessageHeader *) &obm[1];
2363 bytes_msg = ntohs (obmm->size);
2364 target = lookup_neighbour (&obm->peer);
2367 /* Failure: don't have this peer as a neighbour (anymore).
2368 Might have gone down asynchronously, so this is NOT
2369 a protocol violation by CORE. Still count the event,
2370 as this should be rare. */
2371 struct GNUNET_MQ_Envelope *env;
2372 struct SendOkMessage *som;
2374 env = GNUNET_MQ_msg (som,
2375 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
2376 som->success = htonl (GNUNET_SYSERR);
/* NOTE(review): bytes_msg is converted with htonl() here, while
   client_send_response() uses htons() for the same SendOkMessage field.
   One of the two is presumably wrong; confirm against the declared width
   of SendOkMessage.bytes_msg. */
2377 som->bytes_msg = htonl (bytes_msg);
2378 som->bytes_physical = htonl (0);
2379 som->peer = obm->peer;
2380 GNUNET_MQ_send (tc->mq,
2382 GNUNET_SERVICE_client_continue (tc->client);
2383 GNUNET_STATISTICS_update (GST_stats,
2384 "# messages dropped (neighbour unknown)",
/* The pending message is allocated with room for the payload (bytes_msg)
   directly behind the struct. */
2389 pm = GNUNET_malloc (sizeof (struct PendingMessage) + bytes_msg);
2391 pm->target = target;
2392 pm->bytes_msg = bytes_msg;
2393 pm->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (obm->timeout));
2397 GNUNET_CONTAINER_MDLL_insert (neighbour,
2398 target->pending_msg_head,
2399 target->pending_msg_tail,
2401 GNUNET_CONTAINER_MDLL_insert (client,
2402 tc->details.core.pending_msg_head,
2403 tc->details.core.pending_msg_tail,
/* Only re-arm the timeout task if this message expires before the
   earliest timeout recorded for the neighbour. */
2405 if (target->earliest_timeout.abs_value_us > pm->timeout.abs_value_us)
2407 target->earliest_timeout.abs_value_us = pm->timeout.abs_value_us;
2408 if (NULL != target->timeout_task)
2409 GNUNET_SCHEDULER_cancel (target->timeout_task);
2410 target->timeout_task
2411 = GNUNET_SCHEDULER_add_at (target->earliest_timeout,
2412 &check_queue_timeouts,
/* Validation callback for a communicator's "available" message: rejects
   clients that already declared a type, marks the client as a
   communicator, and checks that a non-empty address prefix is
   zero-terminated. */
2419 * Communicator started. Test message is well-formed.
2421 * @param cls the client
2422 * @param cam the send message that was sent
2425 check_communicator_available (void *cls,
2426 const struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam)
2428 struct TransportClient *tc = cls;
2431 if (CT_NONE != tc->type)
2434 return GNUNET_SYSERR;
/* NOTE(review): the client type is changed here, inside the validator and
   before the zero-termination check below has run; presumably a client
   whose message is then rejected stays marked as CT_COMMUNICATOR --
   confirm this is intended. */
2436 tc->type = CT_COMMUNICATOR;
2437 size = ntohs (cam->header.size) - sizeof (*cam);
2439 return GNUNET_OK; /* receive-only communicator */
2440 GNUNET_MQ_check_zero_termination (cam);
2446 * Communicator started. Process the request.
2448 * @param cls the client
2449 * @param cam the send message that was sent
2452 handle_communicator_available (void *cls,
2453 const struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam)
2455 struct TransportClient *tc = cls;
2458 size = ntohs (cam->header.size) - sizeof (*cam);
2460 return; /* receive-only communicator */
2461 tc->details.communicator.address_prefix
2462 = GNUNET_strdup ((const char *) &cam[1]);
2463 tc->details.communicator.cc
2464 = (enum GNUNET_TRANSPORT_CommunicatorCharacteristics) ntohl (cam->cc);
2465 GNUNET_SERVICE_client_continue (tc->client);
2470 * Communicator requests backchannel transmission. Check the request.
2472 * @param cls the client
2473 * @param cb the send message that was sent
2474 * @return #GNUNET_OK if message is well-formed
2477 check_communicator_backchannel (void *cls,
2478 const struct GNUNET_TRANSPORT_CommunicatorBackchannel *cb)
2480 const struct GNUNET_MessageHeader *inbox;
2485 msize = ntohs (cb->header.size) - sizeof (*cb);
2486 if (UINT16_MAX - msize >
2487 sizeof (struct TransportBackchannelEncapsulationMessage) +
2488 sizeof (struct TransportBackchannelRequestPayload) )
2491 return GNUNET_SYSERR;
2493 inbox = (const struct GNUNET_MessageHeader *) &cb[1];
2494 isize = ntohs (inbox->size);
2498 return GNUNET_SYSERR;
2500 is = (const char *) inbox;
2503 GNUNET_assert (msize > 0);
2504 if ('\0' != is[msize-1])
2507 return GNUNET_SYSERR;
2514 * Remove memory used by expired ephemeral keys.
2519 expire_ephemerals (void *cls)
2521 struct EphemeralCacheEntry *ece;
2524 ephemeral_task = NULL;
2525 while (NULL != (ece = GNUNET_CONTAINER_heap_peek (ephemeral_heap)))
2527 if (0 == GNUNET_TIME_absolute_get_remaining (ece->ephemeral_validity).rel_value_us)
2529 free_ephemeral (ece);
2532 ephemeral_task = GNUNET_SCHEDULER_add_at (ece->ephemeral_validity,
2541 * Lookup ephemeral key in our #ephemeral_map. If no valid one exists, generate
2542 * one, cache it and return it.
2544 * @param pid peer to look up ephemeral for
2545 * @param private_key[out] set to the private key
2546 * @param ephemeral_key[out] set to the key
2547 * @param ephemeral_sender_sig[out] set to the signature
2548 * @param ephemeral_validity[out] set to the validity expiration time
2551 lookup_ephemeral (const struct GNUNET_PeerIdentity *pid,
2552 struct GNUNET_CRYPTO_EcdhePrivateKey *private_key,
2553 struct GNUNET_CRYPTO_EcdhePublicKey *ephemeral_key,
2554 struct GNUNET_CRYPTO_EddsaSignature *ephemeral_sender_sig,
2555 struct GNUNET_TIME_Absolute *ephemeral_validity)
2557 struct EphemeralCacheEntry *ece;
2558 struct EphemeralConfirmation ec;
2560 ece = GNUNET_CONTAINER_multipeermap_get (ephemeral_map,
2562 if ( (NULL != ece) &&
2563 (0 == GNUNET_TIME_absolute_get_remaining (ece->ephemeral_validity).rel_value_us) )
2565 free_ephemeral (ece);
2570 ece = GNUNET_new (struct EphemeralCacheEntry);
2572 ece->ephemeral_validity = GNUNET_TIME_absolute_add (GNUNET_TIME_absolute_get_monotonic (GST_cfg),
2573 EPHEMERAL_VALIDITY);
2574 GNUNET_assert (GNUNET_OK ==
2575 GNUNET_CRYPTO_ecdhe_key_create2 (&ece->private_key));
2576 GNUNET_CRYPTO_ecdhe_key_get_public (&ece->private_key,
2577 &ece->ephemeral_key);
2578 ec.purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL);
2579 ec.purpose.size = htonl (sizeof (ec));
2581 ec.ephemeral_key = ece->ephemeral_key;
2582 GNUNET_assert (GNUNET_OK ==
2583 GNUNET_CRYPTO_eddsa_sign (GST_my_private_key,
2586 ece->hn = GNUNET_CONTAINER_heap_insert (ephemeral_heap,
2588 ece->ephemeral_validity.abs_value_us);
2589 GNUNET_assert (GNUNET_OK ==
2590 GNUNET_CONTAINER_multipeermap_put (ephemeral_map,
2593 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
2594 if (NULL == ephemeral_task)
2595 ephemeral_task = GNUNET_SCHEDULER_add_at (ece->ephemeral_validity,
2599 *private_key = ece->private_key;
2600 *ephemeral_key = ece->ephemeral_key;
2601 *ephemeral_sender_sig = ece->sender_sig;
2602 *ephemeral_validity = ece->ephemeral_validity;
2607 * We need to transmit @a hdr to @a target. If necessary, this may
2608 * involve DV routing or even broadcasting and fragmentation.
2610 * @param target peer to receive @a hdr
2611 * @param hdr header of the message to route
2614 route_message (const struct GNUNET_PeerIdentity *target,
2615 struct GNUNET_MessageHeader *hdr)
2617 // FIXME: send hdr to target, free hdr (possibly using DV, possibly broadcasting)
2623 * Communicator requests backchannel transmission. Process the request.
2625 * @param cls the client
2626 * @param cb the send message that was sent
2629 handle_communicator_backchannel (void *cls,
2630 const struct GNUNET_TRANSPORT_CommunicatorBackchannel *cb)
2632 struct TransportClient *tc = cls;
2633 struct GNUNET_CRYPTO_EcdhePrivateKey private_key;
2634 struct GNUNET_TIME_Absolute ephemeral_validity;
2635 struct TransportBackchannelEncapsulationMessage *enc;
2636 struct TransportBackchannelRequestPayload ppay;
2640 /* encapsulate and encrypt message */
2641 msize = ntohs (cb->header.size) - sizeof (*cb) + sizeof (struct TransportBackchannelRequestPayload);
2642 enc = GNUNET_malloc (sizeof (*enc) + msize);
2643 enc->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION);
2644 enc->header.size = htons (sizeof (*enc) + msize);
2645 enc->target = cb->pid;
2646 lookup_ephemeral (&cb->pid,
2648 &enc->ephemeral_key,
2650 &ephemeral_validity);
2651 // FIXME: setup 'iv'
2653 dh_key_derive (&private_key,
2658 ppay.ephemeral_validity = GNUNET_TIME_absolute_hton (ephemeral_validity);
2659 ppay.monotonic_time = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get_monotonic (GST_cfg));
2660 mpos = (char *) &enc[1];
2669 ntohs (cb->header.size) - sizeof (*cb));
2673 route_message (&cb->pid,
2675 GNUNET_SERVICE_client_continue (tc->client);
2680 * Address of our peer added. Test message is well-formed.
2682 * @param cls the client
2683 * @param aam the send message that was sent
2684 * @return #GNUNET_OK if message is well-formed
2687 check_add_address (void *cls,
2688 const struct GNUNET_TRANSPORT_AddAddressMessage *aam)
2690 struct TransportClient *tc = cls;
2692 if (CT_COMMUNICATOR != tc->type)
2695 return GNUNET_SYSERR;
2697 GNUNET_MQ_check_zero_termination (aam);
2703 * Ask peerstore to store our address.
2705 * @param cls an `struct AddressListEntry *`
2708 store_pi (void *cls);
2712 * Function called when peerstore is done storing our address.
2715 peerstore_store_cb (void *cls,
2718 struct AddressListEntry *ale = cls;
2721 if (GNUNET_YES != success)
2722 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2723 "Failed to store our own address `%s' in peerstore!\n",
2725 /* refresh period is 1/4 of expiration time, that should be plenty
2726 without being excessive. */
2727 ale->st = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_divide (ale->expiration,
2735 * Ask peerstore to store our address.
2737 * @param cls an `struct AddressListEntry *`
/* Sign the address of the given entry and submit it to the peerstore.  If the
   store request cannot be created (NULL handle), a warning is logged and a
   task is scheduled with a delay of GNUNET_TIME_UNIT_SECONDS. */
2740 store_pi (void *cls)
2742 struct AddressListEntry *ale = cls;
2745 struct GNUNET_TIME_Absolute expiration;
/* turn the relative lifetime of the entry into an absolute expiration time */
2748 expiration = GNUNET_TIME_relative_to_absolute (ale->expiration);
2749 GNUNET_HELLO_sign_address (ale->address,
/* the store handle is kept in ale->sc; peerstore_store_cb() is the
   continuation */
2755 ale->sc = GNUNET_PEERSTORE_store (peerstore,
2758 GNUNET_HELLO_PEERSTORE_KEY,
2762 GNUNET_PEERSTORE_STOREOPTION_MULTIPLE,
2763 &peerstore_store_cb,
/* no handle: the request was not started */
2766 if (NULL == ale->sc)
2768 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2769 "Failed to store our address `%s' with peerstore\n",
/* presumably retries store_pi() -- the callback argument is not visible in
   this excerpt */
2771 ale->st = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
2779 * Address of our peer added. Process the request.
2781 * @param cls the client
2782 * @param aam the send message that was sent
/* Handle an "add address" request: allocate an AddressListEntry with room for
   the address bytes that follow the fixed message header, fill it from the
   message, insert it into the communicator's address list, schedule
   store_pi() to run immediately and let the client continue. */
2785 handle_add_address (void *cls,
2786 const struct GNUNET_TRANSPORT_AddAddressMessage *aam)
2788 struct TransportClient *tc = cls;
2789 struct AddressListEntry *ale;
/* number of bytes that follow the fixed-size part of the message */
2792 slen = ntohs (aam->header.size) - sizeof (*aam);
/* a single allocation holds the entry plus slen trailing bytes */
2793 ale = GNUNET_malloc (sizeof (struct AddressListEntry) + slen);
/* the address pointer refers to the memory directly behind the entry */
2795 ale->address = (const char *) &ale[1];
2796 ale->expiration = GNUNET_TIME_relative_ntoh (aam->expiration);
/* aid is copied without byte-order conversion; handle_del_address() compares
   it the same way */
2797 ale->aid = aam->aid;
2798 ale->nt = (enum GNUNET_NetworkType) ntohl (aam->nt);
2802 GNUNET_CONTAINER_DLL_insert (tc->details.communicator.addr_head,
2803 tc->details.communicator.addr_tail,
/* the task handle is remembered in ale->st */
2805 ale->st = GNUNET_SCHEDULER_add_now (&store_pi,
2807 GNUNET_SERVICE_client_continue (tc->client);
2812 * Address of our peer deleted. Process the request.
2814 * @param cls the client
2815 * @param dam the send message that was sent
/* Handle a "delete address" request: walk the communicator's address list
   looking for the entry whose aid equals dam->aid, free it and let the client
   continue.  Clients that are not communicators are dropped. */
2818 handle_del_address (void *cls,
2819 const struct GNUNET_TRANSPORT_DelAddressMessage *dam)
2821 struct TransportClient *tc = cls;
/* only communicators own an address list */
2823 if (CT_COMMUNICATOR != tc->type)
2826 GNUNET_SERVICE_client_drop (tc->client);
2829 for (struct AddressListEntry *ale = tc->details.communicator.addr_head;
/* entries with a different aid are not the one being deleted */
2833 if (dam->aid != ale->aid)
/* an entry found in this client's list must point back at this client */
2835 GNUNET_assert (ale->tc == tc);
2836 free_address_list_entry (ale);
2837 GNUNET_SERVICE_client_continue (tc->client);
/* presumably reached only when no entry matched -- the return after the
   successful path is not visible in this excerpt */
2840 GNUNET_SERVICE_client_drop (tc->client);
2845 * Context from #handle_incoming_msg(). Closure for many
2846 * message handlers below.
/* State carried along while one inbound message from a communicator is being
   processed; handed to the message handlers as their closure. */
2848 struct CommunicatorMessageContext
2851 * Which communicator provided us with the message.
2853 struct TransportClient *tc;
2856 * Additional information for flow control and about the sender.
/* finish_cmc_handling() reads im.fc_on, im.fc_id and im.sender from here;
   handle_dv_box() overwrites im.sender with the origin of the DV box */
2858 struct GNUNET_TRANSPORT_IncomingMessage im;
2861 * Number of hops the message has travelled (if DV-routed).
2862 * FIXME: make use of this in ACK handling!
/* set by handle_dv_box() from the total_hops field of the DV box */
2864 uint16_t total_hops;
2869 * Given an inbound message @a msg from a communicator @a cmc,
2870 * demultiplex it based on the type calling the right handler.
2872 * @param cmc context for demultiplexing
2873 * @param msg message to demultiplex
/* Forward declaration: handlers defined before the definition (for example
   handle_fragment_box(), handle_reliability_box() and handle_dv_box()) call
   this function on the message they unwrapped. */
2876 demultiplex_with_cmc (struct CommunicatorMessageContext *cmc,
2877 const struct GNUNET_MessageHeader *msg);
2881 * Send ACK to communicator (if requested) and free @a cmc.
2883 * @param cmc context for which we are done handling the message
/* Conclude the processing of one inbound message: if the communicator asked
   for flow control (fc_on is non-zero), send it an INCOMING_MSG_ACK that
   echoes fc_id and sender, then let the communicator client continue.
   NOTE(review): callers such as handle_raw_message() copy cmc->tc->client
   before calling this function, which suggests that cmc is released here;
   the release itself is not visible in this excerpt. */
2886 finish_cmc_handling (struct CommunicatorMessageContext *cmc)
/* fc_on arrives in network byte order */
2888 if (0 != ntohl (cmc->im.fc_on))
2890 /* send ACK when done to communicator for flow control! */
2891 struct GNUNET_MQ_Envelope *env;
2892 struct GNUNET_TRANSPORT_IncomingMessageAck *ack;
2894 env = GNUNET_MQ_msg (ack,
2895 GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK);
2896 ack->reserved = htonl (0);
/* fc_id and sender are echoed back exactly as they were received */
2897 ack->fc_id = cmc->im.fc_id;
2898 ack->sender = cmc->im.sender;
2899 GNUNET_MQ_send (cmc->tc->mq,
2902 GNUNET_SERVICE_client_continue (cmc->tc->client);
2908 * Communicator gave us an unencapsulated message to pass as-is to
2909 * CORE. Process the request.
2911 * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
2912 * @param mh the message that was received
/* Pass an unencapsulated message on to every client of type CT_CORE, wrapped
   in a GNUNET_MESSAGE_TYPE_TRANSPORT_RECV message that names the sender, and
   then finish handling of the context.  A message whose size is out of range
   gets the communicator client dropped instead. */
2915 handle_raw_message (void *cls,
2916 const struct GNUNET_MessageHeader *mh)
2918 struct CommunicatorMessageContext *cmc = cls;
2919 uint16_t size = ntohs (mh->size);
/* the message plus the InboundMessage header must fit into a 16-bit size
   field, and the message must at least contain its own header */
2921 if ( (size > UINT16_MAX - sizeof (struct InboundMessage)) ||
2922 (size < sizeof (struct GNUNET_MessageHeader)) )
/* the client handle is copied first because cmc is not used again after
   finish_cmc_handling() */
2924 struct GNUNET_SERVICE_Client *client = cmc->tc->client;
2927 finish_cmc_handling (cmc);
2928 GNUNET_SERVICE_client_drop (client);
2931 /* Forward to all CORE clients */
2932 for (struct TransportClient *tc = clients_head;
2936 struct GNUNET_MQ_Envelope *env;
2937 struct InboundMessage *im;
/* clients of any other type do not receive the message */
2939 if (CT_CORE != tc->type)
2941 env = GNUNET_MQ_msg_extra (im,
2943 GNUNET_MESSAGE_TYPE_TRANSPORT_RECV);
2944 im->peer = cmc->im.sender;
2948 GNUNET_MQ_send (tc->mq,
2951 /* FIXME: consider doing this _only_ once the message
2952 was drained from the CORE MQs to extend flow control to CORE!
2953 (basically, increment counter in cmc, decrement on MQ send continuation! */
2954 finish_cmc_handling (cmc);
2959 * Communicator gave us a fragment box. Check the message.
2961 * @param cls a `struct CommunicatorMessageContext`
2962 * @param fb the send message that was sent
2963 * @return #GNUNET_YES if message is well-formed
/* Sanity-check a fragment box: each failed check reports a protocol
   violation via GNUNET_break_op() and returns GNUNET_SYSERR. */
2966 check_fragment_box (void *cls,
2967 const struct TransportFragmentBox *fb)
2969 uint16_t size = ntohs (fb->header.size);
/* number of bytes that follow the fixed-size fragment header */
2970 uint16_t bsize = size - sizeof (*fb);
/* NOTE(review): the condition guarding this first rejection is not visible
   in this excerpt */
2974 GNUNET_break_op (0);
2975 return GNUNET_SYSERR;
/* the fragment must not extend beyond the announced message size */
2977 if (bsize + ntohs (fb->frag_off) > ntohs (fb->msg_size))
2979 GNUNET_break_op (0);
2980 return GNUNET_SYSERR;
/* the fragment must start inside the message */
2982 if (ntohs (fb->frag_off) >= ntohs (fb->msg_size))
2984 GNUNET_break_op (0);
2985 return GNUNET_SYSERR;
2992 * Generate a fragment acknowledgement for an @a rc.
2994 * @param rc context to generate ACK for, @a rc ACK state is reset
/* Build a fragment acknowledgement from the state accumulated in rc, route it
   to the neighbour the fragments came from, and reset the accumulated ACK
   state (avg_ack_delay, extra_acks) in rc. */
2997 send_fragment_ack (struct ReassemblyContext *rc)
2999 struct TransportFragmentAckMessage *ack;
3001 ack = GNUNET_new (struct TransportFragmentAckMessage);
3002 ack->header.size = htons (sizeof (struct TransportFragmentAckMessage));
3003 ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK);
/* frag_uuid and the extra_acks bit mask together describe which fragments
   are acknowledged; see handle_fragment_box() for how they are maintained */
3004 ack->frag_uuid = htonl (rc->frag_uuid);
3005 ack->extra_acks = GNUNET_htonll (rc->extra_acks);
3006 ack->msg_uuid = rc->msg_uuid;
3007 ack->avg_ack_delay = GNUNET_TIME_relative_hton (rc->avg_ack_delay);
/* nothing missing: the "forever" timeout tells the sender we are done */
3008 if (0 == rc->msg_missing)
3009 ack->reassembly_timeout
3010 = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_FOREVER_REL); /* signal completion */
/* otherwise (the `else` line is not visible in this excerpt) report how long
   the reassembly context will still be kept */
3012 ack->reassembly_timeout
3013 = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (rc->reassembly_timeout));
3014 route_message (&rc->neighbour->pid,
/* start accumulating afresh for the next acknowledgement */
3016 rc->avg_ack_delay = GNUNET_TIME_UNIT_ZERO;
3018 rc->extra_acks = 0LLU;
3023 * Communicator gave us a fragment. Process the request.
3025 * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
3026 * @param fb the message that was received
/* Process one fragment: look up (or create) the reassembly context for the
   message UUID of the fragment, copy the payload into the reassembly buffer,
   track which bytes are present, update the cumulative ACK state and, once no
   byte is missing, hand the reassembled message to demultiplex_with_cmc(). */
3029 handle_fragment_box (void *cls,
3030 const struct TransportFragmentBox *fb)
3032 struct CommunicatorMessageContext *cmc = cls;
3033 struct Neighbour *n;
3034 struct ReassemblyContext *rc;
3035 const struct GNUNET_MessageHeader *msg;
3041 struct GNUNET_TIME_Relative cdelay;
/* fragments are tracked per neighbour */
3044 n = GNUNET_CONTAINER_multipeermap_get (neighbours,
/* presumably the "unknown neighbour" path: the communicator is dropped --
   the guarding condition is not visible in this excerpt */
3048 struct GNUNET_SERVICE_Client *client = cmc->tc->client;
3051 finish_cmc_handling (cmc);
3052 GNUNET_SERVICE_client_drop (client);
/* the reassembly map, heap and cleanup task are created on first use */
3055 if (NULL == n->reassembly_map)
3057 n->reassembly_map = GNUNET_CONTAINER_multishortmap_create (8,
3059 n->reassembly_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
3060 n->reassembly_timeout_task = GNUNET_SCHEDULER_add_delayed (REASSEMBLY_EXPIRATION,
3061 &reassembly_cleanup_task,
3064 msize = ntohs (fb->msg_size);
3065 rc = GNUNET_CONTAINER_multishortmap_get (n->reassembly_map,
/* new context: one allocation holds the context, the payload buffer and one
   presence bit per payload byte */
3069 rc = GNUNET_malloc (sizeof (*rc) +
3070 msize + /* reassembly payload buffer */
3071 (msize + 7) / 8 * sizeof (uint8_t) /* bitfield */);
3072 rc->msg_uuid = fb->msg_uuid;
3074 rc->msg_size = msize;
3075 rc->reassembly_timeout = GNUNET_TIME_relative_to_absolute (REASSEMBLY_EXPIRATION);
3076 rc->last_frag = GNUNET_TIME_absolute_get ();
/* the heap is ordered by the absolute expiration time of the context */
3077 rc->hn = GNUNET_CONTAINER_heap_insert (n->reassembly_heap,
3079 rc->reassembly_timeout.abs_value_us);
3080 GNUNET_assert (GNUNET_OK ==
3081 GNUNET_CONTAINER_multishortmap_put (n->reassembly_map,
3084 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
/* the payload buffer starts directly behind the context, the bitfield
   directly behind the payload buffer */
3085 target = (char *) &rc[1];
3086 rc->bitfield = (uint8_t *) (target + rc->msg_size);
/* all bytes of the message are still outstanding */
3087 rc->msg_missing = rc->msg_size;
/* existing context: only the payload pointer has to be recomputed */
3091 target = (char *) &rc[1];
/* a fragment announcing a different total size than the context is ignored */
3093 if (msize != rc->msg_size)
3096 finish_cmc_handling (cmc);
/* payload bytes carried by this fragment and their offset in the message */
3101 fsize = ntohs (fb->header.size) - sizeof (*fb);
3102 frag_off = ntohs (fb->frag_off);
3103 memcpy (&target[frag_off],
3106 /* update bitfield and msg_missing */
3107 for (unsigned int i=frag_off;i<frag_off+fsize;i++)
/* only bytes that were not present before are marked */
3109 if (0 == (rc->bitfield[i / 8] & (1 << (i % 8))))
3111 rc->bitfield[i / 8] |= (1 << (i % 8));
3116 /* Compute cumulative ACK */
3117 frag_uuid = ntohl (fb->frag_uuid);
/* time elapsed since the previous fragment of this message */
3118 cdelay = GNUNET_TIME_absolute_get_duration (rc->last_frag);
3119 cdelay = GNUNET_TIME_relative_multiply (cdelay,
3121 rc->last_frag = GNUNET_TIME_absolute_get ();
3122 rc->avg_ack_delay = GNUNET_TIME_relative_add (rc->avg_ack_delay,
3124 ack_now = GNUNET_NO;
3125 if (0 == rc->num_acks)
3127 /* case one: first ack */
3128 rc->frag_uuid = frag_uuid;
3129 rc->extra_acks = 0LLU;
/* bit k of extra_acks stands for fragment UUID rc->frag_uuid + k + 1 */
3132 else if ( (frag_uuid >= rc->frag_uuid) &&
3133 (frag_uuid <= rc->frag_uuid + 64) )
3135 /* case two: ack fits after existing min UUID */
3136 if ( (frag_uuid == rc->frag_uuid) ||
3137 (0 != (rc->extra_acks & (1LLU << (frag_uuid - rc->frag_uuid - 1)))) )
3139 /* duplicate fragment, ack now! */
3140 ack_now = GNUNET_YES;
3144 rc->extra_acks |= (1LLU << (frag_uuid - rc->frag_uuid - 1));
/* NOTE(review): for a distance d = rc->frag_uuid - frag_uuid below 64 the
   mask ~((1LLU << (64 - d)) - 1) selects the HIGH d bits, so the comparison
   holds when extra_acks has bits only in the range that the shift below
   discards.  That looks inverted relative to the comment inside the branch --
   confirm the intended mask. */
3148 else if ( (rc->frag_uuid > frag_uuid) &&
3149 ( ( (rc->frag_uuid == frag_uuid + 64) &&
3150 (0 == rc->extra_acks) ) ||
3151 ( (rc->frag_uuid < frag_uuid + 64) &&
3152 (rc->extra_acks == (rc->extra_acks & ~ ((1LLU << (64 - (rc->frag_uuid - frag_uuid))) - 1LLU))) ) ) )
3154 /* can fit ack by shifting extra acks and starting at
3155 frag_uuid, test above ensured that the bits we will
3156 shift 'extra_acks' by are all zero. */
/* NOTE(review): when the distance is exactly 64 (first alternative of the
   condition above) this shifts by 64; if extra_acks is a 64-bit integer, as
   the 1LLU masks suggest, that shift count is undefined behaviour in C --
   confirm and special-case it. */
3157 rc->extra_acks <<= (rc->frag_uuid - frag_uuid);
3158 rc->extra_acks |= (1LLU << (rc->frag_uuid - frag_uuid - 1));
3159 rc->frag_uuid = frag_uuid;
3162 if (65 == rc->num_acks) /* FIXME: maybe use smaller threshold? This is very aggressive. */
3163 ack_now = GNUNET_YES; /* maximum acks received */
3164 // FIXME: possibly also ACK based on RTT (but for that we'd need to
3165 // determine the session used for the ACK first!)
3167 /* is reassembly complete? */
3168 if (0 != rc->msg_missing)
/* NOTE(review): presumably the ACK is only sent when ack_now is set -- that
   condition is not visible in this excerpt */
3171 send_fragment_ack (rc);
3172 finish_cmc_handling (cmc);
3175 /* reassembly is complete, verify result */
3176 msg = (const struct GNUNET_MessageHeader *) &rc[1];
/* the size in the reassembled header must match the announced total size */
3177 if (ntohs (msg->size) != rc->msg_size)
3180 free_reassembly_context (rc);
3181 finish_cmc_handling (cmc);
3184 /* successful reassembly */
3185 send_fragment_ack (rc);
3186 demultiplex_with_cmc (cmc,
3188 /* FIXME: really free here? Might be bad if fragments are still
3189 en-route and we forget that we finished this reassembly immediately!
3190 -> keep around until timeout?
3191 -> shorten timeout based on ACK? */
3192 free_reassembly_context (rc);
3197 * Communicator gave us a fragment acknowledgement. Process the request.
3199 * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
3200 * @param fa the message that was received
/* Placeholder handler for fragment acknowledgements: the visible code only
   finishes handling of the context; the actual processing is still listed as
   FIXME below. */
3203 handle_fragment_ack (void *cls,
3204 const struct TransportFragmentAckMessage *fa)
3206 struct CommunicatorMessageContext *cmc = cls;
3208 // FIXME: do work: identify original message; then identify fragments being acked;
3209 // remove those from the tree to prevent retransmission;
3211 // if entire message is ACKed, handle that as well.
3212 finish_cmc_handling (cmc);
3217 * Communicator gave us a reliability box. Check the message.
3219 * @param cls a `struct CommunicatorMessageContext`
3220 * @param rb the send message that was sent
3221 * @return #GNUNET_YES if message is well-formed
/* Validation hook for reliability boxes; the check is delegated to
   GNUNET_MQ_check_boxed_message(). */
3224 check_reliability_box (void *cls,
3225 const struct TransportReliabilityBox *rb)
/* presumably verifies that the box contains exactly one well-sized inner
   message -- confirm against the macro definition */
3227 GNUNET_MQ_check_boxed_message (rb);
3233 * Communicator gave us a reliability box. Process the request.
3235 * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
3236 * @param rb the message that was received
/* Process a reliability box: when its ack_countdown is zero, build a
   reliability ACK with room for one GNUNET_ShortHashCode and route it to the
   sender; in all cases continue with the message inside the box. */
3239 handle_reliability_box (void *cls,
3240 const struct TransportReliabilityBox *rb)
3242 struct CommunicatorMessageContext *cmc = cls;
/* the boxed message starts directly behind the box header */
3243 const struct GNUNET_MessageHeader *inbox = (const struct GNUNET_MessageHeader *) &rb[1];
/* a countdown of zero asks for an acknowledgement right away */
3245 if (0 == ntohl (rb->ack_countdown))
3247 struct TransportReliabilityAckMessage *ack;
3249 /* FIXME: implement cumulative ACKs and ack_countdown,
3250 then setting the avg_ack_delay field below: */
/* the ACK carries a single GNUNET_ShortHashCode behind its header */
3251 ack = GNUNET_malloc (sizeof (*ack) +
3252 sizeof (struct GNUNET_ShortHashCode));
3253 ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK);
3254 ack->header.size = htons (sizeof (*ack) +
3255 sizeof (struct GNUNET_ShortHashCode));
3258 sizeof (struct GNUNET_ShortHashCode));
3259 route_message (&cmc->im.sender,
3262 /* continue with inner message */
3263 demultiplex_with_cmc (cmc,
3269 * Communicator gave us a reliability ack. Process the request.
3271 * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
3272 * @param ra the message that was received
/* Placeholder handler for reliability acknowledgements: the visible code only
   finishes handling of the context; the actual processing is still listed as
   FIXME below. */
3275 handle_reliability_ack (void *cls,
3276 const struct TransportReliabilityAckMessage *ra)
3278 struct CommunicatorMessageContext *cmc = cls;
3280 // FIXME: do work: find message that was acknowledged, and
3281 // remove from transmission queue; update RTT.
3282 finish_cmc_handling (cmc);
3287 * Communicator gave us a backchannel encapsulation. Check the message.
3289 * @param cls a `struct CommunicatorMessageContext`
3290 * @param be the send message that was sent
3291 * @return #GNUNET_YES if message is well-formed
/* Validate a backchannel encapsulation: the bytes behind the fixed header
   must be large enough to hold at least a message header. */
3294 check_backchannel_encapsulation (void *cls,
3295 const struct TransportBackchannelEncapsulationMessage *be)
3297 uint16_t size = ntohs (be->header.size);
/* NOTE(review): the subtraction is carried out in an unsigned type, so this
   relies on size >= sizeof (*be) -- presumably guaranteed by the caller of
   this check; confirm */
3299 if (size - sizeof (*be) < sizeof (struct GNUNET_MessageHeader))
3301 GNUNET_break_op (0);
3302 return GNUNET_SYSERR;
3309 * Communicator gave us a backchannel encapsulation. Process the request.
3311 * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
3312 * @param be the message that was received
/* Process a backchannel encapsulation.  If the target in the message differs
   from the identity it is compared with (second memcmp() argument not visible
   in this excerpt, presumably our own identity), a copy of the message is
   routed on towards the target.  Handling for messages addressed to us is not
   implemented yet (see the FIXMEs); the context is finished in both cases. */
3315 handle_backchannel_encapsulation (void *cls,
3316 const struct TransportBackchannelEncapsulationMessage *be)
3318 struct CommunicatorMessageContext *cmc = cls;
3320 if (0 != memcmp (&be->target,
3322 sizeof (struct GNUNET_PeerIdentity)))
3324 /* not for me, try to route to target */
/* the message is copied because `be` is owned by the caller */
3325 route_message (&be->target,
3326 GNUNET_copy_message (&be->header));
3327 finish_cmc_handling (cmc);
3330 // FIXME: compute shared secret
3331 // FIXME: check HMAC
3332 // FIXME: decrypt payload
3333 // FIXME: forward to specified communicator!
3334 // (using GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL_INCOMING)
3335 finish_cmc_handling (cmc);
3340 * Communicator gave us a DV learn message. Check the message.
3342 * @param cls a `struct CommunicatorMessageContext`
3343 * @param dvl the send message that was sent
3344 * @return #GNUNET_YES if message is well-formed
/* Validate a DV learn message: its size must match the number of hops it
   announces exactly, and the hop list is scanned with two identity
   comparisons, each of which rejects the message on a match. */
3347 check_dv_learn (void *cls,
3348 const struct TransportDVLearn *dvl)
3350 uint16_t size = ntohs (dvl->header.size);
3351 uint16_t num_hops = ntohs (dvl->num_hops);
/* the hop list starts directly behind the fixed-size header */
3352 const struct GNUNET_PeerIdentity *hops = (const struct GNUNET_PeerIdentity *) &dvl[1];
/* header plus num_hops identities, nothing more and nothing less */
3354 if (size != sizeof (*dvl) + num_hops * sizeof (struct GNUNET_PeerIdentity))
3356 GNUNET_break_op (0);
3357 return GNUNET_SYSERR;
3359 for (unsigned int i=0;i<num_hops;i++)
/* presumably compared with hops[i]: the initiator must not show up as a hop
   -- the second argument is not visible in this excerpt */
3361 if (0 == memcmp (&dvl->initiator,
3363 sizeof (struct GNUNET_PeerIdentity)))
3365 GNUNET_break_op (0);
3366 return GNUNET_SYSERR;
/* presumably compared with hops[i]: our own identity must not show up as a
   hop either -- the second argument is not visible in this excerpt */
3368 if (0 == memcmp (&GST_my_identity,
3370 sizeof (struct GNUNET_PeerIdentity)))
3372 GNUNET_break_op (0);
3373 return GNUNET_SYSERR;
3381 * Communicator gave us a DV learn message. Process the request.
3383 * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
3384 * @param dvl the message that was received
/* Placeholder handler for DV learn messages: the visible code only finishes
   handling of the context; the actual processing is still listed as FIXME
   below. */
3387 handle_dv_learn (void *cls,
3388 const struct TransportDVLearn *dvl)
3390 struct CommunicatorMessageContext *cmc = cls;
3392 // FIXME: learn path from DV message (if bi-directional flags are set)
3393 // FIXME: expand DV message, forward on (unless path is getting too long)
3394 finish_cmc_handling (cmc);
3399 * Communicator gave us a DV box. Check the message.
3401 * @param cls a `struct CommunicatorMessageContext`
3402 * @param dvb the send message that was sent
3403 * @return #GNUNET_YES if message is well-formed
/* Validate a DV box: it must hold the announced hop list followed by exactly
   one inner message, and that inner message must not itself be a DV box or a
   DV learn message. */
3406 check_dv_box (void *cls,
3407 const struct TransportDVBox *dvb)
3409 uint16_t size = ntohs (dvb->header.size);
3410 uint16_t num_hops = ntohs (dvb->num_hops);
/* layout: fixed header, num_hops peer identities, inner message */
3411 const struct GNUNET_PeerIdentity *hops = (const struct GNUNET_PeerIdentity *) &dvb[1];
3412 const struct GNUNET_MessageHeader *inbox = (const struct GNUNET_MessageHeader *) &hops[num_hops];
/* there must be room for the header of the inner message before it is read */
3416 if (size < sizeof (*dvb) + num_hops * sizeof (struct GNUNET_PeerIdentity) + sizeof (struct GNUNET_MessageHeader))
3418 GNUNET_break_op (0);
3419 return GNUNET_SYSERR;
/* the inner message has to fill the remainder of the box exactly */
3421 isize = ntohs (inbox->size);
3422 if (size != sizeof (*dvb) + num_hops * sizeof (struct GNUNET_PeerIdentity) + isize)
3424 GNUNET_break_op (0);
3425 return GNUNET_SYSERR;
/* DV messages nested inside a DV box are rejected */
3427 itype = ntohs (inbox->type);
3428 if ( (GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX == itype) ||
3429 (GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN == itype) )
3431 GNUNET_break_op (0);
3432 return GNUNET_SYSERR;
3439 * Communicator gave us a DV box. Process the request.
3441 * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
3442 * @param dvb the message that was received
/* Process a DV box.  Forwarding for boxes that are not addressed to us is not
   implemented yet (see the FIXME); when we are the target, the sender and hop
   count of the context are taken from the box and the inner message is
   demultiplexed. */
3445 handle_dv_box (void *cls,
3446 const struct TransportDVBox *dvb)
3448 struct CommunicatorMessageContext *cmc = cls;
/* bytes that follow the fixed-size DV box header */
3449 uint16_t size = ntohs (dvb->header.size) - sizeof (*dvb);
3450 uint16_t num_hops = ntohs (dvb->num_hops);
/* layout: fixed header, num_hops peer identities, inner message */
3451 const struct GNUNET_PeerIdentity *hops = (const struct GNUNET_PeerIdentity *) &dvb[1];
3452 const struct GNUNET_MessageHeader *inbox = (const struct GNUNET_MessageHeader *) &hops[num_hops];
/* NOTE(review): the condition selecting the "not the target" path below is
   not visible in this excerpt */
3456 // FIXME: if we are not the target, shorten path and forward along.
3457 // Try from the _end_ of hops array if we know the given
3458 // neighbour (shortening the path!).
3459 // NOTE: increment total_hops!
3460 finish_cmc_handling (cmc);
3463 /* We are the target. Unbox and handle message. */
/* from here on the message is attributed to the origin named in the box */
3464 cmc->im.sender = dvb->origin;
3465 cmc->total_hops = ntohs (dvb->total_hops);
3466 demultiplex_with_cmc (cmc,
3472 * Client notified us about transmission from a peer. Process the request.
3474 * @param cls a `struct TransportClient` which sent us the message
3475 * @param im the incoming message that was received
3476 * @return #GNUNET_YES if message is well-formed
/* Validate an incoming-message notification: the sender must be a
   communicator client, and the message is then passed through
   GNUNET_MQ_check_boxed_message(). */
3479 check_incoming_msg (void *cls,
3480 const struct GNUNET_TRANSPORT_IncomingMessage *im)
3482 struct TransportClient *tc = cls;
/* any client that is not a communicator is rejected */
3484 if (CT_COMMUNICATOR != tc->type)
3487 return GNUNET_SYSERR;
/* presumably verifies that exactly one well-sized message follows the header
   -- confirm against the macro definition */
3489 GNUNET_MQ_check_boxed_message (im);
3495 * Incoming message. Process the request.
3497 * @param im the send message that was received
/* Entry point for a message a communicator received from the network:
   allocates a fresh CommunicatorMessageContext and demultiplexes the message
   that follows the fixed header of `im`. */
3500 handle_incoming_msg (void *cls,
3501 const struct GNUNET_TRANSPORT_IncomingMessage *im)
3503 struct TransportClient *tc = cls;
/* NOTE(review): the initialisation of the context fields is not visible in
   this excerpt */
3504 struct CommunicatorMessageContext *cmc = GNUNET_new (struct CommunicatorMessageContext);
/* the payload message starts directly behind the header */
3508 demultiplex_with_cmc (cmc,
3509 (const struct GNUNET_MessageHeader *) &im[1]);
3514 * Given an inbound message @a msg from a communicator @a cmc,
3515 * demultiplex it based on the type calling the right handler.
3517 * @param cmc context for demultiplexing
3518 * @param msg message to demultiplex
/* Dispatch an inbound message to the handler registered for its type.  The
   result of GNUNET_MQ_handle_message() decides what happens next: on
   GNUNET_SYSERR the communicator client is dropped; on GNUNET_NO (no handler
   for this type) the message is treated as an unencapsulated message and
   given to handle_raw_message(). */
3521 demultiplex_with_cmc (struct CommunicatorMessageContext *cmc,
3522 const struct GNUNET_MessageHeader *msg)
/* one entry per transport-level message type understood here.
   NOTE(review): the closure argument of each entry is on a line that is not
   visible in this excerpt; the handlers cast their closure to
   `struct CommunicatorMessageContext *`, so confirm that each entry passes
   `cmc` and not `&cmc`. */
3524 struct GNUNET_MQ_MessageHandler handlers[] = {
3525 GNUNET_MQ_hd_var_size (fragment_box,
3526 GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT,
3527 struct TransportFragmentBox,
3529 GNUNET_MQ_hd_fixed_size (fragment_ack,
3530 GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK,
3531 struct TransportFragmentAckMessage,
3533 GNUNET_MQ_hd_var_size (reliability_box,
3534 GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX,
3535 struct TransportReliabilityBox,
3537 GNUNET_MQ_hd_fixed_size (reliability_ack,
3538 GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK,
3539 struct TransportReliabilityAckMessage,
3541 GNUNET_MQ_hd_var_size (backchannel_encapsulation,
3542 GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION,
3543 struct TransportBackchannelEncapsulationMessage,
3545 GNUNET_MQ_hd_var_size (dv_learn,
3546 GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN,
3547 struct TransportDVLearn,
3549 GNUNET_MQ_hd_var_size (dv_box,
3550 GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX,
3551 struct TransportDVBox,
3553 GNUNET_MQ_handler_end()
3557 ret = GNUNET_MQ_handle_message (handlers,
/* a handler rejected the message: the communicator is dropped */
3559 if (GNUNET_SYSERR == ret)
3562 GNUNET_SERVICE_client_drop (cmc->tc->client);
/* no handler is registered for this message type */
3566 if (GNUNET_NO == ret)
3568 /* unencapsulated 'raw' message */
/* handle_raw_message() casts its closure to
   `struct CommunicatorMessageContext *`, so the context pointer itself has to
   be passed, not the address of the local pointer variable */
3569 handle_raw_message (cmc,
3576 * New queue became available. Check message.
3578 * @param cls the client
3579 * @param aqm the send message that was sent
/* Validate an "add queue" request: the sender must be a communicator client,
   and the message is then passed through
   GNUNET_MQ_check_zero_termination(). */
3582 check_add_queue_message (void *cls,
3583 const struct GNUNET_TRANSPORT_AddQueueMessage *aqm)
3585 struct TransportClient *tc = cls;
/* any client that is not a communicator is rejected */
3587 if (CT_COMMUNICATOR != tc->type)
3590 return GNUNET_SYSERR;
/* presumably checks that the trailing string is 0-terminated -- confirm
   against the macro definition */
3592 GNUNET_MQ_check_zero_termination (aqm);
3598 * Bandwidth tracker informs us that the delay until we should receive
3601 * @param cls a `struct GNUNET_ATS_Session` for which the delay changed
/* Callback of the inbound bandwidth tracker.  The visible code only computes
   the delay for a packet of the size assumed for this queue; nothing is done
   with the result yet (see the FIXME). */
3604 tracker_update_in_cb (void *cls)
3606 struct GNUNET_ATS_Session *queue = cls;
3607 struct GNUNET_TIME_Relative in_delay;
/* a queue without MTU (0) falls back to IN_PACKET_SIZE_WITHOUT_MTU */
3610 rsize = (0 == queue->mtu) ? IN_PACKET_SIZE_WITHOUT_MTU : queue->mtu;
3611 in_delay = GNUNET_BANDWIDTH_tracker_get_delay (&queue->tracker_in,
3613 // FIXME: how exactly do we do inbound flow control?
3618 * If necessary, generates the UUID for a @a pm
3620 * @param pm pending message to generate UUID for.
/* Give the pending message a random UUID and mark it as set.
   NOTE(review): the statement guarded by the check below is not visible in
   this excerpt; presumably it returns early when the UUID is already set. */
3623 set_pending_message_uuid (struct PendingMessage *pm)
3625 if (pm->msg_uuid_set)
/* nonce-quality randomness is used for the UUID */
3627 GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
3629 sizeof (pm->msg_uuid));
3630 pm->msg_uuid_set = GNUNET_YES;
3635 * Fragment the given @a pm to the given @a mtu. Adds
3636 * additional fragments to the neighbour as well. If the
3637 * @a mtu is too small, generates an error for the @a pm
3640 * @param pm pending message to fragment for transmission
3641 * @param mtu MTU to apply
3642 * @return new message to transmit
/* Select, or compute on demand, the next fragment of `pm` that fits into
   `mtu`.  Fragments are kept in a tree below `pm`: the function descends to
   the node to work on, carves the next fragment out of it if not all of it
   has been fragmented yet, and otherwise rotates the fragment list (head to
   tail). */
3644 static struct PendingMessage *
3645 fragment_message (struct PendingMessage *pm,
3648 struct PendingMessage *ff;
/* fragments of one message share the UUID of that message */
3650 set_pending_message_uuid (pm);
3652 /* This invariant is established in #handle_add_queue_message() */
3653 GNUNET_assert (mtu > sizeof (struct TransportFragmentBox));
3655 /* select fragment for transmission, descending the tree if it has
3656 been expanded until we are at a leaf or at a fragment that is small enough */
/* NOTE(review): part of this loop condition is on a line that is not visible
   in this excerpt */
3658 while ( ( (ff->bytes_msg > mtu) ||
3660 (ff->frag_off == ff->bytes_msg) &&
3661 (NULL != ff->head_frag) )
3663 ff = ff->head_frag; /* descend into fragmented fragments */
3666 if ( ( (ff->bytes_msg > mtu) ||
3668 (pm->frag_off < pm->bytes_msg) )
3670 /* Did not yet calculate all fragments, calculate next fragment */
3671 struct PendingMessage *frag;
3672 struct TransportFragmentBox tfb;
/* the bytes to fragment are stored directly behind the PendingMessage */
3680 orig = (const char *) &ff[1];
3681 msize = ff->bytes_msg;
/* presumably only when ff is itself a fragment (guard not visible): its
   fragment header is skipped and its offset becomes the base offset xoff */
3684 const struct TransportFragmentBox *tfbo;
3686 tfbo = (const struct TransportFragmentBox *) orig;
3687 orig += sizeof (struct TransportFragmentBox);
3688 msize -= sizeof (struct TransportFragmentBox);
3689 xoff = ntohs (tfbo->frag_off);
/* payload capacity of one fragment once the fragment header is accounted for */
3691 fragmax = mtu - sizeof (struct TransportFragmentBox);
3692 fragsize = GNUNET_MIN (msize - ff->frag_off,
/* the new fragment is stored as PendingMessage + fragment header + payload */
3694 frag = GNUNET_malloc (sizeof (struct PendingMessage) +
3695 sizeof (struct TransportFragmentBox) +
3697 frag->target = pm->target;
3698 frag->frag_parent = ff;
3699 frag->timeout = pm->timeout;
3700 frag->bytes_msg = sizeof (struct TransportFragmentBox) + fragsize;
3701 frag->pmt = PMT_FRAGMENT_BOX;
3702 msg = (char *) &frag[1];
3703 tfb.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT);
3704 tfb.header.size = htons (sizeof (struct TransportFragmentBox) +
/* fragment UUIDs are handed out sequentially per original message */
3706 tfb.frag_uuid = htonl (pm->frag_uuidgen++);
3707 tfb.msg_uuid = pm->msg_uuid;
/* offset and total size are always expressed relative to the original message */
3708 tfb.frag_off = htons (ff->frag_off + xoff);
3709 tfb.msg_size = htons (pm->bytes_msg);
3713 memcpy (&msg[sizeof (tfb)],
3714 &orig[ff->frag_off],
3716 GNUNET_CONTAINER_MDLL_insert (frag,
/* remember how much of ff has been turned into fragments so far */
3720 ff->frag_off += fragsize;
3724 /* Move head to the tail and return it */
3725 GNUNET_CONTAINER_MDLL_remove (frag,
3726 ff->frag_parent->head_frag,
3727 ff->frag_parent->tail_frag,
3729 GNUNET_CONTAINER_MDLL_insert_tail (frag,
3730 ff->frag_parent->head_frag,
3731 ff->frag_parent->tail_frag,
3738 * Reliability-box the given @a pm. On error (can there be any), NULL
3739 * may be returned, otherwise the "replacement" for @a pm (which
3740 * should then be added to the respective neighbour's queue instead of
3741 * @a pm). If the @a pm is already fragmented or reliability boxed,
3742 * or itself an ACK, this function simply returns @a pm.
3744 * @param pm pending message to box for transmission over unreliable queue
3745 * @return new message to transmit
/* Wrap a PMT_CORE message into a reliability box.  Messages of any other type
   are returned unchanged, and a box computed earlier (pm->bpm) is reused.
   The box is stored as a new PendingMessage whose frag_parent is `pm`. */
3747 static struct PendingMessage *
3748 reliability_box_message (struct PendingMessage *pm)
3750 struct TransportReliabilityBox rbox;
3751 struct PendingMessage *bpm;
3754 if (PMT_CORE != pm->pmt)
3755 return pm; /* already fragmented or reliability boxed, or control message: do nothing */
3756 if (NULL != pm->bpm)
3757 return pm->bpm; /* already computed earlier: do nothing */
/* boxing a message that already has fragments is not supported */
3758 GNUNET_assert (NULL == pm->head_frag);
/* the boxed message has to fit into the 16-bit size field of the header */
3759 if (pm->bytes_msg + sizeof (rbox) > UINT16_MAX)
3763 client_send_response (pm,
3768 bpm = GNUNET_malloc (sizeof (struct PendingMessage) +
3771 bpm->target = pm->target;
3772 bpm->frag_parent = pm;
3773 GNUNET_CONTAINER_MDLL_insert (frag,
3777 bpm->timeout = pm->timeout;
3778 bpm->pmt = PMT_RELIABILITY_BOX;
3779 bpm->bytes_msg = pm->bytes_msg + sizeof (rbox);
/* NOTE(review): the UUID is generated for bpm here, yet the box below carries
   pm->msg_uuid -- confirm that the UUID of pm has been set by this point, or
   that bpm->msg_uuid was the intended source */
3780 set_pending_message_uuid (bpm);
3781 rbox.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX);
3782 rbox.header.size = htons (sizeof (rbox) + pm->bytes_msg);
3783 rbox.ack_countdown = htonl (0); // FIXME: implement ACK countdown support
3784 rbox.msg_uuid = pm->msg_uuid;
/* the box is assembled directly behind the new PendingMessage: header first,
   then the payload */
3785 msg = (char *) &bpm[1];
3789 memcpy (&msg[sizeof (rbox)],
3798 * We believe we are ready to transmit a message on a queue. Double-checks
3799 * with the queue's "tracker_out" and then gives the message to the
3800 * communicator for transmission (updating the tracker, and re-scheduling
3801 * itself if applicable).
3803 * @param cls the `struct GNUNET_ATS_Session` to process transmissions for
/* Task that tries to transmit the first pending message of the queue's
   neighbour: fragments and/or reliability-boxes it as required by the queue,
   hands the result to the communicator, and then either completes the message
   (reliable queue) or re-sorts it for a later retransmission.  Finally the
   task re-schedules itself. */
3806 transmit_on_queue (void *cls)
3808 struct GNUNET_ATS_Session *queue = cls;
3809 struct Neighbour *n = queue->neighbour;
3810 struct QueueEntry *qe;
3811 struct PendingMessage *pm;
3812 struct PendingMessage *s;
3814 struct GNUNET_TRANSPORT_SendMessageTo *smt;
3815 struct GNUNET_MQ_Envelope *env;
/* this task is running now, so the stored handle is no longer valid */
3817 queue->transmit_task = NULL;
3818 if (NULL == (pm = n->pending_msg_head))
3820 /* no message pending, nothing to do here! */
/* if re-scheduling produced a task, the transmission is deferred to it */
3823 schedule_transmit_on_queue (queue);
3824 if (NULL != queue->transmit_task)
3825 return; /* do it later */
/* queues that are not reliable need room for a reliability box */
3827 if (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc)
3828 overhead += sizeof (struct TransportReliabilityBox);
/* fragment when the message exceeds the MTU or the maximum size of a send
   request, or when fragments exist already */
3830 if ( ( (0 != queue->mtu) &&
3831 (pm->bytes_msg + overhead > queue->mtu) ) ||
3832 (pm->bytes_msg > UINT16_MAX - sizeof (struct GNUNET_TRANSPORT_SendMessageTo)) ||
3833 (NULL != pm->head_frag /* fragments already exist, should
3834 respect that even if MTU is 0 for
3836 s = fragment_message (s,
3838 ? UINT16_MAX - sizeof (struct GNUNET_TRANSPORT_SendMessageTo)
3842 /* Fragmentation failed, try next message... */
3843 schedule_transmit_on_queue (queue);
3846 if (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc)
3847 s = reliability_box_message (s);
3850 /* Reliability boxing failed, try next message... */
3851 schedule_transmit_on_queue (queue);
3855 /* Pass 's' for transmission to the communicator */
3856 qe = GNUNET_new (struct QueueEntry);
/* message ids are handed out sequentially per queue */
3857 qe->mid = queue->mid_gen++;
3858 qe->session = queue;
3859 // qe->pm = s; // FIXME: not so easy, reference management on 'free(s)'!
3860 GNUNET_CONTAINER_DLL_insert (queue->queue_head,
3863 env = GNUNET_MQ_msg_extra (smt,
3865 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG);
3866 smt->qid = queue->qid;
3868 smt->receiver = n->pid;
3872 GNUNET_assert (CT_COMMUNICATOR == queue->tc->type);
/* bookkeeping per queue and per communicator */
3873 queue->queue_length++;
3874 queue->tc->details.communicator.total_queue_length++;
3875 GNUNET_MQ_send (queue->tc->mq,
3878 // FIXME: do something similar to the logic below
3879 // in defragmentation / reliability ACK handling!
3881 /* Check if this transmission somehow conclusively finished handling 'pm'
3882 even without any explicit ACKs */
3883 if ( (PMT_CORE == s->pmt) &&
3884 (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc) )
3886 /* Full message sent, and over reliable channel */
3887 client_send_response (pm,
3891 else if ( (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc) &&
3892 (PMT_FRAGMENT_BOX == s->pmt) )
3894 struct PendingMessage *pos;
3896 /* Fragment sent over reliable channel */
/* NOTE(review): `s` is still dereferenced right after this call, so
   free_fragment_tree() presumably releases only what hangs below `s` --
   confirm */
3897 free_fragment_tree (s);
3898 pos = s->frag_parent;
3899 GNUNET_CONTAINER_MDLL_remove (frag,
3904 /* check if subtree is done */
3905 while ( (NULL == pos->head_frag) &&
3906 (pos->frag_off == pos->bytes_msg) &&
3910 pos = s->frag_parent;
3911 GNUNET_CONTAINER_MDLL_remove (frag,
3918 /* Was this the last applicable fragment? */
3919 if ( (NULL == pm->head_frag) &&
3920 (pm->frag_off == pm->bytes_msg) )
3921 client_send_response (pm,
3923 pm->bytes_msg /* FIXME: calculate and add overheads! */);
3925 else if (PMT_CORE != pm->pmt)
3927 /* This was an acknowledgement of some type, always free */
3928 free_pending_message (pm);
3932 /* message not finished, waiting for acknowledgement */
3933 struct Neighbour *neighbour = pm->target;
3934 /* Update time by which we might retransmit 's' based on queue
3935 characteristics (i.e. RTT); it takes one RTT for the message to
3936 arrive and the ACK to come back in the best case; but the other
3937 side is allowed to delay ACKs by 2 RTTs, so we use 4 RTT before
3938 retransmitting. Note that in the future this heuristic should
3939 likely be improved further (measure RTT stability, consider
3940 message urgency and size when delaying ACKs, etc.) */
3941 s->next_attempt = GNUNET_TIME_relative_to_absolute
3942 (GNUNET_TIME_relative_multiply (queue->rtt,
3946 struct PendingMessage *pos;
3948 /* re-insert sort in neighbour list */
3949 GNUNET_CONTAINER_MDLL_remove (neighbour,
3950 neighbour->pending_msg_head,
3951 neighbour->pending_msg_tail,
/* walk backwards from the tail to the last entry that is not due later than
   pm, and insert behind it */
3953 pos = neighbour->pending_msg_tail;
3954 while ( (NULL != pos) &&
3955 (pm->next_attempt.abs_value_us > pos->next_attempt.abs_value_us) )
3956 pos = pos->prev_neighbour;
3957 GNUNET_CONTAINER_MDLL_insert_after (neighbour,
3958 neighbour->pending_msg_head,
3959 neighbour->pending_msg_tail,
3965 /* re-insert sort in fragment list */
3966 struct PendingMessage *fp = s->frag_parent;
3967 struct PendingMessage *pos;
3969 GNUNET_CONTAINER_MDLL_remove (frag,
/* same backwards walk, this time over the fragment list of the parent */
3973 pos = fp->tail_frag;
3974 while ( (NULL != pos) &&
3975 (s->next_attempt.abs_value_us > pos->next_attempt.abs_value_us) )
3976 pos = pos->prev_frag;
3977 GNUNET_CONTAINER_MDLL_insert_after (frag,
3985 /* finally, re-schedule queue transmission task itself */
3986 schedule_transmit_on_queue (queue);
3991 * Bandwidth tracker informs us that the delay until we
3992 * can transmit again changed.
3994 * @param cls a `struct GNUNET_ATS_Session` for which the delay changed
/* Callback of the outbound bandwidth tracker: when the neighbour has pending
   messages, the current transmission task of the queue is cancelled and
   scheduled anew. */
3997 tracker_update_out_cb (void *cls)
3999 struct GNUNET_ATS_Session *queue = cls;
4000 struct Neighbour *n = queue->neighbour;
4002 if (NULL == n->pending_msg_head)
4004 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4005 "Bandwidth allocation updated for empty transmission queue `%s'\n",
4007 return; /* no message pending, nothing to do here! */
/* NOTE(review): the task handle is cancelled without a visible NULL check,
   while transmit_on_queue() resets it to NULL when it runs -- confirm that it
   cannot be NULL here while messages are pending */
4009 GNUNET_SCHEDULER_cancel (queue->transmit_task);
4010 queue->transmit_task = NULL;
4011 schedule_transmit_on_queue (queue);
4016 * Bandwidth tracker informs us that excessive outbound bandwidth was
4017 * allocated which is not being used.
4019 * @param cls a `struct GNUNET_ATS_Session` for which the excess was noted
4022 tracker_excess_out_cb (void *cls)
4024 /* FIXME: trigger excess bandwidth report to core? Right now,
4025 this is done internally within transport_api2_core already,
4026 but we probably want to change the logic and trigger it
4027 from here via a message instead! */
4028 /* TODO: maybe inform ATS at this point? */
/* Until the FIXME/TODO above are resolved, the visible code only records
   the event in the statistics; cls is not used by the visible code. */
4029 GNUNET_STATISTICS_update (GST_stats,
4030 "# Excess outbound bandwidth reported",
4038 * Bandwidth tracker informs us that excessive inbound bandwidth was allocated
4039 * which is not being used.
4041 * @param cls a `struct GNUNET_ATS_Session` for which the excess was noted
4044 tracker_excess_in_cb (void *cls)
4046 /* TODO: maybe inform ATS at this point? */
/* Inbound counterpart of tracker_excess_out_cb(): the visible code only
   records the event in the statistics. */
4047 GNUNET_STATISTICS_update (GST_stats,
4048 "# Excess inbound bandwidth reported",
4055 * New queue became available. Process the request.
4057 * @param cls the client
4058 * @param aqm the add queue message that was sent
4061 handle_add_queue_message (void *cls,
4062 const struct GNUNET_TRANSPORT_AddQueueMessage *aqm)
4064 struct TransportClient *tc = cls;
4065 struct GNUNET_ATS_Session *queue;
4066 struct Neighbour *neighbour;
/* An MTU that does not exceed the size of a fragment box header is treated
   as a protocol violation by the client, which is then dropped. */
4070 if (ntohl (aqm->mtu) <= sizeof (struct TransportFragmentBox))
4072 /* MTU so small as to be useless for transmissions,
4073 required for #fragment_message()! */
4074 GNUNET_break_op (0);
4075 GNUNET_SERVICE_client_drop (tc->client);
/* Look up the neighbour entry for the receiving peer.  If none exists yet,
   create one, register it in the neighbours map and announce the new
   connection to the core clients. */
4078 neighbour = lookup_neighbour (&aqm->receiver);
4079 if (NULL == neighbour)
4081 neighbour = GNUNET_new (struct Neighbour);
4082 neighbour->earliest_timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
4083 neighbour->pid = aqm->receiver;
4084 GNUNET_assert (GNUNET_OK ==
4085 GNUNET_CONTAINER_multipeermap_put (neighbours,
4088 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
4089 cores_send_connect_info (&neighbour->pid,
4090 GNUNET_BANDWIDTH_ZERO);
/* The address is the variable-size payload that follows the fixed-size
   part of the message. */
4092 addr_len = ntohs (aqm->header.size) - sizeof (*aqm);
4093 addr = (const char *) &aqm[1];
/* One allocation holds the session and, directly behind it, room for
   addr_len bytes of address; queue->address points at that trailing area. */
4095 queue = GNUNET_malloc (sizeof (struct GNUNET_ATS_Session) + addr_len);
4097 queue->address = (const char *) &queue[1];
4098 queue->rtt = GNUNET_TIME_UNIT_FOREVER_REL;
/* qid is stored exactly as received (no byte-order conversion), unlike
   mtu, nt and cs which are converted with ntohl(). */
4099 queue->qid = aqm->qid;
4100 queue->mtu = ntohl (aqm->mtu);
4101 queue->nt = (enum GNUNET_NetworkType) ntohl (aqm->nt);
4102 queue->cs = (enum GNUNET_TRANSPORT_ConnectionStatus) ntohl (aqm->cs);
4103 queue->neighbour = neighbour;
/* Both bandwidth trackers are initialised with GNUNET_BANDWIDTH_ZERO and
   wired to the tracker_update_* and tracker_excess_* callbacks. */
4104 GNUNET_BANDWIDTH_tracker_init2 (&queue->tracker_in,
4105 &tracker_update_in_cb,
4107 GNUNET_BANDWIDTH_ZERO,
4108 GNUNET_CONSTANTS_MAX_BANDWIDTH_CARRY_S,
4109 &tracker_excess_in_cb,
4111 GNUNET_BANDWIDTH_tracker_init2 (&queue->tracker_out,
4112 &tracker_update_out_cb,
4114 GNUNET_BANDWIDTH_ZERO,
4115 GNUNET_CONSTANTS_MAX_BANDWIDTH_CARRY_S,
4116 &tracker_excess_out_cb,
4121 /* notify ATS about new queue */
4123 struct GNUNET_ATS_Properties prop = {
4124 .delay = GNUNET_TIME_UNIT_FOREVER_REL,
4127 .cc = tc->details.communicator.cc
4130 queue->sr = GNUNET_ATS_session_add (ats,
/* Registration with ATS failed: undo everything set up above. */
4135 if (NULL == queue->sr)
4137 /* This can only happen if the 'address' was way too long for ATS
4138 (approaching 64k in strlen()!). In this case, the communicator
4139 must be buggy and we drop it. */
4141 GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_in);
4142 GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_out);
4143 GNUNET_free (queue);
/* The queue was never linked into the neighbour; if the neighbour has no
   session at all, announce the disconnect and free the neighbour too. */
4144 if (NULL == neighbour->session_head)
4146 cores_send_disconnect_info (&neighbour->pid);
4147 free_neighbour (neighbour);
4149 GNUNET_SERVICE_client_drop (tc->client);
4153 /* notify monitors about new queue */
4155 struct MonitorEvent me = {
4160 notify_monitors (&neighbour->pid,
/* Link the queue into the neighbour's session list and into the session
   list of the communicator that announced it. */
4165 GNUNET_CONTAINER_MDLL_insert (neighbour,
4166 neighbour->session_head,
4167 neighbour->session_tail,
4169 GNUNET_CONTAINER_MDLL_insert (client,
4170 tc->details.communicator.session_head,
4171 tc->details.communicator.session_tail,
4173 GNUNET_SERVICE_client_continue (tc->client);
4178 * Queue to a peer went down. Process the request.
4180 * @param cls the client
4181 * @param dqm the delete queue message that was sent
4184 handle_del_queue_message (void *cls,
4185 const struct GNUNET_TRANSPORT_DelQueueMessage *dqm)
4187 struct TransportClient *tc = cls;
/* Clients that are not communicators are dropped. */
4189 if (CT_COMMUNICATOR != tc->type)
4192 GNUNET_SERVICE_client_drop (tc->client);
/* Walk the sessions owned by this communicator looking for the one whose
   queue id and receiver both match the request. */
4195 for (struct GNUNET_ATS_Session *session = tc->details.communicator.session_head;
4197 session = session->next_client)
4199 struct Neighbour *neighbour = session->neighbour;
4201 if ( (dqm->qid != session->qid) ||
4202 (0 != memcmp (&dqm->receiver,
4204 sizeof (struct GNUNET_PeerIdentity))) )
4206 free_session (session);
4207 GNUNET_SERVICE_client_continue (tc->client);
/* Reached after the loop: the client is dropped. */
4211 GNUNET_SERVICE_client_drop (tc->client);
4216 * Message was transmitted. Process the request.
4218 * @param cls the client
4219 * @param sma the send message acknowledgement that was sent
4222 handle_send_message_ack (void *cls,
4223 const struct GNUNET_TRANSPORT_SendMessageToAck *sma)
4225 struct TransportClient *tc = cls;
/* Despite its name, 'queue' is the QueueEntry matching the ACK, not a
   session. */
4226 struct QueueEntry *queue;
4228 if (CT_COMMUNICATOR != tc->type)
4231 GNUNET_SERVICE_client_drop (tc->client);
4235 /* find our queue entry matching the ACK */
4237 for (struct GNUNET_ATS_Session *session = tc->details.communicator.session_head;
4239 session = session->next_client)
4241 if (0 != memcmp (&session->neighbour->pid,
4243 sizeof (struct GNUNET_PeerIdentity)))
4245 for (struct QueueEntry *qe = session->queue_head;
4249 if (qe->mid != sma->mid)
4258 /* this should never happen */
4260 GNUNET_SERVICE_client_drop (tc->client);
/* Unlink the acknowledged entry and update the per-session and
   per-communicator queue length counters. */
4263 GNUNET_CONTAINER_DLL_remove (queue->session->queue_head,
4264 queue->session->queue_tail,
4266 queue->session->queue_length--;
4267 tc->details.communicator.total_queue_length--;
4268 GNUNET_SERVICE_client_continue (tc->client);
4270 /* if applicable, resume transmissions that waited on ACK */
/* The counters were just decremented, so "LIMIT - 1 == length" is true
   exactly when this ACK moved the length from LIMIT to LIMIT - 1. */
4271 if (COMMUNICATOR_TOTAL_QUEUE_LIMIT - 1 == tc->details.communicator.total_queue_length)
4273 /* Communicator dropped below threshold, resume all queues */
4274 GNUNET_STATISTICS_update (GST_stats,
4275 "# Transmission throttled due to communicator queue limit",
4278 for (struct GNUNET_ATS_Session *session = tc->details.communicator.session_head;
4280 session = session->next_client)
4281 schedule_transmit_on_queue (session);
4283 else if (SESSION_QUEUE_LIMIT - 1 == queue->session->queue_length)
4285 /* queue dropped below threshold; only resume this one queue */
4286 GNUNET_STATISTICS_update (GST_stats,
4287 "# Transmission throttled due to session queue limit",
4290 schedule_transmit_on_queue (queue->session);
4293 /* TODO: we also should react on the status! */
4294 // FIXME: this probably requires queue->pm = s assignment!
4295 // FIXME: react to communicator status about transmission request. We got:
/* NOTE(review): the next statement evaluates sma->status and discards the
   value; it has no effect until the FIXME above is implemented. */
4296 sma->status; // OK success, SYSERR failure
4298 GNUNET_free (queue);
4303 * Iterator telling new MONITOR client about all existing
4306 * @param cls the new `struct TransportClient`
4307 * @param pid a connected peer
4308 * @param value the `struct Neighbour` with more information
4309 * @return #GNUNET_OK (continue to iterate)
4312 notify_client_queues (void *cls,
4313 const struct GNUNET_PeerIdentity *pid,
4316 struct TransportClient *tc = cls;
4317 struct Neighbour *neighbour = value;
/* The closure must be a monitor client; any other client type trips the
   assertion. */
4319 GNUNET_assert (CT_MONITOR == tc->type);
/* One MonitorEvent is built per session of this neighbour, carrying the
   session's pending message and byte counters. */
4320 for (struct GNUNET_ATS_Session *q = neighbour->session_head;
4322 q = q->next_neighbour)
4324 struct MonitorEvent me = {
4327 .num_msg_pending = q->num_msg_pending,
4328 .num_bytes_pending = q->num_bytes_pending
4342 * Initialize a monitor client.
4344 * @param cls the client
4345 * @param start the start message that was sent
4348 handle_monitor_start (void *cls,
4349 const struct GNUNET_TRANSPORT_MonitorStart *start)
4351 struct TransportClient *tc = cls;
/* A client may only become a monitor while its type is still CT_NONE;
   otherwise it is dropped. */
4353 if (CT_NONE != tc->type)
4356 GNUNET_SERVICE_client_drop (tc->client);
4359 tc->type = CT_MONITOR;
4360 tc->details.monitor.peer = start->peer;
4361 tc->details.monitor.one_shot = ntohl (start->one_shot);
/* Iterate over all known neighbours so that notify_client_queues() can
   report their existing queues to the new monitor.  The callback must be
   passed by address ("&notify_client_queues"). */
4362 GNUNET_CONTAINER_multipeermap_iterate (neighbours,
4363 &notify_client_queues,
4365 GNUNET_SERVICE_client_mark_monitor (tc->client);
4366 GNUNET_SERVICE_client_continue (tc->client);
4371 * Signature of a function called by ATS with the current bandwidth
4372 * allocation to be used as determined by ATS.
4374 * @param cls closure, NULL
4375 * @param session session this is about
4376 * @param bandwidth_out assigned outbound bandwidth for the connection,
4377 * 0 to signal disconnect
4378 * @param bandwidth_in assigned inbound bandwidth for the connection,
4379 * 0 to signal disconnect
4382 ats_allocation_cb (void *cls,
4383 struct GNUNET_ATS_Session *session,
4384 struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out,
4385 struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in)
/* Update the quota of the session's outbound tracker, then of its inbound
   tracker. */
4388 GNUNET_BANDWIDTH_tracker_update_quota (&session->tracker_out,
4390 GNUNET_BANDWIDTH_tracker_update_quota (&session->tracker_in,
4396 * Find transport client providing communication service
4397 * for the protocol @a prefix.
4399 * @param prefix communicator name
4400 * @return NULL if no such transport client is available
4402 static struct TransportClient *
4403 lookup_communicator (const char *prefix)
/* Linear scan over all connected clients, starting at clients_head. */
4405 for (struct TransportClient *tc = clients_head;
/* Only communicator clients carry an address prefix to compare against. */
4409 if (CT_COMMUNICATOR != tc->type)
/* The prefix must match the communicator's address_prefix exactly. */
4411 if (0 == strcmp (prefix,
4412 tc->details.communicator.address_prefix))
/* No communicator matched: log a warning. */
4415 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
4416 "ATS suggested use of communicator for `%s', but we do not have such a communicator!\n",
4423 * Signature of a function called by ATS suggesting transport to
4424 * try connecting with a particular address.
4426 * @param cls closure, NULL
4427 * @param pid target peer
4428 * @param address the address to try
4431 ats_suggestion_cb (void *cls,
4432 const struct GNUNET_PeerIdentity *pid,
4433 const char *address)
/* Request id generator; being static, it keeps counting across calls. */
4435 static uint32_t idgen;
4436 struct TransportClient *tc;
4438 struct GNUNET_TRANSPORT_CreateQueue *cqm;
4439 struct GNUNET_MQ_Envelope *env;
/* The communicator is selected by the prefix of the address. */
4443 prefix = GNUNET_HELLO_address_to_prefix (address);
4446 GNUNET_break (0); /* ATS gave invalid address!? */
4449 tc = lookup_communicator (prefix);
4452 GNUNET_STATISTICS_update (GST_stats,
4453 "# ATS suggestions ignored due to missing communicator",
4458 /* forward suggestion for queue creation to communicator */
4459 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4460 "Request #%u for `%s' communicator to create queue to `%s'\n",
4461 (unsigned int) idgen,
/* alen counts the terminating NUL byte of the address as well. */
4464 alen = strlen (address) + 1;
4465 env = GNUNET_MQ_msg_extra (cqm,
4467 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE);
/* The request id goes out as a 32-bit value in network byte order; the
   counter is advanced for the next request. */
4468 cqm->request_id = htonl (idgen++);
4469 cqm->receiver = *pid;
4473 GNUNET_MQ_send (tc->mq,
4479 * Communicator tells us that our request to create a queue "worked", that
4480 * is, setting up the queue is now in progress.
4482 * @param cls the `struct TransportClient`
4483 * @param cqr confirmation message
4486 handle_queue_create_ok (void *cls,
4487 const struct GNUNET_TRANSPORT_CreateQueueResponse *cqr)
4489 struct TransportClient *tc = cls;
/* Clients that are not communicators are dropped. */
4491 if (CT_COMMUNICATOR != tc->type)
4494 GNUNET_SERVICE_client_drop (tc->client);
4497 GNUNET_STATISTICS_update (GST_stats,
4498 "# ATS suggestions succeeded at communicator",
/* Request ids are generated from a uint32_t counter and sent with htonl()
   in ats_suggestion_cb(), so the id is converted back with ntohl();
   ntohs() would only look at 16 bits and log a wrong request number. */
4501 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4502 "Request #%u for communicator to create queue succeeded\n",
4503 (unsigned int) ntohl (cqr->request_id));
4504 GNUNET_SERVICE_client_continue (tc->client);
4509 * Communicator tells us that our request to create a queue failed. This usually
4510 * indicates that the provided address is simply invalid or that the communicator's
4511 * resources are exhausted.
4513 * @param cls the `struct TransportClient`
4514 * @param cqr failure message
4517 handle_queue_create_fail (void *cls,
4518 const struct GNUNET_TRANSPORT_CreateQueueResponse *cqr)
4520 struct TransportClient *tc = cls;
/* Clients that are not communicators are dropped. */
4522 if (CT_COMMUNICATOR != tc->type)
4525 GNUNET_SERVICE_client_drop (tc->client);
/* Request ids are generated from a uint32_t counter and sent with htonl()
   in ats_suggestion_cb(), so the id is converted back with ntohl();
   ntohs() would only look at 16 bits and log a wrong request number. */
4528 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4529 "Request #%u for communicator to create queue failed\n",
4530 (unsigned int) ntohl (cqr->request_id));
4531 GNUNET_STATISTICS_update (GST_stats,
4532 "# ATS suggestions failed in queue creation at communicator",
4535 GNUNET_SERVICE_client_continue (tc->client);
4540 * Free neighbour entry.
4544 * @param value a `struct Neighbour`
4545 * @return #GNUNET_OK (always)
4548 free_neighbour_cb (void *cls,
4549 const struct GNUNET_PeerIdentity *pid,
4552 struct Neighbour *neighbour = value;
/* A neighbour reaching this callback is flagged via GNUNET_break() and then
   freed. */
4556 GNUNET_break (0); // should this ever happen?
4557 free_neighbour (neighbour);
4564 * Free DV route entry.
4568 * @param value a `struct DistanceVector`
4569 * @return #GNUNET_OK (always)
4572 free_dv_routes_cb (void *cls,
4573 const struct GNUNET_PeerIdentity *pid,
/* The map value is the DistanceVector entry for this peer. */
4576 struct DistanceVector *dv = value;
4587 * Free ephemeral entry.
4591 * @param value a `struct EphemeralCacheEntry`
4592 * @return #GNUNET_OK (always)
4595 free_ephemeral_cb (void *cls,
4596 const struct GNUNET_PeerIdentity *pid,
/* The map value is the ephemeral cache entry to release. */
4599 struct EphemeralCacheEntry *ece = value;
4603 free_ephemeral (ece);
4609 * Function called when the service shuts down. Cancels the ephemeral
4610 * task, disconnects from our subsystems and releases all remaining state.
4612 * @param cls closure, unused
4615 do_shutdown (void *cls)
/* Cancel the ephemeral task, if one is scheduled. */
4619 if (NULL != ephemeral_task)
4621 GNUNET_SCHEDULER_cancel (ephemeral_task);
4622 ephemeral_task = NULL;
/* Visit every entry of the neighbours map; the map itself is destroyed
   further below. */
4624 GNUNET_CONTAINER_multipeermap_iterate (neighbours,
4629 GNUNET_ATS_transport_done (ats);
/* peerstore, GST_stats and the private key are only released if they were
   actually set up. */
4632 if (NULL != peerstore)
4634 GNUNET_PEERSTORE_disconnect (peerstore,
4638 if (NULL != GST_stats)
4640 GNUNET_STATISTICS_destroy (GST_stats,
4644 if (NULL != GST_my_private_key)
4646 GNUNET_free (GST_my_private_key);
4647 GST_my_private_key = NULL;
4649 GNUNET_CONTAINER_multipeermap_destroy (neighbours);
/* The DV route map and the ephemeral map are each iterated first and
   destroyed afterwards. */
4651 GNUNET_CONTAINER_multipeermap_iterate (dv_routes,
4654 GNUNET_CONTAINER_multipeermap_destroy (dv_routes);
4656 GNUNET_CONTAINER_multipeermap_iterate (ephemeral_map,
4659 GNUNET_CONTAINER_multipeermap_destroy (ephemeral_map);
4660 ephemeral_map = NULL;
4661 GNUNET_CONTAINER_heap_destroy (ephemeral_heap);
4662 ephemeral_heap = NULL;
4667 * Initiate transport service.
4669 * @param cls closure
4670 * @param c configuration to use
4671 * @param service the initialized service
4675 const struct GNUNET_CONFIGURATION_Handle *c,
4676 struct GNUNET_SERVICE_Handle *service)
/* Create the global containers: the neighbours map, the DV route map, and
   the ephemeral map together with its min-ordered heap. */
4681 neighbours = GNUNET_CONTAINER_multipeermap_create (1024,
4683 dv_routes = GNUNET_CONTAINER_multipeermap_create (1024,
4685 ephemeral_map = GNUNET_CONTAINER_multipeermap_create (32,
4687 ephemeral_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
/* Load our private key using GST_cfg; if that fails, log an error and
   request scheduler shutdown. */
4688 GST_my_private_key = GNUNET_CRYPTO_eddsa_key_create_from_configuration (GST_cfg);
4689 if (NULL == GST_my_private_key)
4691 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
4692 _("Transport service is lacking key configuration settings. Exiting.\n"));
4693 GNUNET_SCHEDULER_shutdown ();
/* Derive our public identity from the private key and log it. */
4696 GNUNET_CRYPTO_eddsa_key_get_public (GST_my_private_key,
4697 &GST_my_identity.public_key);
4698 GNUNET_log(GNUNET_ERROR_TYPE_INFO,
4699 "My identity is `%s'\n",
4700 GNUNET_i2s_full (&GST_my_identity));
4701 GST_stats = GNUNET_STATISTICS_create ("transport",
/* do_shutdown() is registered before connecting to peerstore and ATS; the
   failure paths below request scheduler shutdown. */
4703 GNUNET_SCHEDULER_add_shutdown (&do_shutdown,
4705 peerstore = GNUNET_PEERSTORE_connect (GST_cfg);
4706 if (NULL == peerstore)
4709 GNUNET_SCHEDULER_shutdown ();
4712 ats = GNUNET_ATS_transport_init (GST_cfg,
4720 GNUNET_SCHEDULER_shutdown ();
4727 * Define "main" method using service macro.
4731 GNUNET_SERVICE_OPTION_SOFT_SHUTDOWN,
4734 &client_disconnect_cb,
4736 /* communication with core */
4737 GNUNET_MQ_hd_fixed_size (client_start,
4738 GNUNET_MESSAGE_TYPE_TRANSPORT_START,
4739 struct StartMessage,
4741 GNUNET_MQ_hd_var_size (client_send,
4742 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND,
4743 struct OutboundMessage,
4745 /* communication with communicators */
4746 GNUNET_MQ_hd_var_size (communicator_available,
4747 GNUNET_MESSAGE_TYPE_TRANSPORT_NEW_COMMUNICATOR,
4748 struct GNUNET_TRANSPORT_CommunicatorAvailableMessage,
4750 GNUNET_MQ_hd_var_size (communicator_backchannel,
4751 GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL,
4752 struct GNUNET_TRANSPORT_CommunicatorBackchannel,
4754 GNUNET_MQ_hd_var_size (add_address,
4755 GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS,
4756 struct GNUNET_TRANSPORT_AddAddressMessage,
4758 GNUNET_MQ_hd_fixed_size (del_address,
4759 GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_ADDRESS,
4760 struct GNUNET_TRANSPORT_DelAddressMessage,
4762 GNUNET_MQ_hd_var_size (incoming_msg,
4763 GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG,
4764 struct GNUNET_TRANSPORT_IncomingMessage,
/* communicator replies to the queue creation requests sent from
   ats_suggestion_cb(); both outcomes share one message struct */
4766 GNUNET_MQ_hd_fixed_size (queue_create_ok,
4767 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_OK,
4768 struct GNUNET_TRANSPORT_CreateQueueResponse,
4770 GNUNET_MQ_hd_fixed_size (queue_create_fail,
4771 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_FAIL,
4772 struct GNUNET_TRANSPORT_CreateQueueResponse,
/* queue setup, queue teardown and per-message send acknowledgements */
4774 GNUNET_MQ_hd_var_size (add_queue_message,
4775 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP,
4776 struct GNUNET_TRANSPORT_AddQueueMessage,
4778 GNUNET_MQ_hd_fixed_size (del_queue_message,
4779 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN,
4780 struct GNUNET_TRANSPORT_DelQueueMessage,
4782 GNUNET_MQ_hd_fixed_size (send_message_ack,
4783 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK,
4784 struct GNUNET_TRANSPORT_SendMessageToAck,
4786 /* communication with monitors */
4787 GNUNET_MQ_hd_fixed_size (monitor_start,
4788 GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_START,
4789 struct GNUNET_TRANSPORT_MonitorStart,
4791 GNUNET_MQ_handler_end ());
4794 /* end of file gnunet-service-tng.c */