2 This file is part of GNUnet.
3 Copyright (C) 2010-2016, 2018, 2019 GNUnet e.V.
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
18 SPDX-License-Identifier: AGPL3.0-or-later
21 * @file transport/gnunet-service-tng.c
22 * @brief main for gnunet-service-tng
23 * @author Christian Grothoff
26 * - figure out how to transmit (selective) ACKs in case of uni-directional
27 * communicators (with/without core? DV-only?) When do we use ACKs?
28 * => communicators use selective ACKs for flow control
29 * => transport uses message-level ACKs for RTT, fragment confirmation
30 * => integrate DV into transport, use neither core nor communicators
31 * but rather give communicators transport-encapsulated messages
32 * (which could be core-data, background-channel traffic, or
33 * transport-to-transport traffic)
36 * - address validation: what is our plan here?
37 * #1 Peerstore only gets 'validated' addresses
38 * #2 transport needs another API to "trigger" validation!
39 * API may be used by core/application or communicators;
40 * => use yet another lib/MQ/connection?
41 * #3 transport should use validation to also establish
42 * effective flow control (for uni-directional transports!)
43 * #4 UDP broadcasting logic must be extended to use the new API
44 * #5 only validated addresses are selected for scheduling; that
45 * also ensures we know the RTT
46 * #6 to ensure flow control and RTT are OK, we always do the
47 * 'validation', even if address comes from PEERSTORE
49 * - ACK handling / retransmission
50 * - address verification
51 * - track RTT, distance, loss, etc.
52 * - DV data structures:
56 * - routing of messages (using DV data structures!)
57 * - handling of DV-boxed messages that need to be forwarded
58 * - backchannel message encryption & decryption
62 * - figure out how to call XXX_suggestion_cb!
65 * - change transport-core API to provide proper flow control in both
66 * directions, allow multiple messages per peer simultaneously (tag
67 * confirmations with unique message ID), and replace quota-out with
68 * proper flow control;
69 * - if messages are below MTU, consider adding ACKs and other stuff
70 * (requires planning at receiver, and additional MST-style demultiplex
72 * - could avoid copying body of message into each fragment and keep
73 * fragments as just pointers into the original message and only
74 * fully build fragments just before transmission (optimization, should
75 * reduce CPU and memory use)
77 * Design realizations / discussion:
78 * - communicators do flow control by calling MQ "notify sent"
79 * when 'ready'. They determine flow implicitly (i.e. TCP blocking)
80 * or explicitly via background channel FC ACKs. As long as the
81 * channel is not full, they may 'notify sent' even if the other
82 * peer has not yet confirmed receipt. The other peer confirming
83 * is _only_ for FC, not for more reliable transmission; reliable
84 * transmission (i.e. of fragments) is left to _transport_.
85 * - ACKs sent back in uni-directional communicators are done via
86 * the background channel API; here transport _may_ initially
87 * broadcast (with bounded # hops) if no path is known;
88 * - transport should _integrate_ DV-routing and build a view of
89 * the network; then background channel traffic can be
90 * routed via DV as well as explicit "DV" traffic.
91 * - background channel is also used for ACKs and NAT traversal support
92 * - transport service is responsible for AEAD'ing the background
93 * channel, timestamps and monotonic time are used against replay
94 * of old messages -> peerstore needs to be supplied with
95 * "latest timestamps seen" data
96 * - if transport implements DV, we likely need a 3rd peermap
97 * in addition to ephemerals and (direct) neighbours
98 * ==> check if stuff needs to be moved out of "Neighbour"
99 * - transport should encapsulate core-level messages and do its
100 * own ACKing for RTT/goodput/loss measurements _and_ fragment
103 #include "platform.h"
104 #include "gnunet_util_lib.h"
105 #include "gnunet_statistics_service.h"
106 #include "gnunet_transport_monitor_service.h"
107 #include "gnunet_peerstore_service.h"
108 #include "gnunet_hello_lib.h"
109 #include "gnunet_signatures.h"
110 #include "transport.h"
114 * What is the size we assume for a read operation in the
115 * absence of an MTU for the purpose of flow control?
117 #define IN_PACKET_SIZE_WITHOUT_MTU 128
120 * If a queue delays the next message by more than this number
121 * of seconds we log a warning. Note: this is for testing,
122 * the value chosen here might be too aggressively low!
124 #define DELAY_WARN_THRESHOLD GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
127 * How long are ephemeral keys valid?
129 #define EPHEMERAL_VALIDITY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 4)
132 * How long do we keep partially reassembled messages around before giving up?
134 #define REASSEMBLY_EXPIRATION GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 4)
137 * How many messages can we have pending for a given communicator
138 * process before we start to throttle that communicator?
140 * Used if a communicator might be CPU-bound and cannot handle the traffic.
142 #define COMMUNICATOR_TOTAL_QUEUE_LIMIT 512
145 * How many messages can we have pending for a given queue (queue to
146 * a particular peer via a communicator) before we start to
147 * throttle that queue?
149 #define QUEUE_LENGTH_LIMIT 32
152 GNUNET_NETWORK_STRUCT_BEGIN
155 * Outer layer of an encapsulated backchannel message.
157 struct TransportBackchannelEncapsulationMessage
160 * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION.
162 struct GNUNET_MessageHeader header;
165 * Distance the backchannel message has traveled, to be updated at
166 * each hop. Used to bound the number of hops in case a backchannel
167 * message is broadcast and thus travels without routing
168 * information (during initial backchannel discovery).
173 * Target's peer identity (as backchannels may be transmitted
174 * indirectly, or even be broadcast).
176 struct GNUNET_PeerIdentity target;
179 * Ephemeral key setup by the sender for @e target, used
180 * to encrypt the payload.
182 struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;
184 // FIXME: probably should add random IV here as well,
185 // especially if we re-use ephemeral keys!
188 * HMAC over the ciphertext of the encrypted, variable-size
189 * body that follows. Verified via DH of @e target and
192 struct GNUNET_HashCode hmac;
194 /* Followed by encrypted, variable-size payload */
199 * Body by which a peer confirms that it is using an ephemeral key.
201 struct EphemeralConfirmation
205 * Purpose is #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL
207 struct GNUNET_CRYPTO_EccSignaturePurpose purpose;
210 * How long is this signature over the ephemeral key valid?
211 * Note that the receiver MUST IGNORE the absolute time, and
212 * only interpret the value as a monotonic time and reject
213 * "older" values than the last one observed. Even with this,
214 * there is no real guarantee against replay achieved here,
215 * as the latest timestamp is not persisted. This is
216 * necessary as we do not want to require synchronized
217 * clocks and may not have a bidirectional communication
218 * channel. Communicators must protect against replay
219 * attacks when using backchannel communication!
221 struct GNUNET_TIME_AbsoluteNBO ephemeral_validity;
224 * Target's peer identity.
226 struct GNUNET_PeerIdentity target;
229 * Ephemeral key setup by the sender for @e target, used
230 * to encrypt the payload.
232 struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;
238 * Plaintext of the variable-size payload that is encrypted
239 * within a `struct TransportBackchannelEncapsulationMessage`
241 struct TransportBackchannelRequestPayload
245 * Sender's peer identity.
247 struct GNUNET_PeerIdentity sender;
250 * Signature of the sender over an
251 * #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL.
253 struct GNUNET_CRYPTO_EddsaSignature sender_sig;
256 * How long is this signature over the ephemeral key
259 struct GNUNET_TIME_AbsoluteNBO ephemeral_validity;
262 * Current monotonic time of the sending transport service. Used to
263 * detect replayed messages. Note that the receiver should remember
264 * a list of the recently seen timestamps and only reject messages
265 * if the timestamp is in the list, or the list is "full" and the
266 * timestamp is smaller than the lowest in the list. This list of
267 * timestamps per peer should be persisted to guard against replays
270 struct GNUNET_TIME_AbsoluteNBO monotonic_time;
272 /* Followed by a `struct GNUNET_MessageHeader` with a message
273 for a communicator */
275 /* Followed by a 0-terminated string specifying the name of
276 the communicator which is to receive the message */
282 * Outer layer of an encapsulated unfragmented application message sent
283 * over an unreliable channel.
285 struct TransportReliabilityBox
288 * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX
290 struct GNUNET_MessageHeader header;
293 * Number of messages still to be sent before a cumulative
294 * ACK is requested. Zero if an ACK is requested immediately.
295 * In NBO. Note that the receiver may send the ACK faster
296 * if it believes that is reasonable.
298 uint32_t ack_countdown GNUNET_PACKED;
301 * Unique ID of the message used for signalling receipt of
302 * messages sent over possibly unreliable channels. Should
305 struct GNUNET_ShortHashCode msg_uuid;
310 * Confirmation that the receiver got a
311 * #GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX. Note that the
312 * confirmation may be transmitted over a completely different queue,
313 * so ACKs are identified by a combination of PID of sender and
314 * message UUID, without the queue playing any role!
316 struct TransportReliabilityAckMessage
319 * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK
321 struct GNUNET_MessageHeader header;
326 uint32_t reserved GNUNET_PACKED;
329 * How long was the ACK delayed relative to the average time of
330 * receipt of the messages being acknowledged? Used to calculate
331 * the average RTT by taking the receipt time of the ack minus the
332 * average transmission time of the sender minus this value.
334 struct GNUNET_TIME_RelativeNBO avg_ack_delay;
336 /* followed by any number of `struct GNUNET_ShortHashCode`
337 messages providing ACKs */
342 * Outer layer of an encapsulated fragmented application message.
344 struct TransportFragmentBox
347 * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT
349 struct GNUNET_MessageHeader header;
352 * Unique ID of this fragment (and fragment transmission!). Will
353 * change even if a fragment is retransmitted to make each
354 * transmission attempt unique! Should be incremented by one for
355 * each fragment transmission. If a client receives a duplicate
356 * fragment (same @e frag_off), it must send
357 * #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK immediately.
359 uint32_t frag_uuid GNUNET_PACKED;
362 * Original message ID of the message that all the
363 * fragments belong to. Must be the same for all fragments.
365 struct GNUNET_ShortHashCode msg_uuid;
368 * Offset of this fragment in the overall message.
370 uint16_t frag_off GNUNET_PACKED;
373 * Total size of the message that is being fragmented.
375 uint16_t msg_size GNUNET_PACKED;
381 * Outer layer of a fragmented application message sent over a queue
382 * with finite MTU. When a #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT is
383 * received, the receiver has two RTTs or 64 further fragments with
384 * the same basic message time to send an acknowledgement, possibly
385 * acknowledging up to 65 fragments in one ACK. ACKs must also be
386 * sent immediately once all fragments were sent.
388 struct TransportFragmentAckMessage
391 * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK
393 struct GNUNET_MessageHeader header;
396 * Unique ID of the lowest fragment UUID being acknowledged.
398 uint32_t frag_uuid GNUNET_PACKED;
401 * Bitfield of up to 64 additional fragments following the
402 * @e frag_uuid being acknowledged by this message.
404 uint64_t extra_acks GNUNET_PACKED;
407 * Original message ID of the message that all the
408 * fragments belong to.
410 struct GNUNET_ShortHashCode msg_uuid;
413 * How long was the ACK delayed relative to the average time of
414 * receipt of the fragments being acknowledged? Used to calculate
415 * the average RTT by taking the receipt time of the ack minus the
416 * average transmission time of the sender minus this value.
418 struct GNUNET_TIME_RelativeNBO avg_ack_delay;
421 * How long until the receiver will stop trying reassembly
424 struct GNUNET_TIME_RelativeNBO reassembly_timeout;
429 * Internal message used by transport for distance vector learning.
430 * If @e num_hops does not exceed the threshold, peers should append
431 * themselves to the peer list and flood the message (possibly only
432 * to a subset of their neighbours to limit discoverability of the
433 * network topology). To the extent that the @e bidirectional bits
434 * are set, peers may learn the inverse paths even if they did not
437 * Unless received on a bidirectional queue and @e num_hops just
438 * zero, peers that can forward to the initiator should always try to
439 * forward to the initiator.
441 struct TransportDVLearn
444 * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN
446 struct GNUNET_MessageHeader header;
449 * Number of hops this message has travelled, in NBO. Zero if
452 uint16_t num_hops GNUNET_PACKED;
455 * Bitmask of the last 16 hops indicating whether they are confirmed
456 * available (without DV) in both directions or not, in NBO. Used
457 * to possibly instantly learn a path in both directions. Each peer
458 * should shift this value by one to the left, and then set the
459 * lowest bit IF the current sender can be reached from it (without
462 uint16_t bidirectional GNUNET_PACKED;
465 * Peers receiving this message and delaying forwarding to other
466 * peers for any reason should increment this value such as to
467 * enable the origin to determine the actual network-only delay
468 * in addition to the real-time delay (assuming the message loops
469 * back to the origin).
471 struct GNUNET_TIME_Relative cummulative_non_network_delay;
474 * Identity of the peer that started this learning activity.
476 struct GNUNET_PeerIdentity initiator;
478 /* Followed by @e num_hops `struct GNUNET_PeerIdentity` values,
479 excluding the initiator of the DV trace; the last entry is the
480 current sender; the current peer must not be included. */
486 * Outer layer of an encapsulated message sent over multiple hops.
487 * The path given only includes the identities of the subsequent
488 * peers, i.e. it will be empty if we are the receiver. Each
489 * forwarding peer should scan the list from the end, and if it can,
490 * forward to the respective peer. The list should then be shortened
491 * by all the entries up to and including that peer. Each hop should
492 * also increment @e total_hops to allow the receiver to get a precise
493 * estimate on the number of hops the message travelled. Senders must
494 * provide a learned path that thus should work, but if intermediaries
495 * know of a shortcut, they are allowed to send the message via that
498 * If a peer finds itself still on the list, it must drop the message.
500 struct TransportDVBox
503 * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX
505 struct GNUNET_MessageHeader header;
508 * Number of total hops this message travelled. In NBO.
509 * @e origin sets this to zero, to be incremented at
512 uint16_t total_hops GNUNET_PACKED;
515 * Number of hops this message includes. In NBO.
517 uint16_t num_hops GNUNET_PACKED;
520 * Identity of the peer that originated the message.
522 struct GNUNET_PeerIdentity origin;
524 /* Followed by @e num_hops `struct GNUNET_PeerIdentity` values;
525 excluding the @e origin and the current peer, the last must be
526 the ultimate target; if @e num_hops is zero, the receiver of this
527 message is the ultimate target. */
529 /* Followed by the actual message, which itself may be
530 another box, but not a DV_LEARN or DV_BOX message! */
534 GNUNET_NETWORK_STRUCT_END
538 * What type of client is the `struct TransportClient` about?
543 * We do not know yet (client is fresh).
548 * Is the CORE service, we need to forward traffic to it.
553 * It is a monitor, forward monitor data.
558 * It is a communicator, use for communication.
563 * "Application" telling us where to connect (i.e. TOPOLOGY, DHT or CADET).
570 * Entry in our cache of ephemeral keys we currently use.
571 * This way, we only sign an ephemeral once per @e target,
572 * and then can re-use it over multiple
573 * #GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION
574 * messages (as signing is expensive).
576 struct EphemeralCacheEntry
580 * Target's peer identity (we don't re-use ephemerals
581 * to limit linkability of messages).
583 struct GNUNET_PeerIdentity target;
586 * Signature affirming @e ephemeral_key of type
587 * #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL
589 struct GNUNET_CRYPTO_EddsaSignature sender_sig;
592 * How long is @e sender_sig valid
594 struct GNUNET_TIME_Absolute ephemeral_validity;
599 struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;
602 * Our private ephemeral key.
604 struct GNUNET_CRYPTO_EcdhePrivateKey private_key;
607 * Node in the ephemeral cache for this entry.
608 * Used for expiration.
610 struct GNUNET_CONTAINER_HeapNode *hn;
615 * Client connected to the transport service.
617 struct TransportClient;
621 * A neighbour that at least one communicator is connected to.
627 * Entry in our #dv_routes table, representing a (set of) distance
628 * vector routes to a particular peer.
630 struct DistanceVector;
633 * One possible hop towards a DV target.
635 struct DistanceVectorHop
639 * Kept in a MDLL, sorted by @e timeout.
641 struct DistanceVectorHop *next_dv;
644 * Kept in a MDLL, sorted by @e timeout.
646 struct DistanceVectorHop *prev_dv;
651 struct DistanceVectorHop *next_neighbour;
656 struct DistanceVectorHop *prev_neighbour;
659 * What would be the next hop to @e target?
661 struct Neighbour *next_hop;
664 * Distance vector entry this hop belongs with.
666 struct DistanceVector *dv;
669 * Array of @e distance hops to the target, excluding @e next_hop.
670 * NULL if the entire path is us to @e next_hop to `target`. Allocated
671 * at the end of this struct.
673 const struct GNUNET_PeerIdentity *path;
676 * At what time do we forget about this path unless we see it again
679 struct GNUNET_TIME_Absolute timeout;
682 * How many hops in total to the `target` (excluding @e next_hop and `target` itself),
683 * thus 0 still means a distance of 2 hops (to @e next_hop and then to `target`)?
685 unsigned int distance;
690 * Entry in our #dv_routes table, representing a (set of) distance
691 * vector routes to a particular peer.
693 struct DistanceVector
697 * To which peer is this a route?
699 struct GNUNET_PeerIdentity target;
702 * Known paths to @e target.
704 struct DistanceVectorHop *dv_head;
707 * Known paths to @e target.
709 struct DistanceVectorHop *dv_tail;
712 * Task scheduled to purge expired paths from @e dv_head MDLL.
714 struct GNUNET_SCHEDULER_Task *timeout_task;
719 * A queue is a message queue provided by a communicator
720 * via which we can reach a particular neighbour.
726 * Entry identifying transmission in one of our `struct
727 * Queue` which still awaits an ACK. This is used to
728 * ensure we do not overwhelm a communicator and limit the number of
729 * messages outstanding per communicator (say in case communicator is
730 * CPU bound) and per queue (in case bandwidth allocation exceeds
731 * what the communicator can actually provide towards a particular
740 struct QueueEntry *next;
745 struct QueueEntry *prev;
748 * Queue this entry is queued with.
753 * Message ID used for this message with the queue used for transmission.
760 * A queue is a message queue provided by a communicator
761 * via which we can reach a particular neighbour.
768 struct Queue *next_neighbour;
773 struct Queue *prev_neighbour;
778 struct Queue *prev_client;
783 struct Queue *next_client;
786 * Head of DLL of unacked transmission requests.
788 struct QueueEntry *queue_head;
791 * End of DLL of unacked transmission requests.
793 struct QueueEntry *queue_tail;
796 * Which neighbour is this queue for?
798 struct Neighbour *neighbour;
801 * Which communicator offers this queue?
803 struct TransportClient *tc;
806 * Address served by the queue.
811 * Task scheduled for the time when this queue can (likely) transmit the
812 * next message. Still needs to check with the @e tracker_out to be sure.
814 struct GNUNET_SCHEDULER_Task *transmit_task;
817 * Our current RTT estimate for this queue.
819 struct GNUNET_TIME_Relative rtt;
822 * Message ID generator for transmissions on this queue.
827 * Unique identifier of this queue with the communicator.
832 * Maximum transmission unit supported by this queue.
837 * Distance to the target of this queue.
844 uint32_t num_msg_pending;
849 uint32_t num_bytes_pending;
852 * Length of the DLL starting at @e queue_head.
854 unsigned int queue_length;
857 * Network type offered by this queue.
859 enum GNUNET_NetworkType nt;
862 * Connection status for this queue.
864 enum GNUNET_TRANSPORT_ConnectionStatus cs;
867 * How much outbound bandwidth do we have available for this queue?
869 struct GNUNET_BANDWIDTH_Tracker tracker_out;
872 * How much inbound bandwidth do we have available for this queue?
874 struct GNUNET_BANDWIDTH_Tracker tracker_in;
879 * Information we keep for a message that we are reassembling.
881 struct ReassemblyContext
885 * Original message ID of the message that all the
886 * fragments belong to.
888 struct GNUNET_ShortHashCode msg_uuid;
891 * Which neighbour is this context for?
893 struct Neighbour *neighbour;
896 * Entry in the reassembly heap (sorted by expiration).
898 struct GNUNET_CONTAINER_HeapNode *hn;
901 * Bitfield with @e msg_size bits representing the positions
902 * where we have received fragments. When we receive a fragment,
903 * we check the bits in @e bitfield before decrementing @e msg_missing.
905 * Allocated after the reassembled message.
910 * Task for sending ACK. We may send ACKs either because of hitting
911 * the @e extra_acks limit, or based on time and @e num_acks. This
912 * task is for the latter case.
914 struct GNUNET_SCHEDULER_Task *ack_task;
917 * At what time will we give up reassembly of this message?
919 struct GNUNET_TIME_Absolute reassembly_timeout;
922 * Average delay of all acks in @e extra_acks and @e frag_uuid.
923 * Should be reset to zero when @e num_acks is set to 0.
925 struct GNUNET_TIME_Relative avg_ack_delay;
928 * Time we received the last fragment. @e avg_ack_delay must be
929 * incremented by now - @e last_frag multiplied by @e num_acks.
931 struct GNUNET_TIME_Absolute last_frag;
934 * Bitfield of up to 64 additional fragments following @e frag_uuid
935 * to be acknowledged in the next cumulative ACK.
940 * Unique ID of the lowest fragment UUID to be acknowledged in the
941 * next cumulative ACK. Only valid if @e num_acks > 0.
946 * Number of ACKs we have accumulated so far. Reset to 0
947 * whenever we send a #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK.
949 unsigned int num_acks;
952 * How big is the message we are reassembling in total?
957 * How many bytes of the message are still missing? Defragmentation
958 * is complete when @e msg_missing == 0.
960 uint16_t msg_missing;
962 /* Followed by @e msg_size bytes of the (partially) defragmented original message */
964 /* Followed by @e bitfield data */
969 * A neighbour that at least one communicator is connected to.
975 * Which peer is this about?
977 struct GNUNET_PeerIdentity pid;
980 * Map with `struct ReassemblyContext` structs for fragments under
981 * reassembly. May be NULL if we currently have no fragments from
982 * this @e pid (lazy initialization).
984 struct GNUNET_CONTAINER_MultiShortmap *reassembly_map;
987 * Heap with `struct ReassemblyContext` structs for fragments under
988 * reassembly. May be NULL if we currently have no fragments from
989 * this @e pid (lazy initialization).
991 struct GNUNET_CONTAINER_Heap *reassembly_heap;
994 * Task to free old entries from the @e reassembly_heap and @e reassembly_map.
996 struct GNUNET_SCHEDULER_Task *reassembly_timeout_task;
999 * Head of list of messages pending for this neighbour.
1001 struct PendingMessage *pending_msg_head;
1004 * Tail of list of messages pending for this neighbour.
1006 struct PendingMessage *pending_msg_tail;
1009 * Head of MDLL of DV hops that have this neighbour as next hop. Must be
1010 * purged if this neighbour goes down.
1012 struct DistanceVectorHop *dv_head;
1015 * Tail of MDLL of DV hops that have this neighbour as next hop. Must be
1016 * purged if this neighbour goes down.
1018 struct DistanceVectorHop *dv_tail;
1021 * Head of DLL of queues to this peer.
1023 struct Queue *queue_head;
1026 * Tail of DLL of queues to this peer.
1028 struct Queue *queue_tail;
1031 * Task run to cleanup pending messages that have exceeded their timeout.
1033 struct GNUNET_SCHEDULER_Task *timeout_task;
1036 * Quota at which CORE is allowed to transmit to this peer.
1038 * FIXME: not yet used, tricky to get right given multiple queues!
1039 * (=> Idea: measure???)
1040 * FIXME: how do we set this value initially when we tell CORE?
1041 * Options: start at a minimum value or at literally zero?
1042 * (=> Current thought: clean would be zero!)
1044 struct GNUNET_BANDWIDTH_Value32NBO quota_out;
1047 * What is the earliest timeout of any message in @e pending_msg_tail?
1049 struct GNUNET_TIME_Absolute earliest_timeout;
1055 * A peer that an application (client) would like us to talk to directly.
1061 * Which peer is this about?
1063 struct GNUNET_PeerIdentity pid;
1066 * Client responsible for the request.
1068 struct TransportClient *tc;
1071 * Handle for watching the peerstore for HELLOs for this peer.
1073 struct GNUNET_PEERSTORE_WatchContext *wc;
1076 * What kind of performance preference does this @e tc have?
1078 enum GNUNET_MQ_PreferenceKind pk;
1081 * How much bandwidth would this @e tc like to see?
1083 struct GNUNET_BANDWIDTH_Value32NBO bw;
1089 * Types of different pending messages.
1091 enum PendingMessageType
1095 * Ordinary message received from the CORE service.
1102 PMT_FRAGMENT_BOX = 1,
1107 PMT_RELIABILITY_BOX = 2,
1110 * Any type of acknowledgement.
1112 PMT_ACKNOWLEDGEMENT = 3
1119 * Transmission request that is awaiting delivery. The original
1120 * transmission requests from CORE may be too big for some queues.
1121 * In this case, a *tree* of fragments is created. At each
1122 * level of the tree, fragments are kept in a DLL ordered by which
1123 * fragment should be sent next (at the head). The tree is searched
1124 * top-down, with the original message at the root.
1126 * To select a node for transmission, first it is checked if the
1127 * current node's message fits with the MTU. If it does not, we
1128 * either calculate the next fragment (based on @e frag_off) from the
1129 * current node, or, if all fragments have already been created,
1130 * descend to the @e head_frag. Even though the node was already
1131 * fragmented, the fragment may be too big if the fragment was
1132 * generated for a queue with a larger MTU. In this case, the node
1133 * may be fragmented again, thus creating a tree.
1135 * When acknowledgements for fragments are received, the tree
1136 * must be pruned, removing those parts that were already
1137 * acknowledged. When fragments are sent over a reliable
1138 * channel, they can be immediately removed.
1140 * If a message is ever fragmented, then the original "full" message
1141 * is never again transmitted (even if it fits below the MTU), and
1142 * only (remaining) fragments are sent.
1144 struct PendingMessage
1147 * Kept in a MDLL of messages for this @a target.
1149 struct PendingMessage *next_neighbour;
1152 * Kept in a MDLL of messages for this @a target.
1154 struct PendingMessage *prev_neighbour;
1157 * Kept in a MDLL of messages from this @a client (if @e pmt is #PMT_CORE)
1159 struct PendingMessage *next_client;
1162 * Kept in a MDLL of messages from this @a client (if @e pmt is #PMT_CORE)
1164 struct PendingMessage *prev_client;
1167 * Kept in a MDLL of messages from this @a cpm (if @e pmt is #PMT_FRAGMENT_BOX)
1169 struct PendingMessage *next_frag;
1172 * Kept in a MDLL of messages from this @a cpm (if @e pmt is #PMT_FRAGMENT_BOX)
1174 struct PendingMessage *prev_frag;
1177 * This message, reliability boxed. Only possibly available if @e pmt is #PMT_CORE.
1179 struct PendingMessage *bpm;
1182 * Target of the request.
1184 struct Neighbour *target;
1187 * Client that issued the transmission request, if @e pmt is #PMT_CORE.
1189 struct TransportClient *client;
1192 * Head of a MDLL of fragments created for this core message.
1194 struct PendingMessage *head_frag;
1197 * Tail of a MDLL of fragments created for this core message.
1199 struct PendingMessage *tail_frag;
1202 * Our parent in the fragmentation tree.
1204 struct PendingMessage *frag_parent;
1207 * At what time should we give up on the transmission (and no longer retry)?
1209 struct GNUNET_TIME_Absolute timeout;
1212 * What is the earliest time for us to retry transmission of this message?
1214 struct GNUNET_TIME_Absolute next_attempt;
1217 * UUID to use for this message (used for reassembly of fragments, only
1218 * initialized if @e msg_uuid_set is #GNUNET_YES).
1220 struct GNUNET_ShortHashCode msg_uuid;
1223 * Counter incremented per generated fragment.
1225 uint32_t frag_uuidgen;
1228 * Type of the pending message.
1230 enum PendingMessageType pmt;
1233 * Size of the original message.
1238 * Offset at which we should generate the next fragment.
1243 * #GNUNET_YES once @e msg_uuid was initialized
1245 int16_t msg_uuid_set;
1247 /* Followed by @e bytes_msg to transmit */
1252 * One of the addresses of this peer.
1254 struct AddressListEntry
1260 struct AddressListEntry *next;
1265 struct AddressListEntry *prev;
1268 * Which communicator provides this address?
1270 struct TransportClient *tc;
1273 * The actual address.
1275 const char *address;
1278 * Current context for storing this address in the peerstore.
1280 struct GNUNET_PEERSTORE_StoreContext *sc;
1283 * Task to periodically redo the peerstore store operation (see @e sc).
1285 struct GNUNET_SCHEDULER_Task *st;
1288 * What is a typical lifetime the communicator expects this
1289 * address to have? (Always from now.)
1291 struct GNUNET_TIME_Relative expiration;
1294 * Address identifier used by the communicator.
1299 * Network type offered by this address.
1301 enum GNUNET_NetworkType nt;
1307 * Client connected to the transport service.
1309 struct TransportClient
1315 struct TransportClient *next;
1320 struct TransportClient *prev;
1323 * Handle to the client.
1325 struct GNUNET_SERVICE_Client *client;
1328 * Message queue to the client.
1330 struct GNUNET_MQ_Handle *mq;
1333 * What type of client is this?
1335 enum ClientType type;
1341 * Information for @e type #CT_CORE.
1346 * Head of list of messages pending for this client, sorted by
1347 * transmission time ("next_attempt" + possibly internal prioritization).
1349 struct PendingMessage *pending_msg_head;
1352 * Tail of list of messages pending for this client.
1354 struct PendingMessage *pending_msg_tail;
1359 * Information for @e type #CT_MONITOR.
1364 * Peer identity to monitor the addresses of.
1365 * Zero to monitor all neighbours. Valid if
1366 * @e type is #CT_MONITOR.
1368 struct GNUNET_PeerIdentity peer;
1371 * Is this a one-shot monitor?
1379 * Information for @e type #CT_COMMUNICATOR.
1383 * If @e type is #CT_COMMUNICATOR, this communicator
1384 * supports communicating using these addresses.
1386 char *address_prefix;
1389 * Head of DLL of queues offered by this communicator.
1391 struct Queue *queue_head;
1394 * Tail of DLL of queues offered by this communicator.
1396 struct Queue *queue_tail;
1399 * Head of list of the addresses of this peer offered by this communicator.
1401 struct AddressListEntry *addr_head;
1404 * Tail of list of the addresses of this peer offered by this communicator.
1406 struct AddressListEntry *addr_tail;
1409 * Number of queue entries in all queues to this communicator. Used
1410 * throttle sending to a communicator if we see that the communicator
1411 * is globally unable to keep up.
1413 unsigned int total_queue_length;
1416 * Characteristics of this communicator.
1418 enum GNUNET_TRANSPORT_CommunicatorCharacteristics cc;
1423 * Information for @e type #CT_APPLICATION
1428 * Map of requests for peers the given client application would like to
1429 * see connections for. Maps from PIDs to `struct PeerRequest`.
1431 struct GNUNET_CONTAINER_MultiPeerMap *requests;
/**
 * Head of linked list of all clients to this service.
 */
static struct TransportClient *clients_head;

/**
 * Tail of linked list of all clients to this service.
 */
static struct TransportClient *clients_tail;

/**
 * Statistics handle.
 */
static struct GNUNET_STATISTICS_Handle *GST_stats;

/**
 * Configuration handle.
 */
static const struct GNUNET_CONFIGURATION_Handle *GST_cfg;

/**
 * Identity of this peer.
 */
static struct GNUNET_PeerIdentity GST_my_identity;

/**
 * Private key of this peer (used for signing).
 */
static struct GNUNET_CRYPTO_EddsaPrivateKey *GST_my_private_key;

/**
 * Map from PIDs to `struct Neighbour` entries.  A peer is
 * a neighbour if we have an MQ to it from some communicator.
 */
static struct GNUNET_CONTAINER_MultiPeerMap *neighbours;

/**
 * Map from PIDs to `struct DistanceVector` entries describing
 * known paths to the peer.
 */
static struct GNUNET_CONTAINER_MultiPeerMap *dv_routes;

/**
 * Database for peer's HELLOs.
 */
static struct GNUNET_PEERSTORE_Handle *peerstore;

/**
 * Heap sorting `struct EphemeralCacheEntry` by their
 * key/signature validity.
 */
static struct GNUNET_CONTAINER_Heap *ephemeral_heap;

/**
 * Hash map for looking up `struct EphemeralCacheEntry`s
 * by peer identity.  (We may have ephemerals in our
 * cache for which we do not have a neighbour entry,
 * and similarly many neighbours may not need ephemerals,
 * so we use a second map.)
 */
static struct GNUNET_CONTAINER_MultiPeerMap *ephemeral_map;

/**
 * Task to free expired ephemerals.
 */
static struct GNUNET_SCHEDULER_Task *ephemeral_task;
1509 * Free cached ephemeral key.
1511 * @param ece cached signature to free
1514 free_ephemeral (struct EphemeralCacheEntry *ece)
1516 GNUNET_CONTAINER_multipeermap_remove (ephemeral_map,
1519 GNUNET_CONTAINER_heap_remove_node (ece->hn);
1525 * Lookup neighbour record for peer @a pid.
1527 * @param pid neighbour to look for
1528 * @return NULL if we do not have this peer as a neighbour
1530 static struct Neighbour *
1531 lookup_neighbour (const struct GNUNET_PeerIdentity *pid)
1533 return GNUNET_CONTAINER_multipeermap_get (neighbours,
/**
 * Details about what to notify monitors about.
 */
struct MonitorEvent
{
  /**
   * @deprecated To be discussed if we keep these...
   */
  struct GNUNET_TIME_Absolute last_validation;
  struct GNUNET_TIME_Absolute valid_until;
  struct GNUNET_TIME_Absolute next_validation;

  /**
   * Current round-trip time estimate.
   */
  struct GNUNET_TIME_Relative rtt;

  /**
   * Connection status.
   */
  enum GNUNET_TRANSPORT_ConnectionStatus cs;

  /**
   * Messages pending.
   */
  uint32_t num_msg_pending;

  /**
   * Bytes pending.
   */
  uint32_t num_bytes_pending;

};
1575 * Free a @dvh, and if it is the last path to the `target`,also
1576 * free the associated DV entry in #dv_routes.
1578 * @param dvh hop to free
1581 free_distance_vector_hop (struct DistanceVectorHop *dvh)
1583 struct Neighbour *n = dvh->next_hop;
1584 struct DistanceVector *dv = dvh->dv;
1586 GNUNET_CONTAINER_MDLL_remove (neighbour,
1590 GNUNET_CONTAINER_MDLL_remove (dv,
1595 if (NULL == dv->dv_head)
1597 GNUNET_assert (GNUNET_YES ==
1598 GNUNET_CONTAINER_multipeermap_remove (dv_routes,
1601 if (NULL != dv->timeout_task)
1602 GNUNET_SCHEDULER_cancel (dv->timeout_task);
1609 * Free entry in #dv_routes. First frees all hops to the target, and
1610 * the last target will implicitly free @a dv as well.
1612 * @param dv route to free
1615 free_dv_route (struct DistanceVector *dv)
1617 struct DistanceVectorHop *dvh;
1619 while (NULL != (dvh = dv->dv_head))
1620 free_distance_vector_hop (dvh);
1625 * Notify monitor @a tc about an event. That @a tc
1626 * cares about the event has already been checked.
1628 * Send @a tc information in @a me about a @a peer's status with
1629 * respect to some @a address to all monitors that care.
1631 * @param tc monitor to inform
1632 * @param peer peer the information is about
1633 * @param address address the information is about
1634 * @param nt network type associated with @a address
1635 * @param me detailed information to transmit
1638 notify_monitor (struct TransportClient *tc,
1639 const struct GNUNET_PeerIdentity *peer,
1640 const char *address,
1641 enum GNUNET_NetworkType nt,
1642 const struct MonitorEvent *me)
1644 struct GNUNET_MQ_Envelope *env;
1645 struct GNUNET_TRANSPORT_MonitorData *md;
1646 size_t addr_len = strlen (address) + 1;
1648 env = GNUNET_MQ_msg_extra (md,
1650 GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_DATA);
1651 md->nt = htonl ((uint32_t) nt);
1653 md->last_validation = GNUNET_TIME_absolute_hton (me->last_validation);
1654 md->valid_until = GNUNET_TIME_absolute_hton (me->valid_until);
1655 md->next_validation = GNUNET_TIME_absolute_hton (me->next_validation);
1656 md->rtt = GNUNET_TIME_relative_hton (me->rtt);
1657 md->cs = htonl ((uint32_t) me->cs);
1658 md->num_msg_pending = htonl (me->num_msg_pending);
1659 md->num_bytes_pending = htonl (me->num_bytes_pending);
1663 GNUNET_MQ_send (tc->mq,
1669 * Send information in @a me about a @a peer's status with respect
1670 * to some @a address to all monitors that care.
1672 * @param peer peer the information is about
1673 * @param address address the information is about
1674 * @param nt network type associated with @a address
1675 * @param me detailed information to transmit
1678 notify_monitors (const struct GNUNET_PeerIdentity *peer,
1679 const char *address,
1680 enum GNUNET_NetworkType nt,
1681 const struct MonitorEvent *me)
1683 static struct GNUNET_PeerIdentity zero;
1685 for (struct TransportClient *tc = clients_head;
1689 if (CT_MONITOR != tc->type)
1691 if (tc->details.monitor.one_shot)
1693 if ( (0 != memcmp (&tc->details.monitor.peer,
1696 (0 != memcmp (&tc->details.monitor.peer,
1710 * Called whenever a client connects. Allocates our
1711 * data structures associated with that client.
1713 * @param cls closure, NULL
1714 * @param client identification of the client
1715 * @param mq message queue for the client
1716 * @return our `struct TransportClient`
1719 client_connect_cb (void *cls,
1720 struct GNUNET_SERVICE_Client *client,
1721 struct GNUNET_MQ_Handle *mq)
1723 struct TransportClient *tc;
1725 tc = GNUNET_new (struct TransportClient);
1726 tc->client = client;
1728 GNUNET_CONTAINER_DLL_insert (clients_head,
1731 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1732 "Client %p connected\n",
1741 * @param rc data structure to free
1744 free_reassembly_context (struct ReassemblyContext *rc)
1746 struct Neighbour *n = rc->neighbour;
1748 GNUNET_assert (rc ==
1749 GNUNET_CONTAINER_heap_remove_node (rc->hn));
1750 GNUNET_assert (GNUNET_OK ==
1751 GNUNET_CONTAINER_multishortmap_remove (n->reassembly_map,
1759 * Task run to clean up reassembly context of a neighbour that have expired.
1761 * @param cls a `struct Neighbour`
1764 reassembly_cleanup_task (void *cls)
1766 struct Neighbour *n = cls;
1767 struct ReassemblyContext *rc;
1769 n->reassembly_timeout_task = NULL;
1770 while (NULL != (rc = GNUNET_CONTAINER_heap_peek (n->reassembly_heap)))
1772 if (0 == GNUNET_TIME_absolute_get_remaining (rc->reassembly_timeout).rel_value_us)
1774 free_reassembly_context (rc);
1777 GNUNET_assert (NULL == n->reassembly_timeout_task);
1778 n->reassembly_timeout_task = GNUNET_SCHEDULER_add_at (rc->reassembly_timeout,
1779 &reassembly_cleanup_task,
1787 * function called to #free_reassembly_context().
1791 * @param value a `struct ReassemblyContext` to free
1792 * @return #GNUNET_OK (continue iteration)
1795 free_reassembly_cb (void *cls,
1796 const struct GNUNET_ShortHashCode *key,
1799 struct ReassemblyContext *rc = value;
1803 free_reassembly_context (rc);
1809 * Release memory used by @a neighbour.
1811 * @param neighbour neighbour entry to free
1814 free_neighbour (struct Neighbour *neighbour)
1816 struct DistanceVectorHop *dvh;
1818 GNUNET_assert (NULL == neighbour->queue_head);
1819 GNUNET_assert (GNUNET_YES ==
1820 GNUNET_CONTAINER_multipeermap_remove (neighbours,
1823 if (NULL != neighbour->timeout_task)
1824 GNUNET_SCHEDULER_cancel (neighbour->timeout_task);
1825 if (NULL != neighbour->reassembly_map)
1827 GNUNET_CONTAINER_multishortmap_iterate (neighbour->reassembly_map,
1828 &free_reassembly_cb,
1830 GNUNET_CONTAINER_multishortmap_destroy (neighbour->reassembly_map);
1831 neighbour->reassembly_map = NULL;
1832 GNUNET_CONTAINER_heap_destroy (neighbour->reassembly_heap);
1833 neighbour->reassembly_heap = NULL;
1835 while (NULL != (dvh = neighbour->dv_head))
1836 free_distance_vector_hop (dvh);
1837 if (NULL != neighbour->reassembly_timeout_task)
1838 GNUNET_SCHEDULER_cancel (neighbour->reassembly_timeout_task);
1839 GNUNET_free (neighbour);
1844 * Send message to CORE clients that we lost a connection.
1846 * @param tc client to inform (must be CORE client)
1847 * @param pid peer the connection is for
1848 * @param quota_out current quota for the peer
1851 core_send_connect_info (struct TransportClient *tc,
1852 const struct GNUNET_PeerIdentity *pid,
1853 struct GNUNET_BANDWIDTH_Value32NBO quota_out)
1855 struct GNUNET_MQ_Envelope *env;
1856 struct ConnectInfoMessage *cim;
1858 GNUNET_assert (CT_CORE == tc->type);
1859 env = GNUNET_MQ_msg (cim,
1860 GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
1861 cim->quota_out = quota_out;
1863 GNUNET_MQ_send (tc->mq,
1869 * Send message to CORE clients that we gained a connection
1871 * @param pid peer the queue was for
1872 * @param quota_out current quota for the peer
1875 cores_send_connect_info (const struct GNUNET_PeerIdentity *pid,
1876 struct GNUNET_BANDWIDTH_Value32NBO quota_out)
1878 for (struct TransportClient *tc = clients_head;
1882 if (CT_CORE != tc->type)
1884 core_send_connect_info (tc,
1892 * Send message to CORE clients that we lost a connection.
1894 * @param pid peer the connection was for
1897 cores_send_disconnect_info (const struct GNUNET_PeerIdentity *pid)
1899 for (struct TransportClient *tc = clients_head;
1903 struct GNUNET_MQ_Envelope *env;
1904 struct DisconnectInfoMessage *dim;
1906 if (CT_CORE != tc->type)
1908 env = GNUNET_MQ_msg (dim,
1909 GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT);
1911 GNUNET_MQ_send (tc->mq,
/**
 * We believe we are ready to transmit a message on a queue.  Double-checks
 * with the queue's "tracker_out" and then gives the message to the
 * communicator for transmission (updating the tracker, and re-scheduling
 * itself if applicable).
 *
 * Forward declaration; the definition follows later in the file.
 *
 * @param cls the `struct Queue` to process transmissions for
 */
static void
transmit_on_queue (void *cls);
1930 * Schedule next run of #transmit_on_queue(). Does NOTHING if
1931 * we should run immediately or if the message queue is empty.
1932 * Test for no task being added AND queue not being empty to
1933 * transmit immediately afterwards! This function must only
1934 * be called if the message queue is non-empty!
1936 * @param queue the queue to do scheduling for
1939 schedule_transmit_on_queue (struct Queue *queue)
1941 struct Neighbour *n = queue->neighbour;
1942 struct PendingMessage *pm = n->pending_msg_head;
1943 struct GNUNET_TIME_Relative out_delay;
1946 GNUNET_assert (NULL != pm);
1947 if (queue->tc->details.communicator.total_queue_length >=
1948 COMMUNICATOR_TOTAL_QUEUE_LIMIT)
1950 GNUNET_STATISTICS_update (GST_stats,
1951 "# Transmission throttled due to communicator queue limit",
1956 if (queue->queue_length >= QUEUE_LENGTH_LIMIT)
1958 GNUNET_STATISTICS_update (GST_stats,
1959 "# Transmission throttled due to queue queue limit",
1965 wsize = (0 == queue->mtu)
1966 ? pm->bytes_msg /* FIXME: add overheads? */
1968 out_delay = GNUNET_BANDWIDTH_tracker_get_delay (&queue->tracker_out,
1970 out_delay = GNUNET_TIME_relative_max (GNUNET_TIME_absolute_get_remaining (pm->next_attempt),
1972 if (0 == out_delay.rel_value_us)
1973 return; /* we should run immediately! */
1974 /* queue has changed since we were scheduled, reschedule again */
1975 queue->transmit_task
1976 = GNUNET_SCHEDULER_add_delayed (out_delay,
1979 if (out_delay.rel_value_us > DELAY_WARN_THRESHOLD.rel_value_us)
1980 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1981 "Next transmission on queue `%s' in %s (high delay)\n",
1983 GNUNET_STRINGS_relative_time_to_string (out_delay,
1986 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1987 "Next transmission on queue `%s' in %s\n",
1989 GNUNET_STRINGS_relative_time_to_string (out_delay,
1997 * @param queue the queue to free
2000 free_queue (struct Queue *queue)
2002 struct Neighbour *neighbour = queue->neighbour;
2003 struct TransportClient *tc = queue->tc;
2004 struct MonitorEvent me = {
2005 .cs = GNUNET_TRANSPORT_CS_DOWN,
2006 .rtt = GNUNET_TIME_UNIT_FOREVER_REL
2008 struct QueueEntry *qe;
2011 if (NULL != queue->transmit_task)
2013 GNUNET_SCHEDULER_cancel (queue->transmit_task);
2014 queue->transmit_task = NULL;
2016 GNUNET_CONTAINER_MDLL_remove (neighbour,
2017 neighbour->queue_head,
2018 neighbour->queue_tail,
2020 GNUNET_CONTAINER_MDLL_remove (client,
2021 tc->details.communicator.queue_head,
2022 tc->details.communicator.queue_tail,
2024 maxxed = (COMMUNICATOR_TOTAL_QUEUE_LIMIT >= tc->details.communicator.total_queue_length);
2025 while (NULL != (qe = queue->queue_head))
2027 GNUNET_CONTAINER_DLL_remove (queue->queue_head,
2030 queue->queue_length--;
2031 tc->details.communicator.total_queue_length--;
2034 GNUNET_assert (0 == queue->queue_length);
2036 (COMMUNICATOR_TOTAL_QUEUE_LIMIT < tc->details.communicator.total_queue_length) )
2038 /* Communicator dropped below threshold, resume all queues */
2039 GNUNET_STATISTICS_update (GST_stats,
2040 "# Transmission throttled due to communicator queue limit",
2043 for (struct Queue *s = tc->details.communicator.queue_head;
2046 schedule_transmit_on_queue (s);
2048 notify_monitors (&neighbour->pid,
2052 GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_in);
2053 GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_out);
2054 GNUNET_free (queue);
2055 if (NULL == neighbour->queue_head)
2057 cores_send_disconnect_info (&neighbour->pid);
2058 free_neighbour (neighbour);
2066 * @param ale address list entry to free
2069 free_address_list_entry (struct AddressListEntry *ale)
2071 struct TransportClient *tc = ale->tc;
2073 GNUNET_CONTAINER_DLL_remove (tc->details.communicator.addr_head,
2074 tc->details.communicator.addr_tail,
2076 if (NULL != ale->sc)
2078 GNUNET_PEERSTORE_store_cancel (ale->sc);
2081 if (NULL != ale->st)
2083 GNUNET_SCHEDULER_cancel (ale->st);
2091 * Stop the peer request in @a value.
2093 * @param cls a `struct TransportClient` that no longer makes the request
2094 * @param pid the peer's identity
2095 * @param value a `struct PeerRequest`
2096 * @return #GNUNET_YES (always)
2099 stop_peer_request (void *cls,
2100 const struct GNUNET_PeerIdentity *pid,
2103 struct TransportClient *tc = cls;
2104 struct PeerRequest *pr = value;
2106 GNUNET_PEERSTORE_watch_cancel (pr->wc);
2107 GNUNET_assert (GNUNET_YES ==
2108 GNUNET_CONTAINER_multipeermap_remove (tc->details.application.requests,
2118 * Called whenever a client is disconnected. Frees our
2119 * resources associated with that client.
2121 * @param cls closure, NULL
2122 * @param client identification of the client
2123 * @param app_ctx our `struct TransportClient`
2126 client_disconnect_cb (void *cls,
2127 struct GNUNET_SERVICE_Client *client,
2130 struct TransportClient *tc = app_ctx;
2132 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2133 "Client %p disconnected, cleaning up.\n",
2135 GNUNET_CONTAINER_DLL_remove (clients_head,
2144 struct PendingMessage *pm;
2146 while (NULL != (pm = tc->details.core.pending_msg_head))
2148 GNUNET_CONTAINER_MDLL_remove (client,
2149 tc->details.core.pending_msg_head,
2150 tc->details.core.pending_msg_tail,
2158 case CT_COMMUNICATOR:
2161 struct AddressListEntry *ale;
2163 while (NULL != (q = tc->details.communicator.queue_head))
2165 while (NULL != (ale = tc->details.communicator.addr_head))
2166 free_address_list_entry (ale);
2167 GNUNET_free (tc->details.communicator.address_prefix);
2170 case CT_APPLICATION:
2171 GNUNET_CONTAINER_multipeermap_iterate (tc->details.application.requests,
2174 GNUNET_CONTAINER_multipeermap_destroy (tc->details.application.requests);
2182 * Iterator telling new CORE client about all existing
2183 * connections to peers.
2185 * @param cls the new `struct TransportClient`
2186 * @param pid a connected peer
2187 * @param value the `struct Neighbour` with more information
2188 * @return #GNUNET_OK (continue to iterate)
2191 notify_client_connect_info (void *cls,
2192 const struct GNUNET_PeerIdentity *pid,
2195 struct TransportClient *tc = cls;
2196 struct Neighbour *neighbour = value;
2198 core_send_connect_info (tc,
2200 neighbour->quota_out);
2206 * Initialize a "CORE" client. We got a start message from this
2207 * client, so add it to the list of clients for broadcasting of
2210 * @param cls the client
2211 * @param start the start message that was sent
2214 handle_client_start (void *cls,
2215 const struct StartMessage *start)
2217 struct TransportClient *tc = cls;
2220 options = ntohl (start->options);
2221 if ( (0 != (1 & options)) &&
2223 memcmp (&start->self,
2225 sizeof (struct GNUNET_PeerIdentity)) ) )
2227 /* client thinks this is a different peer, reject */
2229 GNUNET_SERVICE_client_drop (tc->client);
2232 if (CT_NONE != tc->type)
2235 GNUNET_SERVICE_client_drop (tc->client);
2239 GNUNET_CONTAINER_multipeermap_iterate (neighbours,
2240 ¬ify_client_connect_info,
2242 GNUNET_SERVICE_client_continue (tc->client);
2247 * Client asked for transmission to a peer. Process the request.
2249 * @param cls the client
2250 * @param obm the send message that was sent
2253 check_client_send (void *cls,
2254 const struct OutboundMessage *obm)
2256 struct TransportClient *tc = cls;
2258 const struct GNUNET_MessageHeader *obmm;
2260 if (CT_CORE != tc->type)
2263 return GNUNET_SYSERR;
2265 size = ntohs (obm->header.size) - sizeof (struct OutboundMessage);
2266 if (size < sizeof (struct GNUNET_MessageHeader))
2269 return GNUNET_SYSERR;
2271 obmm = (const struct GNUNET_MessageHeader *) &obm[1];
2272 if (size != ntohs (obmm->size))
2275 return GNUNET_SYSERR;
2282 * Free fragment tree below @e root, excluding @e root itself.
2284 * @param root root of the tree to free
2287 free_fragment_tree (struct PendingMessage *root)
2289 struct PendingMessage *frag;
2291 while (NULL != (frag = root->head_frag))
2293 free_fragment_tree (frag);
2294 GNUNET_CONTAINER_MDLL_remove (frag,
2304 * Release memory associated with @a pm and remove @a pm from associated
2305 * data structures. @a pm must be a top-level pending message and not
2306 * a fragment in the tree. The entire tree is freed (if applicable).
2308 * @param pm the pending message to free
2311 free_pending_message (struct PendingMessage *pm)
2313 struct TransportClient *tc = pm->client;
2314 struct Neighbour *target = pm->target;
2318 GNUNET_CONTAINER_MDLL_remove (client,
2319 tc->details.core.pending_msg_head,
2320 tc->details.core.pending_msg_tail,
2323 GNUNET_CONTAINER_MDLL_remove (neighbour,
2324 target->pending_msg_head,
2325 target->pending_msg_tail,
2327 free_fragment_tree (pm);
2328 GNUNET_free_non_null (pm->bpm);
2334 * Send a response to the @a pm that we have processed a
2335 * "send" request with status @a success. We
2336 * transmitted @a bytes_physical on the actual wire.
2337 * Sends a confirmation to the "core" client responsible
2338 * for the original request and free's @a pm.
2340 * @param pm handle to the original pending message
2341 * @param success status code, #GNUNET_OK on success, #GNUNET_SYSERR
2342 * for transmission failure
2343 * @param bytes_physical amount of bandwidth consumed
2346 client_send_response (struct PendingMessage *pm,
2348 uint32_t bytes_physical)
2350 struct TransportClient *tc = pm->client;
2351 struct Neighbour *target = pm->target;
2352 struct GNUNET_MQ_Envelope *env;
2353 struct SendOkMessage *som;
2357 env = GNUNET_MQ_msg (som,
2358 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
2359 som->success = htonl ((uint32_t) success);
2360 som->bytes_msg = htons (pm->bytes_msg);
2361 som->bytes_physical = htonl (bytes_physical);
2362 som->peer = target->pid;
2363 GNUNET_MQ_send (tc->mq,
2366 free_pending_message (pm);
2371 * Checks the message queue for a neighbour for messages that have timed
2372 * out and purges them.
2374 * @param cls a `struct Neighbour`
2377 check_queue_timeouts (void *cls)
2379 struct Neighbour *n = cls;
2380 struct PendingMessage *pm;
2381 struct GNUNET_TIME_Absolute now;
2382 struct GNUNET_TIME_Absolute earliest_timeout;
2384 n->timeout_task = NULL;
2385 earliest_timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
2386 now = GNUNET_TIME_absolute_get ();
2387 for (struct PendingMessage *pos = n->pending_msg_head;
2391 pm = pos->next_neighbour;
2392 if (pos->timeout.abs_value_us <= now.abs_value_us)
2394 GNUNET_STATISTICS_update (GST_stats,
2395 "# messages dropped (timeout before confirmation)",
2398 client_send_response (pm,
2403 earliest_timeout = GNUNET_TIME_absolute_min (earliest_timeout,
2406 n->earliest_timeout = earliest_timeout;
2407 if (NULL != n->pending_msg_head)
2408 n->timeout_task = GNUNET_SCHEDULER_add_at (earliest_timeout,
2409 &check_queue_timeouts,
2415 * Client asked for transmission to a peer. Process the request.
2417 * @param cls the client
2418 * @param obm the send message that was sent
2421 handle_client_send (void *cls,
2422 const struct OutboundMessage *obm)
2424 struct TransportClient *tc = cls;
2425 struct PendingMessage *pm;
2426 const struct GNUNET_MessageHeader *obmm;
2427 struct Neighbour *target;
2431 GNUNET_assert (CT_CORE == tc->type);
2432 obmm = (const struct GNUNET_MessageHeader *) &obm[1];
2433 bytes_msg = ntohs (obmm->size);
2434 target = lookup_neighbour (&obm->peer);
2437 /* Failure: don't have this peer as a neighbour (anymore).
2438 Might have gone down asynchronously, so this is NOT
2439 a protocol violation by CORE. Still count the event,
2440 as this should be rare. */
2441 struct GNUNET_MQ_Envelope *env;
2442 struct SendOkMessage *som;
2444 env = GNUNET_MQ_msg (som,
2445 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
2446 som->success = htonl (GNUNET_SYSERR);
2447 som->bytes_msg = htonl (bytes_msg);
2448 som->bytes_physical = htonl (0);
2449 som->peer = obm->peer;
2450 GNUNET_MQ_send (tc->mq,
2452 GNUNET_SERVICE_client_continue (tc->client);
2453 GNUNET_STATISTICS_update (GST_stats,
2454 "# messages dropped (neighbour unknown)",
2459 was_empty = (NULL == target->pending_msg_head);
2460 pm = GNUNET_malloc (sizeof (struct PendingMessage) + bytes_msg);
2462 pm->target = target;
2463 pm->bytes_msg = bytes_msg;
2464 pm->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (obm->timeout));
2468 GNUNET_CONTAINER_MDLL_insert (neighbour,
2469 target->pending_msg_head,
2470 target->pending_msg_tail,
2472 GNUNET_CONTAINER_MDLL_insert (client,
2473 tc->details.core.pending_msg_head,
2474 tc->details.core.pending_msg_tail,
2476 if (target->earliest_timeout.abs_value_us > pm->timeout.abs_value_us)
2478 target->earliest_timeout.abs_value_us = pm->timeout.abs_value_us;
2479 if (NULL != target->timeout_task)
2480 GNUNET_SCHEDULER_cancel (target->timeout_task);
2481 target->timeout_task
2482 = GNUNET_SCHEDULER_add_at (target->earliest_timeout,
2483 &check_queue_timeouts,
2487 return; /* all queues must already be busy */
2488 for (struct Queue *queue = target->queue_head;
2490 queue = queue->next_neighbour)
2492 /* try transmission on any queue that is idle */
2493 if (NULL == queue->transmit_task)
2494 queue->transmit_task = GNUNET_SCHEDULER_add_now (&transmit_on_queue,
2501 * Communicator started. Test message is well-formed.
2503 * @param cls the client
2504 * @param cam the send message that was sent
2507 check_communicator_available (void *cls,
2508 const struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam)
2510 struct TransportClient *tc = cls;
2513 if (CT_NONE != tc->type)
2516 return GNUNET_SYSERR;
2518 tc->type = CT_COMMUNICATOR;
2519 size = ntohs (cam->header.size) - sizeof (*cam);
2521 return GNUNET_OK; /* receive-only communicator */
2522 GNUNET_MQ_check_zero_termination (cam);
2528 * Communicator started. Process the request.
2530 * @param cls the client
2531 * @param cam the send message that was sent
2534 handle_communicator_available (void *cls,
2535 const struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam)
2537 struct TransportClient *tc = cls;
2540 size = ntohs (cam->header.size) - sizeof (*cam);
2542 return; /* receive-only communicator */
2543 tc->details.communicator.address_prefix
2544 = GNUNET_strdup ((const char *) &cam[1]);
2545 tc->details.communicator.cc
2546 = (enum GNUNET_TRANSPORT_CommunicatorCharacteristics) ntohl (cam->cc);
2547 GNUNET_SERVICE_client_continue (tc->client);
2552 * Communicator requests backchannel transmission. Check the request.
2554 * @param cls the client
2555 * @param cb the send message that was sent
2556 * @return #GNUNET_OK if message is well-formed
2559 check_communicator_backchannel (void *cls,
2560 const struct GNUNET_TRANSPORT_CommunicatorBackchannel *cb)
2562 const struct GNUNET_MessageHeader *inbox;
2567 msize = ntohs (cb->header.size) - sizeof (*cb);
2568 if (UINT16_MAX - msize >
2569 sizeof (struct TransportBackchannelEncapsulationMessage) +
2570 sizeof (struct TransportBackchannelRequestPayload) )
2573 return GNUNET_SYSERR;
2575 inbox = (const struct GNUNET_MessageHeader *) &cb[1];
2576 isize = ntohs (inbox->size);
2580 return GNUNET_SYSERR;
2582 is = (const char *) inbox;
2585 GNUNET_assert (msize > 0);
2586 if ('\0' != is[msize-1])
2589 return GNUNET_SYSERR;
2596 * Remove memory used by expired ephemeral keys.
2601 expire_ephemerals (void *cls)
2603 struct EphemeralCacheEntry *ece;
2606 ephemeral_task = NULL;
2607 while (NULL != (ece = GNUNET_CONTAINER_heap_peek (ephemeral_heap)))
2609 if (0 == GNUNET_TIME_absolute_get_remaining (ece->ephemeral_validity).rel_value_us)
2611 free_ephemeral (ece);
2614 ephemeral_task = GNUNET_SCHEDULER_add_at (ece->ephemeral_validity,
2623 * Lookup ephemeral key in our #ephemeral_map. If no valid one exists, generate
2624 * one, cache it and return it.
2626 * @param pid peer to look up ephemeral for
2627 * @param private_key[out] set to the private key
2628 * @param ephemeral_key[out] set to the key
2629 * @param ephemeral_sender_sig[out] set to the signature
2630 * @param ephemeral_validity[out] set to the validity expiration time
2633 lookup_ephemeral (const struct GNUNET_PeerIdentity *pid,
2634 struct GNUNET_CRYPTO_EcdhePrivateKey *private_key,
2635 struct GNUNET_CRYPTO_EcdhePublicKey *ephemeral_key,
2636 struct GNUNET_CRYPTO_EddsaSignature *ephemeral_sender_sig,
2637 struct GNUNET_TIME_Absolute *ephemeral_validity)
2639 struct EphemeralCacheEntry *ece;
2640 struct EphemeralConfirmation ec;
2642 ece = GNUNET_CONTAINER_multipeermap_get (ephemeral_map,
2644 if ( (NULL != ece) &&
2645 (0 == GNUNET_TIME_absolute_get_remaining (ece->ephemeral_validity).rel_value_us) )
2647 free_ephemeral (ece);
2652 ece = GNUNET_new (struct EphemeralCacheEntry);
2654 ece->ephemeral_validity = GNUNET_TIME_absolute_add (GNUNET_TIME_absolute_get_monotonic (GST_cfg),
2655 EPHEMERAL_VALIDITY);
2656 GNUNET_assert (GNUNET_OK ==
2657 GNUNET_CRYPTO_ecdhe_key_create2 (&ece->private_key));
2658 GNUNET_CRYPTO_ecdhe_key_get_public (&ece->private_key,
2659 &ece->ephemeral_key);
2660 ec.purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL);
2661 ec.purpose.size = htonl (sizeof (ec));
2663 ec.ephemeral_key = ece->ephemeral_key;
2664 GNUNET_assert (GNUNET_OK ==
2665 GNUNET_CRYPTO_eddsa_sign (GST_my_private_key,
2668 ece->hn = GNUNET_CONTAINER_heap_insert (ephemeral_heap,
2670 ece->ephemeral_validity.abs_value_us);
2671 GNUNET_assert (GNUNET_OK ==
2672 GNUNET_CONTAINER_multipeermap_put (ephemeral_map,
2675 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
2676 if (NULL == ephemeral_task)
2677 ephemeral_task = GNUNET_SCHEDULER_add_at (ece->ephemeral_validity,
2681 *private_key = ece->private_key;
2682 *ephemeral_key = ece->ephemeral_key;
2683 *ephemeral_sender_sig = ece->sender_sig;
2684 *ephemeral_validity = ece->ephemeral_validity;
/**
 * We need to transmit @a hdr to @a target.  If necessary, this may
 * involve DV routing or even broadcasting and fragmentation.
 * Currently a stub that merely releases @a hdr.
 *
 * @param target peer to receive @a hdr
 * @param hdr header of the message to route (ownership is taken)
 */
static void
route_message (const struct GNUNET_PeerIdentity *target,
               struct GNUNET_MessageHeader *hdr)
{
  // FIXME: send hdr to target, free hdr (possibly using DV, possibly broadcasting)
  GNUNET_free (hdr);
}
2705 * Communicator requests backchannel transmission. Process the request.
2707 * @param cls the client
2708 * @param cb the send message that was sent
2711 handle_communicator_backchannel (void *cls,
2712 const struct GNUNET_TRANSPORT_CommunicatorBackchannel *cb)
2714 struct TransportClient *tc = cls;
2715 struct GNUNET_CRYPTO_EcdhePrivateKey private_key;
2716 struct GNUNET_TIME_Absolute ephemeral_validity;
2717 struct TransportBackchannelEncapsulationMessage *enc;
2718 struct TransportBackchannelRequestPayload ppay;
2722 /* encapsulate and encrypt message */
2723 msize = ntohs (cb->header.size) - sizeof (*cb) + sizeof (struct TransportBackchannelRequestPayload);
2724 enc = GNUNET_malloc (sizeof (*enc) + msize);
2725 enc->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION);
2726 enc->header.size = htons (sizeof (*enc) + msize);
2727 enc->target = cb->pid;
2728 lookup_ephemeral (&cb->pid,
2730 &enc->ephemeral_key,
2732 &ephemeral_validity);
2733 // FIXME: setup 'iv'
2735 dh_key_derive (&private_key,
2740 ppay.ephemeral_validity = GNUNET_TIME_absolute_hton (ephemeral_validity);
2741 ppay.monotonic_time = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get_monotonic (GST_cfg));
2742 mpos = (char *) &enc[1];
2751 ntohs (cb->header.size) - sizeof (*cb));
2755 route_message (&cb->pid,
2757 GNUNET_SERVICE_client_continue (tc->client);
2762 * Address of our peer added. Test message is well-formed.
2764 * @param cls the client
2765 * @param aam the send message that was sent
2766 * @return #GNUNET_OK if message is well-formed
2769 check_add_address (void *cls,
2770 const struct GNUNET_TRANSPORT_AddAddressMessage *aam)
2772 struct TransportClient *tc = cls;
2774 if (CT_COMMUNICATOR != tc->type)
2777 return GNUNET_SYSERR;
2779 GNUNET_MQ_check_zero_termination (aam);
/**
 * Ask peerstore to store our address.
 *
 * Forward declaration; needed because #peerstore_store_cb()
 * re-schedules this task.
 *
 * @param cls an `struct AddressListEntry *`
 */
static void
store_pi (void *cls);
2794 * Function called when peerstore is done storing our address.
/* Completion callback that store_pi() hands to GNUNET_PEERSTORE_store():
   logs a failed store and arms the delayed task kept in ale->st. */
2797 peerstore_store_cb (void *cls,
2800 struct AddressListEntry *ale = cls;
/* a failure is only logged; the delayed task below is armed either way */
2803 if (GNUNET_YES != success)
2804 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2805 "Failed to store our own address `%s' in peerstore!\n",
2807 /* refresh period is 1/4 of expiration time, that should be plenty
2808 without being excessive. */
2809 ale->st = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_divide (ale->expiration,
2817 * Ask peerstore to store our address.
2819 * @param cls an `struct AddressListEntry *`
/* Task body: signs the address of the entry and submits it to the
   peerstore; ale->sc holds the handle of the pending store operation. */
2822 store_pi (void *cls)
2824 struct AddressListEntry *ale = cls;
2827 struct GNUNET_TIME_Absolute expiration;
/* the entry stores a relative lifetime; convert it to an absolute time */
2830 expiration = GNUNET_TIME_relative_to_absolute (ale->expiration);
2831 GNUNET_HELLO_sign_address (ale->address,
2837 ale->sc = GNUNET_PEERSTORE_store (peerstore,
2840 GNUNET_HELLO_PEERSTORE_KEY,
2844 GNUNET_PEERSTORE_STOREOPTION_MULTIPLE,
2845 &peerstore_store_cb,
/* no store handle was returned: warn and arm a delayed task
   (GNUNET_TIME_UNIT_SECONDS) in ale->st */
2848 if (NULL == ale->sc)
2850 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2851 "Failed to store our address `%s' with peerstore\n",
2853 ale->st = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
2861 * Address of our peer added. Process the request.
2863 * @param cls the client
2864 * @param aam the send message that was sent
/* Creates an AddressListEntry for the announced address, links it into
   the communicator's address list and schedules store_pi() for it. */
2867 handle_add_address (void *cls,
2868 const struct GNUNET_TRANSPORT_AddAddressMessage *aam)
2870 struct TransportClient *tc = cls;
2871 struct AddressListEntry *ale;
/* slen: number of bytes that follow the fixed-size message header */
2874 slen = ntohs (aam->header.size) - sizeof (*aam);
/* one allocation: the entry itself plus slen trailing bytes */
2875 ale = GNUNET_malloc (sizeof (struct AddressListEntry) + slen);
/* the address pointer refers to the storage directly behind the struct */
2877 ale->address = (const char *) &ale[1];
2878 ale->expiration = GNUNET_TIME_relative_ntoh (aam->expiration);
/* aid is copied without byte-order conversion; handle_del_address()
   compares it against the equally unconverted dam->aid */
2879 ale->aid = aam->aid;
2880 ale->nt = (enum GNUNET_NetworkType) ntohl (aam->nt);
2884 GNUNET_CONTAINER_DLL_insert (tc->details.communicator.addr_head,
2885 tc->details.communicator.addr_tail,
/* run store_pi() as soon as the scheduler allows */
2887 ale->st = GNUNET_SCHEDULER_add_now (&store_pi,
2889 GNUNET_SERVICE_client_continue (tc->client);
2894 * Address of our peer deleted. Process the request.
2896 * @param cls the client
2897 * @param dam the send message that was sent
/* Looks up the address entry whose aid matches the request in the
   communicator's list and releases it via free_address_list_entry(). */
2900 handle_del_address (void *cls,
2901 const struct GNUNET_TRANSPORT_DelAddressMessage *dam)
2903 struct TransportClient *tc = cls;
/* only communicators own address lists; any other client is dropped */
2905 if (CT_COMMUNICATOR != tc->type)
2908 GNUNET_SERVICE_client_drop (tc->client);
2911 for (struct AddressListEntry *ale = tc->details.communicator.addr_head;
/* both aid values are compared as received (no ntohl on either side) */
2915 if (dam->aid != ale->aid)
2917 GNUNET_assert (ale->tc == tc);
2918 free_address_list_entry (ale);
2919 GNUNET_SERVICE_client_continue (tc->client);
/* NOTE(review): presumably only reached when no entry matched, i.e. the
   client asked to delete an unknown address -- confirm against the
   control flow that is not visible here */
2922 GNUNET_SERVICE_client_drop (tc->client);
2927 * Context from #handle_incoming_msg(). Closure for many
2928 * message handlers below.
2930 struct CommunicatorMessageContext
2933 * Which communicator provided us with the message.
/* finish_cmc_handling() sends the flow-control ACK and the
   client-continue through this client */
2935 struct TransportClient *tc;
2938 * Additional information for flow control and about the sender.
/* fc_on, fc_id and sender are read back by finish_cmc_handling();
   handle_dv_box() overwrites sender with the origin of the box */
2940 struct GNUNET_TRANSPORT_IncomingMessage im;
2943 * Number of hops the message has travelled (if DV-routed).
2944 * FIXME: make use of this in ACK handling!
/* written by handle_dv_box() from the total_hops field of the DV box */
2946 uint16_t total_hops;
2951 * Given an inbound message @a msg from a communicator @a cmc,
2952 * demultiplex it based on the type calling the right handler.
2954 * @param cmc context for demultiplexing
2955 * @param msg message to demultiplex
/* Forward declaration: handle_fragment_box(), handle_reliability_box()
   and handle_dv_box() call the demultiplexer for their inner message. */
2958 demultiplex_with_cmc (struct CommunicatorMessageContext *cmc,
2959 const struct GNUNET_MessageHeader *msg);
2963 * Send ACK to communicator (if requested) and free @a cmc.
2965 * @param cmc context for which we are done handling the message
/* Last step of handling an inbound message: optionally acknowledges it
   to the communicator and lets the communicator client continue. */
2968 finish_cmc_handling (struct CommunicatorMessageContext *cmc)
/* fc_on is kept in network byte order; non-zero means an ACK is wanted */
2970 if (0 != ntohl (cmc->im.fc_on))
2972 /* send ACK when done to communicator for flow control! */
2973 struct GNUNET_MQ_Envelope *env;
2974 struct GNUNET_TRANSPORT_IncomingMessageAck *ack;
2976 env = GNUNET_MQ_msg (ack,
2977 GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK);
2978 ack->reserved = htonl (0);
/* fc_id and sender are echoed exactly as they arrived */
2979 ack->fc_id = cmc->im.fc_id;
2980 ack->sender = cmc->im.sender;
2981 GNUNET_MQ_send (cmc->tc->mq,
2984 GNUNET_SERVICE_client_continue (cmc->tc->client);
2990 * Communicator gave us an unencapsulated message to pass as-is to
2991 * CORE. Process the request.
2993 * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
2994 * @param mh the message that was received
/* Wraps the inner message into a GNUNET_MESSAGE_TYPE_TRANSPORT_RECV
   message for every CORE client, then finishes handling of the cmc. */
2997 handle_raw_message (void *cls,
2998 const struct GNUNET_MessageHeader *mh)
3000 struct CommunicatorMessageContext *cmc = cls;
3001 uint16_t size = ntohs (mh->size);
/* the payload plus an InboundMessage header must still fit a 16-bit
   size field, and it must be at least a bare message header */
3003 if ( (size > UINT16_MAX - sizeof (struct InboundMessage)) ||
3004 (size < sizeof (struct GNUNET_MessageHeader)) )
/* the client handle is copied first because cmc is handed to
   finish_cmc_handling() before the drop */
3006 struct GNUNET_SERVICE_Client *client = cmc->tc->client;
3009 finish_cmc_handling (cmc);
3010 GNUNET_SERVICE_client_drop (client);
3013 /* Forward to all CORE clients */
3014 for (struct TransportClient *tc = clients_head;
3018 struct GNUNET_MQ_Envelope *env;
3019 struct InboundMessage *im;
3021 if (CT_CORE != tc->type)
3023 env = GNUNET_MQ_msg_extra (im,
3025 GNUNET_MESSAGE_TYPE_TRANSPORT_RECV);
/* the sender comes from the context, which handle_dv_box() may have
   rewritten to the origin of a DV box */
3026 im->peer = cmc->im.sender;
3030 GNUNET_MQ_send (tc->mq,
3033 /* FIXME: consider doing this _only_ once the message
3034 was drained from the CORE MQs to extend flow control to CORE!
3035 (basically, increment counter in cmc, decrement on MQ send continuation! */
3036 finish_cmc_handling (cmc);
3041 * Communicator gave us a fragment box. Check the message.
3043 * @param cls a `struct CommunicatorMessageContext`
3044 * @param fb the send message that was sent
3045 * @return #GNUNET_YES if message is well-formed
/* Validation hook for a fragment box: checks that the fragment payload
   lies inside the message size announced in the box header. */
3048 check_fragment_box (void *cls,
3049 const struct TransportFragmentBox *fb)
3051 uint16_t size = ntohs (fb->header.size);
/* bsize: payload bytes carried behind the fragment box header */
3052 uint16_t bsize = size - sizeof (*fb);
/* NOTE(review): the condition guarding this first rejection is not
   visible here; presumably it rejects an empty payload -- confirm */
3056 GNUNET_break_op (0);
3057 return GNUNET_SYSERR;
/* the fragment must not extend past the announced message size */
3059 if (bsize + ntohs (fb->frag_off) > ntohs (fb->msg_size))
3061 GNUNET_break_op (0);
3062 return GNUNET_SYSERR;
/* the offset itself must lie inside the message */
3064 if (ntohs (fb->frag_off) >= ntohs (fb->msg_size))
3066 GNUNET_break_op (0);
3067 return GNUNET_SYSERR;
3074 * Generate a fragment acknowledgement for an @a rc.
3076 * @param rc context to generate ACK for, @a rc ACK state is reset
/* Builds a TransportFragmentAckMessage from the ACK state accumulated
   in rc, routes it to the neighbour and clears that state. */
3079 send_fragment_ack (struct ReassemblyContext *rc)
3081 struct TransportFragmentAckMessage *ack;
3083 ack = GNUNET_new (struct TransportFragmentAckMessage);
3084 ack->header.size = htons (sizeof (struct TransportFragmentAckMessage));
3085 ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK);
/* frag_uuid is the base fragment; extra_acks is the bitmask that
   handle_fragment_box() maintains relative to that base */
3086 ack->frag_uuid = htonl (rc->frag_uuid);
3087 ack->extra_acks = GNUNET_htonll (rc->extra_acks);
/* msg_uuid is copied as stored, without byte-order conversion */
3088 ack->msg_uuid = rc->msg_uuid;
3089 ack->avg_ack_delay = GNUNET_TIME_relative_hton (rc->avg_ack_delay);
/* no bytes missing: report "forever" instead of the remaining time */
3090 if (0 == rc->msg_missing)
3091 ack->reassembly_timeout
3092 = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_FOREVER_REL); /* signal completion */
3094 ack->reassembly_timeout
3095 = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (rc->reassembly_timeout));
3096 route_message (&rc->neighbour->pid,
/* clear the accumulated ACK state now that it has been reported */
3098 rc->avg_ack_delay = GNUNET_TIME_UNIT_ZERO;
3100 rc->extra_acks = 0LLU;
3105 * Communicator gave us a fragment. Process the request.
3107 * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
3108 * @param fb the message that was received
/* Stores one fragment in the reassembly context of its message
   (creating the context on first sight), updates the cumulative ACK
   state and, once no byte is missing, passes the reassembled message
   to demultiplex_with_cmc(). */
3111 handle_fragment_box (void *cls,
3112 const struct TransportFragmentBox *fb)
3114 struct CommunicatorMessageContext *cmc = cls;
3115 struct Neighbour *n;
3116 struct ReassemblyContext *rc;
3117 const struct GNUNET_MessageHeader *msg;
3123 struct GNUNET_TIME_Relative cdelay;
3126 n = GNUNET_CONTAINER_multipeermap_get (neighbours,
/* the client handle is copied before cmc goes to finish_cmc_handling() */
3130 struct GNUNET_SERVICE_Client *client = cmc->tc->client;
3133 finish_cmc_handling (cmc);
3134 GNUNET_SERVICE_client_drop (client);
/* reassembly map, heap and cleanup task are created lazily */
3137 if (NULL == n->reassembly_map)
3139 n->reassembly_map = GNUNET_CONTAINER_multishortmap_create (8,
3141 n->reassembly_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
3142 n->reassembly_timeout_task = GNUNET_SCHEDULER_add_delayed (REASSEMBLY_EXPIRATION,
3143 &reassembly_cleanup_task,
3146 msize = ntohs (fb->msg_size);
3147 rc = GNUNET_CONTAINER_multishortmap_get (n->reassembly_map,
/* one allocation holds the context, the payload buffer and a bitfield
   with one bit per payload byte */
3151 rc = GNUNET_malloc (sizeof (*rc) +
3152 msize + /* reassembly payload buffer */
3153 (msize + 7) / 8 * sizeof (uint8_t) /* bitfield */);
3154 rc->msg_uuid = fb->msg_uuid;
3156 rc->msg_size = msize;
3157 rc->reassembly_timeout = GNUNET_TIME_relative_to_absolute (REASSEMBLY_EXPIRATION);
3158 rc->last_frag = GNUNET_TIME_absolute_get ();
/* the min-heap is keyed by the absolute timeout of the context */
3159 rc->hn = GNUNET_CONTAINER_heap_insert (n->reassembly_heap,
3161 rc->reassembly_timeout.abs_value_us);
3162 GNUNET_assert (GNUNET_OK ==
3163 GNUNET_CONTAINER_multishortmap_put (n->reassembly_map,
3166 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
/* payload buffer starts right behind the struct, bitfield behind it */
3167 target = (char *) &rc[1];
3168 rc->bitfield = (uint8_t *) (target + rc->msg_size);
3169 rc->msg_missing = rc->msg_size;
3173 target = (char *) &rc[1];
/* a fragment announcing a different total size than the context was
   created with is not stored */
3175 if (msize != rc->msg_size)
3178 finish_cmc_handling (cmc);
3183 fsize = ntohs (fb->header.size) - sizeof (*fb);
3184 frag_off = ntohs (fb->frag_off);
3185 memcpy (&target[frag_off],
3188 /* update bitfield and msg_missing */
3189 for (unsigned int i=frag_off;i<frag_off+fsize;i++)
3191 if (0 == (rc->bitfield[i / 8] & (1 << (i % 8))))
3193 rc->bitfield[i / 8] |= (1 << (i % 8));
3198 /* Compute cumulative ACK */
3199 frag_uuid = ntohl (fb->frag_uuid);
3200 cdelay = GNUNET_TIME_absolute_get_duration (rc->last_frag);
3201 cdelay = GNUNET_TIME_relative_multiply (cdelay,
3203 rc->last_frag = GNUNET_TIME_absolute_get ();
3204 rc->avg_ack_delay = GNUNET_TIME_relative_add (rc->avg_ack_delay,
3206 ack_now = GNUNET_NO;
3207 if (0 == rc->num_acks)
3209 /* case one: first ack */
3210 rc->frag_uuid = frag_uuid;
3211 rc->extra_acks = 0LLU;
3214 else if ( (frag_uuid >= rc->frag_uuid) &&
3215 (frag_uuid <= rc->frag_uuid + 64) )
3217 /* case two: ack fits after existing min UUID */
/* bit k of extra_acks stands for fragment rc->frag_uuid + 1 + k */
3218 if ( (frag_uuid == rc->frag_uuid) ||
3219 (0 != (rc->extra_acks & (1LLU << (frag_uuid - rc->frag_uuid - 1)))) )
3221 /* duplicate fragment, ack now! */
3222 ack_now = GNUNET_YES;
3226 rc->extra_acks |= (1LLU << (frag_uuid - rc->frag_uuid - 1));
/* NOTE(review): when rc->frag_uuid == frag_uuid + 64 the shift count
   used further down is 64, which is undefined for a 64-bit operand
   (assuming extra_acks is 64 bits wide); extra_acks is 0 on that path,
   so the shift could simply be skipped there.
   NOTE(review): the mask test in the "< 64" branch accepts extra_acks
   only if all its set bits are in the top positions, which are exactly
   the bits the shift discards; the comment below suggests the opposite
   was intended -- confirm polarity. */
3230 else if ( (rc->frag_uuid > frag_uuid) &&
3231 ( ( (rc->frag_uuid == frag_uuid + 64) &&
3232 (0 == rc->extra_acks) ) ||
3233 ( (rc->frag_uuid < frag_uuid + 64) &&
3234 (rc->extra_acks == (rc->extra_acks & ~ ((1LLU << (64 - (rc->frag_uuid - frag_uuid))) - 1LLU))) ) ) )
3236 /* can fit ack by shifting extra acks and starting at
3237 frag_uuid, test above ensured that the bits we will
3238 shift 'extra_acks' by are all zero. */
3239 rc->extra_acks <<= (rc->frag_uuid - frag_uuid);
3240 rc->extra_acks |= (1LLU << (rc->frag_uuid - frag_uuid - 1));
3241 rc->frag_uuid = frag_uuid;
3244 if (65 == rc->num_acks) /* FIXME: maybe use smaller threshold? This is very aggressive. */
3245 ack_now = GNUNET_YES; /* maximum acks received */
3246 // FIXME: possibly also ACK based on RTT (but for that we'd need to
3247 // determine the queue used for the ACK first!)
3249 /* is reassembly complete? */
3250 if (0 != rc->msg_missing)
3253 send_fragment_ack (rc);
3254 finish_cmc_handling (cmc);
3257 /* reassembly is complete, verify result */
3258 msg = (const struct GNUNET_MessageHeader *) &rc[1];
/* the header of the reassembled message must agree with the size the
   fragments announced */
3259 if (ntohs (msg->size) != rc->msg_size)
3262 free_reassembly_context (rc);
3263 finish_cmc_handling (cmc);
3266 /* successful reassembly */
3267 send_fragment_ack (rc);
/* msg points into rc, so rc must stay allocated across this call */
3268 demultiplex_with_cmc (cmc,
3270 /* FIXME: really free here? Might be bad if fragments are still
3271 en-route and we forget that we finished this reassembly immediately!
3272 -> keep around until timeout?
3273 -> shorten timeout based on ACK? */
3274 free_reassembly_context (rc);
3279 * Communicator gave us a fragment acknowledgement. Process the request.
3281 * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
3282 * @param fa the message that was received
/* Placeholder: the ACK content is not evaluated yet (see FIXME); the
   handler only completes the communicator message context. */
3285 handle_fragment_ack (void *cls,
3286 const struct TransportFragmentAckMessage *fa)
3288 struct CommunicatorMessageContext *cmc = cls;
3290 // FIXME: do work: identify original message; then identify fragments being acked;
3291 // remove those from the tree to prevent retransmission;
3293 // if entire message is ACKed, handle that as well.
3294 finish_cmc_handling (cmc);
3299 * Communicator gave us a reliability box. Check the message.
3301 * @param cls a `struct CommunicatorMessageContext`
3302 * @param rb the send message that was sent
3303 * @return #GNUNET_YES if message is well-formed
/* Validation hook for a reliability box; the whole check is delegated
   to GNUNET_MQ_check_boxed_message(). */
3306 check_reliability_box (void *cls,
3307 const struct TransportReliabilityBox *rb)
3309 GNUNET_MQ_check_boxed_message (rb);
3315 * Communicator gave us a reliability box. Process the request.
3317 * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
3318 * @param rb the message that was received
/* Acknowledges the box when its ack_countdown is zero, then hands the
   boxed message to demultiplex_with_cmc(). */
3321 handle_reliability_box (void *cls,
3322 const struct TransportReliabilityBox *rb)
3324 struct CommunicatorMessageContext *cmc = cls;
/* the boxed message starts directly behind the reliability box header */
3325 const struct GNUNET_MessageHeader *inbox = (const struct GNUNET_MessageHeader *) &rb[1];
3327 if (0 == ntohl (rb->ack_countdown))
3329 struct TransportReliabilityAckMessage *ack;
3331 /* FIXME: implement cumulative ACKs and ack_countdown,
3332 then setting the avg_ack_delay field below: */
/* the ACK is sized for exactly one GNUNET_ShortHashCode behind its header */
3333 ack = GNUNET_malloc (sizeof (*ack) +
3334 sizeof (struct GNUNET_ShortHashCode));
3335 ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK);
3336 ack->header.size = htons (sizeof (*ack) +
3337 sizeof (struct GNUNET_ShortHashCode));
3340 sizeof (struct GNUNET_ShortHashCode));
/* the ACK is routed to the peer recorded as sender in the context */
3341 route_message (&cmc->im.sender,
3344 /* continue with inner message */
3345 demultiplex_with_cmc (cmc,
3351 * Communicator gave us a reliability ack. Process the request.
3353 * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
3354 * @param ra the message that was received
/* Placeholder: the ACK is not matched against pending messages yet
   (see FIXME); the handler only completes the context. */
3357 handle_reliability_ack (void *cls,
3358 const struct TransportReliabilityAckMessage *ra)
3360 struct CommunicatorMessageContext *cmc = cls;
3362 // FIXME: do work: find message that was acknowledged, and
3363 // remove from transmission queue; update RTT.
3364 finish_cmc_handling (cmc);
3369 * Communicator gave us a backchannel encapsulation. Check the message.
3371 * @param cls a `struct CommunicatorMessageContext`
3372 * @param be the send message that was sent
3373 * @return #GNUNET_YES if message is well-formed
/* Validation hook: the encapsulation must carry at least a message
   header worth of payload behind its own fixed header. */
3376 check_backchannel_encapsulation (void *cls,
3377 const struct TransportBackchannelEncapsulationMessage *be)
3379 uint16_t size = ntohs (be->header.size);
/* NOTE(review): the subtraction is done in size_t, so a size below
   sizeof (*be) would wrap and pass; presumably the MQ dispatcher rules
   that out before this hook runs -- confirm */
3381 if (size - sizeof (*be) < sizeof (struct GNUNET_MessageHeader))
3383 GNUNET_break_op (0);
3384 return GNUNET_SYSERR;
3391 * Communicator gave us a backchannel encapsulation. Process the request.
3393 * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
3394 * @param be the message that was received
/* Forwards an encapsulation addressed to another peer; processing of
   encapsulations addressed to us is still unimplemented (FIXMEs). */
3397 handle_backchannel_encapsulation (void *cls,
3398 const struct TransportBackchannelEncapsulationMessage *be)
3400 struct CommunicatorMessageContext *cmc = cls;
/* NOTE(review): the second memcmp operand is not visible here;
   presumably our own peer identity -- confirm */
3402 if (0 != memcmp (&be->target,
3404 sizeof (struct GNUNET_PeerIdentity)))
3406 /* not for me, try to route to target */
/* route_message() is given a copy, not the received buffer */
3407 route_message (&be->target,
3408 GNUNET_copy_message (&be->header));
3409 finish_cmc_handling (cmc);
3412 // FIXME: compute shared secret
3413 // FIXME: check HMAC
3414 // FIXME: decrypt payload
3415 // FIXME: forward to specified communicator!
3416 // (using GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL_INCOMING)
3417 finish_cmc_handling (cmc);
3422 * Communicator gave us a DV learn message. Check the message.
3424 * @param cls a `struct CommunicatorMessageContext`
3425 * @param dvl the send message that was sent
3426 * @return #GNUNET_YES if message is well-formed
/* Validation hook for a DV learn message: the size must match the hop
   count exactly, and two identity checks are run per hop. */
3429 check_dv_learn (void *cls,
3430 const struct TransportDVLearn *dvl)
3432 uint16_t size = ntohs (dvl->header.size);
3433 uint16_t num_hops = ntohs (dvl->num_hops);
/* the hop array starts directly behind the fixed header */
3434 const struct GNUNET_PeerIdentity *hops = (const struct GNUNET_PeerIdentity *) &dvl[1];
/* exact match required: header plus num_hops identities, nothing else */
3436 if (size != sizeof (*dvl) + num_hops * sizeof (struct GNUNET_PeerIdentity))
3438 GNUNET_break_op (0);
3439 return GNUNET_SYSERR;
3441 for (unsigned int i=0;i<num_hops;i++)
/* NOTE(review): the second memcmp operand is not visible in either
   test; presumably hops[i], i.e. neither the initiator nor this peer
   may appear on the path -- confirm */
3443 if (0 == memcmp (&dvl->initiator,
3445 sizeof (struct GNUNET_PeerIdentity)))
3447 GNUNET_break_op (0);
3448 return GNUNET_SYSERR;
3450 if (0 == memcmp (&GST_my_identity,
3452 sizeof (struct GNUNET_PeerIdentity)))
3454 GNUNET_break_op (0);
3455 return GNUNET_SYSERR;
3463 * Communicator gave us a DV learn message. Process the request.
3465 * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
3466 * @param dvl the message that was received
/* Placeholder: nothing is learned or forwarded yet (see FIXMEs); the
   handler only completes the communicator message context. */
3469 handle_dv_learn (void *cls,
3470 const struct TransportDVLearn *dvl)
3472 struct CommunicatorMessageContext *cmc = cls;
3474 // FIXME: learn path from DV message (if bi-directional flags are set)
3475 // FIXME: expand DV message, forward on (unless path is getting too long)
3476 finish_cmc_handling (cmc);
3481 * Communicator gave us a DV box. Check the message.
3483 * @param cls a `struct CommunicatorMessageContext`
3484 * @param dvb the send message that was sent
3485 * @return #GNUNET_YES if message is well-formed
/* Validation hook for a DV box: header, hop array and boxed message
   must add up to the total size, and the boxed message must not be a
   DV box or DV learn message itself. */
3488 check_dv_box (void *cls,
3489 const struct TransportDVBox *dvb)
3491 uint16_t size = ntohs (dvb->header.size);
3492 uint16_t num_hops = ntohs (dvb->num_hops);
3493 const struct GNUNET_PeerIdentity *hops = (const struct GNUNET_PeerIdentity *) &dvb[1];
/* only the address is computed here; inbox is first dereferenced
   after the minimum-size test below */
3494 const struct GNUNET_MessageHeader *inbox = (const struct GNUNET_MessageHeader *) &hops[num_hops];
/* there must be room for the hops and at least one message header */
3498 if (size < sizeof (*dvb) + num_hops * sizeof (struct GNUNET_PeerIdentity) + sizeof (struct GNUNET_MessageHeader))
3500 GNUNET_break_op (0);
3501 return GNUNET_SYSERR;
/* the size claimed by the boxed message must fill the box exactly */
3503 isize = ntohs (inbox->size);
3504 if (size != sizeof (*dvb) + num_hops * sizeof (struct GNUNET_PeerIdentity) + isize)
3506 GNUNET_break_op (0);
3507 return GNUNET_SYSERR;
/* DV messages nested inside a DV box are rejected */
3509 itype = ntohs (inbox->type);
3510 if ( (GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX == itype) ||
3511 (GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN == itype) )
3513 GNUNET_break_op (0);
3514 return GNUNET_SYSERR;
3521 * Communicator gave us a DV box. Process the request.
3523 * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
3524 * @param dvb the message that was received
/* Unwraps a DV box addressed to this peer and demultiplexes the boxed
   message; forwarding for other targets is not implemented (FIXME). */
3527 handle_dv_box (void *cls,
3528 const struct TransportDVBox *dvb)
3530 struct CommunicatorMessageContext *cmc = cls;
/* size: bytes behind the fixed DV box header (hops plus boxed message) */
3531 uint16_t size = ntohs (dvb->header.size) - sizeof (*dvb);
3532 uint16_t num_hops = ntohs (dvb->num_hops);
3533 const struct GNUNET_PeerIdentity *hops = (const struct GNUNET_PeerIdentity *) &dvb[1];
/* the boxed message follows the hop array */
3534 const struct GNUNET_MessageHeader *inbox = (const struct GNUNET_MessageHeader *) &hops[num_hops];
3538 // FIXME: if we are not the target, shorten path and forward along.
3539 // Try from the _end_ of hops array if we know the given
3540 // neighbour (shortening the path!).
3541 // NOTE: increment total_hops!
3542 finish_cmc_handling (cmc);
3545 /* We are the target. Unbox and handle message. */
/* from here on the context reports the DV origin as the sender */
3546 cmc->im.sender = dvb->origin;
3547 cmc->total_hops = ntohs (dvb->total_hops);
3548 demultiplex_with_cmc (cmc,
3554 * Client notified us about transmission from a peer. Process the request.
3556 * @param cls a `struct TransportClient` which sent us the message
3557 * @param im the incoming message that was sent
3558 * @return #GNUNET_YES if message is well-formed
/* Validation hook for an incoming-message notification: only
   communicators may send it, and the boxed message is checked via
   GNUNET_MQ_check_boxed_message(). */
3561 check_incoming_msg (void *cls,
3562 const struct GNUNET_TRANSPORT_IncomingMessage *im)
3564 struct TransportClient *tc = cls;
/* reject the message unless the sending client is a communicator */
3566 if (CT_COMMUNICATOR != tc->type)
3569 return GNUNET_SYSERR;
3571 GNUNET_MQ_check_boxed_message (im);
3577 * Incoming message. Process the request.
3579 * @param im the send message that was received
/* Entry point for messages delivered by a communicator: creates the
   per-message context and starts demultiplexing the boxed message. */
3582 handle_incoming_msg (void *cls,
3583 const struct GNUNET_TRANSPORT_IncomingMessage *im)
3585 struct TransportClient *tc = cls;
/* heap-allocated context that travels through the handlers; each
   handler is documented to end with finish_cmc_handling() */
3586 struct CommunicatorMessageContext *cmc = GNUNET_new (struct CommunicatorMessageContext);
/* the boxed message starts directly behind the IncomingMessage header */
3590 demultiplex_with_cmc (cmc,
3591 (const struct GNUNET_MessageHeader *) &im[1]);
3596 * Given an inbound message @a msg from a communicator @a cmc,
3597 * demultiplex it based on the type calling the right handler.
3599 * @param cmc context for demultiplexing
3600 * @param msg message to demultiplex
/* Dispatches the message on its type: transport-level boxes and ACKs
   go to their handler from the table below, a type without a table
   entry is treated as an unencapsulated message for CORE. */
3603 demultiplex_with_cmc (struct CommunicatorMessageContext *cmc,
3604 const struct GNUNET_MessageHeader *msg)
/* NOTE(review): the closure argument of each table entry is not
   visible here; every handler casts its closure directly to
   `struct CommunicatorMessageContext *`, so it must be `cmc` itself
   and not `&cmc` -- confirm */
3606 struct GNUNET_MQ_MessageHandler handlers[] = {
3607 GNUNET_MQ_hd_var_size (fragment_box,
3608 GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT,
3609 struct TransportFragmentBox,
3611 GNUNET_MQ_hd_fixed_size (fragment_ack,
3612 GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK,
3613 struct TransportFragmentAckMessage,
3615 GNUNET_MQ_hd_var_size (reliability_box,
3616 GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX,
3617 struct TransportReliabilityBox,
3619 GNUNET_MQ_hd_fixed_size (reliability_ack,
3620 GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK,
3621 struct TransportReliabilityAckMessage,
3623 GNUNET_MQ_hd_var_size (backchannel_encapsulation,
3624 GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION,
3625 struct TransportBackchannelEncapsulationMessage,
3627 GNUNET_MQ_hd_var_size (dv_learn,
3628 GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN,
3629 struct TransportDVLearn,
3631 GNUNET_MQ_hd_var_size (dv_box,
3632 GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX,
3633 struct TransportDVBox,
3635 GNUNET_MQ_handler_end()
3639 ret = GNUNET_MQ_handle_message (handlers,
/* GNUNET_SYSERR: the communicator client is dropped */
3641 if (GNUNET_SYSERR == ret)
3644 GNUNET_SERVICE_client_drop (cmc->tc->client);
/* GNUNET_NO: no table entry for this type, treat as raw message */
3648 if (GNUNET_NO == ret)
3650 /* unencapsulated 'raw' message */
/* pass the context pointer itself: handle_raw_message() casts its
   closure to `struct CommunicatorMessageContext *`, so the address of
   the local pointer variable would be misread as a context */
3651 handle_raw_message (cmc,
3658 * New queue became available. Check message.
3660 * @param cls the client
3661 * @param aqm the send message that was sent
/* Validation hook for an add-queue request: only communicators may
   send it, and the payload is checked for 0-termination. */
3664 check_add_queue_message (void *cls,
3665 const struct GNUNET_TRANSPORT_AddQueueMessage *aqm)
3667 struct TransportClient *tc = cls;
/* reject the message unless the sending client is a communicator */
3669 if (CT_COMMUNICATOR != tc->type)
3672 return GNUNET_SYSERR;
3674 GNUNET_MQ_check_zero_termination (aqm);
3680 * Bandwidth tracker informs us that the delay until we should receive
3683 * @param cls a `struct Queue` for which the delay changed
/* Queries the inbound tracker for the current delay; the result is not
   acted upon yet (see FIXME). */
3686 tracker_update_in_cb (void *cls)
3688 struct Queue *queue = cls;
3689 struct GNUNET_TIME_Relative in_delay;
/* a queue without MTU (0) is accounted with a default packet size */
3692 rsize = (0 == queue->mtu) ? IN_PACKET_SIZE_WITHOUT_MTU : queue->mtu;
3693 in_delay = GNUNET_BANDWIDTH_tracker_get_delay (&queue->tracker_in,
3695 // FIXME: how exactly do we do inbound flow control?
3700 * If necessary, generates the UUID for a @a pm
3702 * @param pm pending message to generate UUID for.
/* Draws a random UUID for the pending message once; msg_uuid_set
   records that this has happened. */
3705 set_pending_message_uuid (struct PendingMessage *pm)
/* a UUID assigned earlier is kept */
3707 if (pm->msg_uuid_set)
3709 GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
3711 sizeof (pm->msg_uuid));
3712 pm->msg_uuid_set = GNUNET_YES;
3717 * Fragment the given @a pm to the given @a mtu. Adds
3718 * additional fragments to the neighbour as well. If the
3719 * @a mtu is too small, generates an error for the @a pm
3722 * @param pm pending message to fragment for transmission
3723 * @param mtu MTU to apply
3724 * @return new message to transmit
/* Selects, or creates on demand, the next fragment of the message that
   fits the MTU. Fragments form a tree below the message: each node
   keeps its children in head_frag/tail_frag, and frag_off records how
   much of the node has been cut into fragments so far. */
3726 static struct PendingMessage *
3727 fragment_message (struct PendingMessage *pm,
3730 struct PendingMessage *ff;
/* every fragment box carries the UUID of the message, so it must exist */
3732 set_pending_message_uuid (pm);
3734 /* This invariant is established in #handle_add_queue_message() */
3735 GNUNET_assert (mtu > sizeof (struct TransportFragmentBox));
3737 /* select fragment for transmission, descending the tree if it has
3738 been expanded until we are at a leaf or at a fragment that is small enough */
3740 while ( ( (ff->bytes_msg > mtu) ||
3742 (ff->frag_off == ff->bytes_msg) &&
3743 (NULL != ff->head_frag) )
3745 ff = ff->head_frag; /* descent into fragmented fragments */
/* NOTE(review): this test reads pm->frag_off / pm->bytes_msg while the
   loop above and the code below work on ff; after a descent ff differs
   from pm, so ff may have been intended -- confirm */
3748 if ( ( (ff->bytes_msg > mtu) ||
3750 (pm->frag_off < pm->bytes_msg) )
3752 /* Did not yet calculate all fragments, calculate next fragment */
3753 struct PendingMessage *frag;
3754 struct TransportFragmentBox tfb;
/* the bytes of a node start directly behind its PendingMessage struct */
3762 orig = (const char *) &ff[1];
3763 msize = ff->bytes_msg;
3766 const struct TransportFragmentBox *tfbo;
/* the node begins with a fragment box: skip that header and remember
   its offset so that new offsets stay relative to the whole message */
3768 tfbo = (const struct TransportFragmentBox *) orig;
3769 orig += sizeof (struct TransportFragmentBox);
3770 msize -= sizeof (struct TransportFragmentBox);
3771 xoff = ntohs (tfbo->frag_off);
/* payload capacity of one fragment once the box header is deducted */
3773 fragmax = mtu - sizeof (struct TransportFragmentBox);
3774 fragsize = GNUNET_MIN (msize - ff->frag_off,
3776 frag = GNUNET_malloc (sizeof (struct PendingMessage) +
3777 sizeof (struct TransportFragmentBox) +
3779 frag->target = pm->target;
3780 frag->frag_parent = ff;
3781 frag->timeout = pm->timeout;
3782 frag->bytes_msg = sizeof (struct TransportFragmentBox) + fragsize;
3783 frag->pmt = PMT_FRAGMENT_BOX;
3784 msg = (char *) &frag[1];
3785 tfb.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT);
3786 tfb.header.size = htons (sizeof (struct TransportFragmentBox) +
/* fragment UUIDs are numbered per message from a counter kept in pm */
3788 tfb.frag_uuid = htonl (pm->frag_uuidgen++);
3789 tfb.msg_uuid = pm->msg_uuid;
3790 tfb.frag_off = htons (ff->frag_off + xoff);
3791 tfb.msg_size = htons (pm->bytes_msg);
3795 memcpy (&msg[sizeof (tfb)],
3796 &orig[ff->frag_off],
3798 GNUNET_CONTAINER_MDLL_insert (frag,
/* the next call continues behind the bytes consumed by this fragment */
3802 ff->frag_off += fragsize;
3806 /* Move head to the tail and return it */
3807 GNUNET_CONTAINER_MDLL_remove (frag,
3808 ff->frag_parent->head_frag,
3809 ff->frag_parent->tail_frag,
3811 GNUNET_CONTAINER_MDLL_insert_tail (frag,
3812 ff->frag_parent->head_frag,
3813 ff->frag_parent->tail_frag,
3820 * Reliability-box the given @a pm. On error (can there be any), NULL
3821 * may be returned, otherwise the "replacement" for @a pm (which
3822 * should then be added to the respective neighbour's queue instead of
3823 * @a pm). If the @a pm is already fragmented or reliability boxed,
3824 * or itself an ACK, this function simply returns @a pm.
3826 * @param pm pending message to box for transmission over unreliable queue
3827 * @return new message to transmit
/* Creates (once) a child pending message holding the original message
   wrapped in a TransportReliabilityBox, linked to pm via frag_parent. */
3829 static struct PendingMessage *
3830 reliability_box_message (struct PendingMessage *pm)
3832 struct TransportReliabilityBox rbox;
3833 struct PendingMessage *bpm;
3836 if (PMT_CORE != pm->pmt)
3837 return pm; /* already fragmented or reliability boxed, or control message: do nothing */
3838 if (NULL != pm->bpm)
3839 return pm->bpm; /* already computed earlier: do nothing */
3840 GNUNET_assert (NULL == pm->head_frag);
/* the boxed message would no longer fit a 16-bit size field */
3841 if (pm->bytes_msg + sizeof (rbox) > UINT16_MAX)
3845 client_send_response (pm,
3850 bpm = GNUNET_malloc (sizeof (struct PendingMessage) +
3853 bpm->target = pm->target;
3854 bpm->frag_parent = pm;
3855 GNUNET_CONTAINER_MDLL_insert (frag,
3859 bpm->timeout = pm->timeout;
3860 bpm->pmt = PMT_RELIABILITY_BOX;
3861 bpm->bytes_msg = pm->bytes_msg + sizeof (rbox);
/* NOTE(review): the UUID is generated for bpm here, but the box below
   carries pm->msg_uuid; nothing visible in this function initialises
   the UUID of pm -- confirm which message was meant */
3862 set_pending_message_uuid (bpm);
3863 rbox.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX);
3864 rbox.header.size = htons (sizeof (rbox) + pm->bytes_msg);
3865 rbox.ack_countdown = htonl (0); // FIXME: implement ACK countdown support
3866 rbox.msg_uuid = pm->msg_uuid;
/* box header first, original message bytes directly behind it */
3867 msg = (char *) &bpm[1];
3871 memcpy (&msg[sizeof (rbox)],
3880 * We believe we are ready to transmit a message on a queue. Double-checks
3881 * with the queue's "tracker_out" and then gives the message to the
3882 * communicator for transmission (updating the tracker, and re-scheduling
3883 * itself if applicable).
3885 * @param cls the `struct Queue` to process transmissions for
/* Scheduler task for one queue: takes the first pending message of the
   neighbour, fragments and/or reliability-boxes it as the queue
   requires, passes the result to the communicator and then updates the
   bookkeeping of the pending message. */
3888 transmit_on_queue (void *cls)
3890 struct Queue *queue = cls;
3891 struct Neighbour *n = queue->neighbour;
3892 struct QueueEntry *qe;
3893 struct PendingMessage *pm;
3894 struct PendingMessage *s;
3896 struct GNUNET_TRANSPORT_SendMessageTo *smt;
3897 struct GNUNET_MQ_Envelope *env;
/* this task is running now, so the stored handle is no longer valid */
3899 queue->transmit_task = NULL;
3900 if (NULL == (pm = n->pending_msg_head))
3902 /* no message pending, nothing to do here! */
/* let the scheduling logic decide again; if it armed a task, this run
   must not transmit */
3905 schedule_transmit_on_queue (queue);
3906 if (NULL != queue->transmit_task)
3907 return; /* do it later */
/* queues that are not reliable need room for a reliability box */
3909 if (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc)
3910 overhead += sizeof (struct TransportReliabilityBox);
3912 if ( ( (0 != queue->mtu) &&
3913 (pm->bytes_msg + overhead > queue->mtu) ) ||
3914 (pm->bytes_msg > UINT16_MAX - sizeof (struct GNUNET_TRANSPORT_SendMessageTo)) ||
3915 (NULL != pm->head_frag /* fragments already exist, should
3916 respect that even if MTU is 0 for
3918 s = fragment_message (s,
3920 ? UINT16_MAX - sizeof (struct GNUNET_TRANSPORT_SendMessageTo)
3924 /* Fragmentation failed, try next message... */
3925 schedule_transmit_on_queue (queue);
3928 if (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc)
3929 s = reliability_box_message (s);
3932 /* Reliability boxing failed, try next message... */
3933 schedule_transmit_on_queue (queue);
3937 /* Pass 's' for transmission to the communicator */
3938 qe = GNUNET_new (struct QueueEntry);
/* each queue entry receives the next message id of this queue */
3939 qe->mid = queue->mid_gen++;
3941 // qe->pm = s; // FIXME: not so easy, reference management on 'free(s)'!
3942 GNUNET_CONTAINER_DLL_insert (queue->queue_head,
3945 env = GNUNET_MQ_msg_extra (smt,
3947 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG);
3948 smt->qid = queue->qid;
3950 smt->receiver = n->pid;
3954 GNUNET_assert (CT_COMMUNICATOR == queue->tc->type);
/* counted per queue and per communicator */
3955 queue->queue_length++;
3956 queue->tc->details.communicator.total_queue_length++;
3957 GNUNET_MQ_send (queue->tc->mq,
3960 // FIXME: do something similar to the logic below
3961 // in defragmentation / reliability ACK handling!
3963 /* Check if this transmission somehow conclusively finished handing 'pm'
3964 even without any explicit ACKs */
3965 if ( (PMT_CORE == s->pmt) &&
3966 (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc) )
3968 /* Full message sent, and over reliable channel */
3969 client_send_response (pm,
3973 else if ( (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc) &&
3974 (PMT_FRAGMENT_BOX == s->pmt) )
3976 struct PendingMessage *pos;
3978 /* Fragment sent over reliable channel */
3979 free_fragment_tree (s);
3980 pos = s->frag_parent;
3981 GNUNET_CONTAINER_MDLL_remove (frag,
3986 /* check if subtree is done */
/* a node is done when it has no fragments left and has been cut up
   completely (frag_off == bytes_msg) */
3987 while ( (NULL == pos->head_frag) &&
3988 (pos->frag_off == pos->bytes_msg) &&
3992 pos = s->frag_parent;
3993 GNUNET_CONTAINER_MDLL_remove (frag,
4000 /* Was this the last applicable fragment? */
4001 if ( (NULL == pm->head_frag) &&
4002 (pm->frag_off == pm->bytes_msg) )
4003 client_send_response (pm,
4005 pm->bytes_msg /* FIXME: calculate and add overheads! */);
4007 else if (PMT_CORE != pm->pmt)
4009 /* This was an acknowledgement of some type, always free */
4010 free_pending_message (pm);
4014 /* message not finished, waiting for acknowledgement */
4015 struct Neighbour *neighbour = pm->target;
4016 /* Update time by which we might retransmit 's' based on queue
4017 characteristics (i.e. RTT); it takes one RTT for the message to
4018 arrive and the ACK to come back in the best case; but the other
4019 side is allowed to delay ACKs by 2 RTTs, so we use 4 RTT before
4020 retransmitting. Note that in the future this heuristic should
4021 likely be improved further (measure RTT stability, consider
4022 message urgency and size when delaying ACKs, etc.) */
4023 s->next_attempt = GNUNET_TIME_relative_to_absolute
4024 (GNUNET_TIME_relative_multiply (queue->rtt,
4028 struct PendingMessage *pos;
4030 /* re-insert sort in neighbour list */
4031 GNUNET_CONTAINER_MDLL_remove (neighbour,
4032 neighbour->pending_msg_head,
4033 neighbour->pending_msg_tail,
/* walk backwards from the tail past every entry with an earlier
   next_attempt, then insert behind the entry where the walk stops */
4035 pos = neighbour->pending_msg_tail;
4036 while ( (NULL != pos) &&
4037 (pm->next_attempt.abs_value_us > pos->next_attempt.abs_value_us) )
4038 pos = pos->prev_neighbour;
4039 GNUNET_CONTAINER_MDLL_insert_after (neighbour,
4040 neighbour->pending_msg_head,
4041 neighbour->pending_msg_tail,
4047 /* re-insert sort in fragment list */
4048 struct PendingMessage *fp = s->frag_parent;
4049 struct PendingMessage *pos;
4051 GNUNET_CONTAINER_MDLL_remove (frag,
/* same backwards walk, this time over the fragment list of the parent */
4055 pos = fp->tail_frag;
4056 while ( (NULL != pos) &&
4057 (s->next_attempt.abs_value_us > pos->next_attempt.abs_value_us) )
4058 pos = pos->prev_frag;
4059 GNUNET_CONTAINER_MDLL_insert_after (frag,
4067 /* finally, re-schedule queue transmission task itself */
4068 schedule_transmit_on_queue (queue);
4073 * Bandwidth tracker informs us that the delay until we
4074 * can transmit again changed.
4076 * @param cls a `struct Queue` for which the delay changed
4079 tracker_update_out_cb (void *cls)
4081 struct Queue *queue = cls;
4082 struct Neighbour *n = queue->neighbour;
4084 if (NULL == n->pending_msg_head)
4086 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4087 "Bandwidth allocation updated for empty transmission queue `%s'\n",
4089 return; /* no message pending, nothing to do here! */
4091 GNUNET_SCHEDULER_cancel (queue->transmit_task);
4092 queue->transmit_task = NULL;
4093 schedule_transmit_on_queue (queue);
4098 * Bandwidth tracker informs us that excessive outbound bandwidth was
4099 * allocated which is not being used.
4101 * @param cls a `struct Queue` for which the excess was noted
4104 tracker_excess_out_cb (void *cls)
4106 /* FIXME: trigger excess bandwidth report to core? Right now,
4107 this is done internally within transport_api2_core already,
4108 but we probably want to change the logic and trigger it
4109 from here via a message instead! */
4110 /* TODO: maybe inform someone at this point? */
4111 GNUNET_STATISTICS_update (GST_stats,
4112 "# Excess outbound bandwidth reported",
4120 * Bandwidth tracker informs us that excessive inbound bandwidth was allocated
4121 * which is not being used.
4123 * @param cls a `struct Queue` for which the excess was noted
4126 tracker_excess_in_cb (void *cls)
4128 /* TODO: maybe inform somone at this point? */
4129 GNUNET_STATISTICS_update (GST_stats,
4130 "# Excess inbound bandwidth reported",
4137 * New queue became available. Process the request.
4139 * @param cls the client
4140 * @param aqm the add queue message that was sent
/* Handler for a communicator announcing a new queue: checks the MTU,
   finds or creates the neighbour entry for the receiving peer, allocates
   and fills in the queue, tells monitors about it, links it into the
   neighbour's and the client's queue lists and lets the client continue. */
4143 handle_add_queue_message (void *cls,
4144 const struct GNUNET_TRANSPORT_AddQueueMessage *aqm)
4146 struct TransportClient *tc = cls;
4147 struct Queue *queue;
4148 struct Neighbour *neighbour;
/* the MTU arrives in network byte order and must be larger than a
   fragment box, otherwise the client is dropped */
4152 if (ntohl (aqm->mtu) <= sizeof (struct TransportFragmentBox))
4154 /* MTU so small as to be useless for transmissions,
4155 required for #fragment_message()! */
4156 GNUNET_break_op (0);
4157 GNUNET_SERVICE_client_drop (tc->client);
/* look up the neighbour this queue leads to */
4160 neighbour = lookup_neighbour (&aqm->receiver);
4161 if (NULL == neighbour)
/* no entry for this peer yet: create one, add it to the `neighbours` map
   (asserting the key was not present) and announce the peer via
   cores_send_connect_info() with zero bandwidth */
4163 neighbour = GNUNET_new (struct Neighbour);
4164 neighbour->earliest_timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
4165 neighbour->pid = aqm->receiver;
4166 GNUNET_assert (GNUNET_OK ==
4167 GNUNET_CONTAINER_multipeermap_put (neighbours,
4170 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
4171 cores_send_connect_info (&neighbour->pid,
4172 GNUNET_BANDWIDTH_ZERO);
/* everything behind the fixed-size message struct is the address */
4174 addr_len = ntohs (aqm->header.size) - sizeof (*aqm);
4175 addr = (const char *) &aqm[1];
/* one allocation holds the queue struct plus room for the address,
   which `queue->address` points at */
4177 queue = GNUNET_malloc (sizeof (struct Queue) + addr_len);
4179 queue->address = (const char *) &queue[1];
4180 queue->rtt = GNUNET_TIME_UNIT_FOREVER_REL;
/* the queue ID is stored exactly as received; the fields below are
   converted from network byte order */
4181 queue->qid = aqm->qid;
4182 queue->mtu = ntohl (aqm->mtu);
4183 queue->nt = (enum GNUNET_NetworkType) ntohl (aqm->nt);
4184 queue->cs = (enum GNUNET_TRANSPORT_ConnectionStatus) ntohl (aqm->cs);
4185 queue->neighbour = neighbour;
/* inbound and outbound bandwidth trackers both start out with zero
   bandwidth and use the update/excess callbacks named here */
4186 GNUNET_BANDWIDTH_tracker_init2 (&queue->tracker_in,
4187 &tracker_update_in_cb,
4189 GNUNET_BANDWIDTH_ZERO,
4190 GNUNET_CONSTANTS_MAX_BANDWIDTH_CARRY_S,
4191 &tracker_excess_in_cb,
4193 GNUNET_BANDWIDTH_tracker_init2 (&queue->tracker_out,
4194 &tracker_update_out_cb,
4196 GNUNET_BANDWIDTH_ZERO,
4197 GNUNET_CONSTANTS_MAX_BANDWIDTH_CARRY_S,
4198 &tracker_excess_out_cb,
4203 /* notify monitors about new queue */
4205 struct MonitorEvent me = {
4210 notify_monitors (&neighbour->pid,
/* the queue is a member of two lists at once: the neighbour's queues
   and the queues of the communicator client */
4215 GNUNET_CONTAINER_MDLL_insert (neighbour,
4216 neighbour->queue_head,
4217 neighbour->queue_tail,
4219 GNUNET_CONTAINER_MDLL_insert (client,
4220 tc->details.communicator.queue_head,
4221 tc->details.communicator.queue_tail,
4223 GNUNET_SERVICE_client_continue (tc->client);
4228 * Queue to a peer went down. Process the request.
4230 * @param cls the client
4231 * @param dqm the delete queue message that was sent
/* Handler for a communicator reporting that a queue went down.  Only
   communicator clients may send this; the client's queues are searched
   for the one matching both queue ID and receiver. */
4234 handle_del_queue_message (void *cls,
4235 const struct GNUNET_TRANSPORT_DelQueueMessage *dqm)
4237 struct TransportClient *tc = cls;
/* clients of any other type are dropped */
4239 if (CT_COMMUNICATOR != tc->type)
4242 GNUNET_SERVICE_client_drop (tc->client);
/* walk the list of queues owned by this communicator */
4245 for (struct Queue *queue = tc->details.communicator.queue_head;
4247 queue = queue->next_client)
4249 struct Neighbour *neighbour = queue->neighbour;
/* a queue matches only if both the queue ID (compared as received,
   without byte-order conversion) and the peer identity agree */
4251 if ( (dqm->qid != queue->qid) ||
4252 (0 != memcmp (&dqm->receiver,
4254 sizeof (struct GNUNET_PeerIdentity))) )
/* matching queue handled: let the client continue */
4257 GNUNET_SERVICE_client_continue (tc->client);
/* presumably reached only when no queue matched -- the client is dropped */
4261 GNUNET_SERVICE_client_drop (tc->client);
4266 * Message was transmitted. Process the request.
4268 * @param cls the client
4269 * @param sma the send message acknowledgement that was sent
/* Handler for a communicator acknowledging that a message was handed
   off: finds the matching queue entry, removes it from its queue,
   updates the per-queue and per-communicator counters and resumes
   transmissions that were waiting on a queue limit. */
4272 handle_send_message_ack (void *cls,
4273 const struct GNUNET_TRANSPORT_SendMessageToAck *sma)
4275 struct TransportClient *tc = cls;
4276 struct QueueEntry *qe;
/* only communicators may acknowledge; other clients are dropped */
4278 if (CT_COMMUNICATOR != tc->type)
4281 GNUNET_SERVICE_client_drop (tc->client);
4285 /* find our queue entry matching the ACK */
/* outer loop: queues of this communicator, filtered by peer identity;
   inner loop: entries of such a queue, filtered by message ID */
4287 for (struct Queue *queue = tc->details.communicator.queue_head;
4289 queue = queue->next_client)
4291 if (0 != memcmp (&queue->neighbour->pid,
4293 sizeof (struct GNUNET_PeerIdentity)))
4295 for (struct QueueEntry *qep = queue->queue_head;
4299 if (qep->mid != sma->mid)
4308 /* this should never happen */
4310 GNUNET_SERVICE_client_drop (tc->client);
/* unlink the entry and account for one less in-flight message on both
   the queue and the communicator */
4313 GNUNET_CONTAINER_DLL_remove (qe->queue->queue_head,
4314 qe->queue->queue_tail,
4316 qe->queue->queue_length--;
4317 tc->details.communicator.total_queue_length--;
4318 GNUNET_SERVICE_client_continue (tc->client);
4320 /* if applicable, resume transmissions that waited on ACK */
/* the counters were just decremented, so "limit - 1" means the limit had
   been reached right before this ACK */
4321 if (COMMUNICATOR_TOTAL_QUEUE_LIMIT - 1 == tc->details.communicator.total_queue_length)
4323 /* Communicator dropped below threshold, resume all queues */
4324 GNUNET_STATISTICS_update (GST_stats,
4325 "# Transmission throttled due to communicator queue limit",
4328 for (struct Queue *queue = tc->details.communicator.queue_head;
4330 queue = queue->next_client)
4331 schedule_transmit_on_queue (queue);
4333 else if (QUEUE_LENGTH_LIMIT - 1 == qe->queue->queue_length)
4335 /* queue dropped below threshold; only resume this one queue */
4336 GNUNET_STATISTICS_update (GST_stats,
4337 "# Transmission throttled due to queue queue limit",
4340 schedule_transmit_on_queue (qe->queue);
4343 /* TODO: we also should react on the status! */
4344 // FIXME: this probably requires queue->pm = s assignment!
4345 // FIXME: react to communicator status about transmission request. We got:
/* NOTE(review): the next line is an expression statement without any
   effect; the status is not acted upon yet */
4346 sma->status; // OK success, SYSERR failure
4353 * Iterator telling new MONITOR client about all existing
4356 * @param cls the new `struct TransportClient`
4357 * @param pid a connected peer
4358 * @param value the `struct Neighbour` with more information
4359 * @return #GNUNET_OK (continue to iterate)
/* Map iterator over the neighbours: for every queue of the given
   neighbour a monitor event is prepared for the new MONITOR client. */
4362 notify_client_queues (void *cls,
4363 const struct GNUNET_PeerIdentity *pid,
4366 struct TransportClient *tc = cls;
4367 struct Neighbour *neighbour = value;
/* the closure must be a monitor client */
4369 GNUNET_assert (CT_MONITOR == tc->type);
/* iterate the neighbour's queues through the per-neighbour list links */
4370 for (struct Queue *q = neighbour->queue_head;
4372 q = q->next_neighbour)
/* snapshot of the queue's counters for the monitor */
4374 struct MonitorEvent me = {
4377 .num_msg_pending = q->num_msg_pending,
4378 .num_bytes_pending = q->num_bytes_pending
4392 * Initialize a monitor client.
4394 * @param cls the client
4395 * @param start the start message that was sent
4398 handle_monitor_start (void *cls,
4399 const struct GNUNET_TRANSPORT_MonitorStart *start)
4401 struct TransportClient *tc = cls;
4403 if (CT_NONE != tc->type)
4406 GNUNET_SERVICE_client_drop (tc->client);
4409 tc->type = CT_MONITOR;
4410 tc->details.monitor.peer = start->peer;
4411 tc->details.monitor.one_shot = ntohl (start->one_shot);
4412 GNUNET_CONTAINER_multipeermap_iterate (neighbours,
4413 ¬ify_client_queues,
4415 GNUNET_SERVICE_client_mark_monitor (tc->client);
4416 GNUNET_SERVICE_client_continue (tc->client);
4421 * Find transport client providing communication service
4422 * for the protocol @a prefix.
4424 * @param prefix communicator name
4425 * @return NULL if no such transport client is available
4427 static struct TransportClient *
4428 lookup_communicator (const char *prefix)
4430 for (struct TransportClient *tc = clients_head;
4434 if (CT_COMMUNICATOR != tc->type)
4436 if (0 == strcmp (prefix,
4437 tc->details.communicator.address_prefix))
4440 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
4441 "Somone suggested use of communicator for `%s', but we do not have such a communicator!\n",
4448 * Signature of a function called with a communicator @a address of a peer
4449 * @a pid that an application wants us to connect to.
4451 * @param pid target peer
4452 * @param address the address to try
4455 suggest_to_connect (const struct GNUNET_PeerIdentity *pid,
4456 const char *address)
4458 static uint32_t idgen;
4459 struct TransportClient *tc;
4461 struct GNUNET_TRANSPORT_CreateQueue *cqm;
4462 struct GNUNET_MQ_Envelope *env;
4465 prefix = GNUNET_HELLO_address_to_prefix (address);
4468 GNUNET_break (0); /* We got an invalid address!? */
4471 tc = lookup_communicator (prefix);
4474 GNUNET_STATISTICS_update (GST_stats,
4475 "# Suggestions ignored due to missing communicator",
4480 /* forward suggestion for queue creation to communicator */
4481 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4482 "Request #%u for `%s' communicator to create queue to `%s'\n",
4483 (unsigned int) idgen,
4486 alen = strlen (address) + 1;
4487 env = GNUNET_MQ_msg_extra (cqm,
4489 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE);
4490 cqm->request_id = htonl (idgen++);
4491 cqm->receiver = *pid;
4495 GNUNET_MQ_send (tc->mq,
4501 * Communicator tells us that our request to create a queue "worked", that
4502 * is setting up the queue is now in process.
4504 * @param cls the `struct TransportClient`
4505 * @param cqr confirmation message
4508 handle_queue_create_ok (void *cls,
4509 const struct GNUNET_TRANSPORT_CreateQueueResponse *cqr)
4511 struct TransportClient *tc = cls;
4513 if (CT_COMMUNICATOR != tc->type)
4516 GNUNET_SERVICE_client_drop (tc->client);
4519 GNUNET_STATISTICS_update (GST_stats,
4520 "# Suggestions succeeded at communicator",
4523 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4524 "Request #%u for communicator to create queue succeeded\n",
4525 (unsigned int) ntohs (cqr->request_id));
4526 GNUNET_SERVICE_client_continue (tc->client);
4531 * Communicator tells us that our request to create a queue failed. This usually
4532 * indicates that the provided address is simply invalid or that the communicator's
4533 * resources are exhausted.
4535 * @param cls the `struct TransportClient`
4536 * @param cqr failure message
4539 handle_queue_create_fail (void *cls,
4540 const struct GNUNET_TRANSPORT_CreateQueueResponse *cqr)
4542 struct TransportClient *tc = cls;
4544 if (CT_COMMUNICATOR != tc->type)
4547 GNUNET_SERVICE_client_drop (tc->client);
4550 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4551 "Request #%u for communicator to create queue failed\n",
4552 (unsigned int) ntohs (cqr->request_id));
4553 GNUNET_STATISTICS_update (GST_stats,
4554 "# Suggestions failed in queue creation at communicator",
4557 GNUNET_SERVICE_client_continue (tc->client);
4562 * Function called by PEERSTORE for each matching record.
4564 * @param cls closure
4565 * @param record peerstore record information
4566 * @param emsg error message, or NULL if no errors
4569 handle_hello (void *cls,
4570 const struct GNUNET_PEERSTORE_Record *record,
4573 struct PeerRequest *pr = cls;
4578 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
4579 "Got failure from PEERSTORE: %s\n",
4583 val = record->value;
4584 if ( (0 == record->value_size) ||
4585 ('\0' != val[record->value_size - 1]) )
4590 suggest_to_connect (&pr->pid,
4591 (const char *) record->value);
4596 * We have received a `struct ExpressPreferenceMessage` from an application client.
4598 * @param cls handle to the client
4599 * @param msg the suggestion message
/* Handler for an application asking us to talk to a peer: turns an
   untyped client into an application client, records the request in the
   client's request map and starts watching PEERSTORE for the peer. */
4602 handle_suggest (void *cls,
4603 const struct ExpressPreferenceMessage *msg)
4605 struct TransportClient *tc = cls;
4606 struct PeerRequest *pr;
/* first message of this client: it becomes an application client and
   gets its (initially 16-slot) request map */
4608 if (CT_NONE == tc->type)
4610 tc->type = CT_APPLICATION;
4611 tc->details.application.requests
4612 = GNUNET_CONTAINER_multipeermap_create (16,
/* clients that already have another type are dropped */
4615 if (CT_APPLICATION != tc->type)
4618 GNUNET_SERVICE_client_drop (tc->client);
/* NOTE(review): the rate is printed with %u but cast to (int) --
   the cast should probably be (unsigned int) */
4621 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4622 "Client suggested we talk to %s with preference %d at rate %u\n",
4623 GNUNET_i2s (&msg->peer),
4624 (int) ntohl (msg->pk),
4625 (int) ntohl (msg->bw.value__));
/* remember the request; the preference kind is converted from network
   byte order */
4626 pr = GNUNET_new (struct PeerRequest);
4628 pr->pid = msg->peer;
4630 pr->pk = (enum GNUNET_MQ_PreferenceKind) ntohl (msg->pk);
/* each peer may be requested only once per client (UNIQUE_ONLY) */
4632 GNUNET_CONTAINER_multipeermap_put (tc->details.application.requests,
4635 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY))
4639 GNUNET_SERVICE_client_drop (tc->client);
/* the watch handle is kept in the request */
4642 pr->wc = GNUNET_PEERSTORE_watch (peerstore,
4648 GNUNET_SERVICE_client_continue (tc->client);
4653 * We have received a `struct ExpressPreferenceMessage` from an application client.
4655 * @param cls handle to the client
4656 * @param msg the start message
4659 handle_suggest_cancel (void *cls,
4660 const struct ExpressPreferenceMessage *msg)
4662 struct TransportClient *tc = cls;
4663 struct PeerRequest *pr;
4665 if (CT_APPLICATION != tc->type)
4668 GNUNET_SERVICE_client_drop (tc->client);
4671 pr = GNUNET_CONTAINER_multipeermap_get (tc->details.application.requests,
4676 GNUNET_SERVICE_client_drop (tc->client);
4679 (void) stop_peer_request (tc,
4682 GNUNET_SERVICE_client_continue (tc->client);
4687 * Check #GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_CONSIDER_VERIFY
4688 * messages. We do nothing here, real verification is done later.
4690 * @param cls a `struct TransportClient *`
4691 * @param msg message to verify
4692 * @return #GNUNET_OK
4695 check_address_consider_verify (void *cls,
4696 const struct GNUNET_TRANSPORT_AddressToVerify *hdr)
4705 * Given another peers address, consider checking it for validity
4706 * and then adding it to the Peerstore.
4708 * @param cls a `struct TransportClient`
4709 * @param hdr message containing the raw address data and
4710 * signature in the body, see #GNUNET_HELLO_extract_address()
4713 handle_address_consider_verify (void *cls,
4714 const struct GNUNET_TRANSPORT_AddressToVerify *hdr)
4717 enum GNUNET_NetworkType nt;
4718 struct GNUNET_TIME_Absolute expiration;
4721 // FIXME: pre-check: do we know this address already?
4722 // FIXME: pre-check: rate-limit signature verification / validation!
4723 address = GNUNET_HELLO_extract_address (&hdr[1],
4724 ntohs (hdr->header.size) - sizeof (*hdr),
4728 if (NULL == address)
4730 GNUNET_break_op (0);
4733 if (0 == GNUNET_TIME_absolute_get_remaining (expiration).rel_value_us)
4734 return; /* expired */
4735 // FIXME: do begin actual verification here!
4736 GNUNET_free (address);
4741 * Free neighbour entry.
4745 * @param value a `struct Neighbour`
4746 * @return #GNUNET_OK (always)
4749 free_neighbour_cb (void *cls,
4750 const struct GNUNET_PeerIdentity *pid,
4753 struct Neighbour *neighbour = value;
4757 GNUNET_break (0); // should this ever happen?
4758 free_neighbour (neighbour);
4765 * Free DV route entry.
4769 * @param value a `struct DistanceVector`
4770 * @return #GNUNET_OK (always)
/* Map iterator over the DV routes; the map value is the
   `struct DistanceVector` to release. */
4773 free_dv_routes_cb (void *cls,
4774 const struct GNUNET_PeerIdentity *pid,
4777 struct DistanceVector *dv = value;
4788 * Free ephemeral entry.
4792 * @param value a `struct Neighbour`
4793 * @return #GNUNET_OK (always)
4796 free_ephemeral_cb (void *cls,
4797 const struct GNUNET_PeerIdentity *pid,
4800 struct EphemeralCacheEntry *ece = value;
4804 free_ephemeral (ece);
4810 * Function called when the service shuts down. Cancels the ephemeral
4811 * task, frees all remaining state and disconnects from other services.
4813 * @param cls closure, unused
/* Shutdown task: releases the global state of the service step by step. */
4816 do_shutdown (void *cls)
/* stop the ephemeral task, if one is scheduled */
4820 if (NULL != ephemeral_task)
4822 GNUNET_SCHEDULER_cancel (ephemeral_task);
4823 ephemeral_task = NULL;
/* visit all neighbours before the services below are shut down */
4825 GNUNET_CONTAINER_multipeermap_iterate (neighbours,
4828 if (NULL != peerstore)
4830 GNUNET_PEERSTORE_disconnect (peerstore,
4834 if (NULL != GST_stats)
4836 GNUNET_STATISTICS_destroy (GST_stats,
4840 if (NULL != GST_my_private_key)
4842 GNUNET_free (GST_my_private_key);
4843 GST_my_private_key = NULL;
/* the maps are destroyed only after their entries have been visited */
4845 GNUNET_CONTAINER_multipeermap_destroy (neighbours);
4847 GNUNET_CONTAINER_multipeermap_iterate (dv_routes,
4850 GNUNET_CONTAINER_multipeermap_destroy (dv_routes);
4852 GNUNET_CONTAINER_multipeermap_iterate (ephemeral_map,
4855 GNUNET_CONTAINER_multipeermap_destroy (ephemeral_map);
4856 ephemeral_map = NULL;
4857 GNUNET_CONTAINER_heap_destroy (ephemeral_heap);
4858 ephemeral_heap = NULL;
4863 * Initiate transport service.
4865 * @param cls closure
4866 * @param c configuration to use
4867 * @param service the initialized service
4871 const struct GNUNET_CONFIGURATION_Handle *c,
4872 struct GNUNET_SERVICE_Handle *service)
/* create the global containers: two peer maps sized for 1024 entries, a
   smaller one for ephemeral keys and a min-heap for the ephemeral entries */
4877 neighbours = GNUNET_CONTAINER_multipeermap_create (1024,
4879 dv_routes = GNUNET_CONTAINER_multipeermap_create (1024,
4881 ephemeral_map = GNUNET_CONTAINER_multipeermap_create (32,
4883 ephemeral_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
/* our private key comes from the configuration; without it the service
   cannot run and a shutdown is requested */
4884 GST_my_private_key = GNUNET_CRYPTO_eddsa_key_create_from_configuration (GST_cfg);
4885 if (NULL == GST_my_private_key)
4887 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
4888 _("Transport service is lacking key configuration settings. Exiting.\n"));
4889 GNUNET_SCHEDULER_shutdown ();
/* derive our public identity from the private key and log it */
4892 GNUNET_CRYPTO_eddsa_key_get_public (GST_my_private_key,
4893 &GST_my_identity.public_key);
4894 GNUNET_log(GNUNET_ERROR_TYPE_INFO,
4895 "My identity is `%s'\n",
4896 GNUNET_i2s_full (&GST_my_identity));
4897 GST_stats = GNUNET_STATISTICS_create ("transport",
/* NOTE(review): the shutdown task is registered only here, i.e. after the
   key check above -- confirm the early-exit path does not need it */
4899 GNUNET_SCHEDULER_add_shutdown (&do_shutdown,
/* PEERSTORE is required; failure to connect triggers a shutdown */
4901 peerstore = GNUNET_PEERSTORE_connect (GST_cfg);
4902 if (NULL == peerstore)
4905 GNUNET_SCHEDULER_shutdown ();
4912 * Define "main" method using service macro.
4916 GNUNET_SERVICE_OPTION_SOFT_SHUTDOWN,
4919 &client_disconnect_cb,
/* Message handler table: each entry names the handler suffix, the message
   type and the message struct; entries are grouped by kind of client. */
4921 /* communication with applications */
4922 GNUNET_MQ_hd_fixed_size (suggest,
4923 GNUNET_MESSAGE_TYPE_TRANSPORT_SUGGEST,
4924 struct ExpressPreferenceMessage,
/* cancellation uses the same message struct as the suggestion */
4926 GNUNET_MQ_hd_fixed_size (suggest_cancel,
4927 GNUNET_MESSAGE_TYPE_TRANSPORT_SUGGEST_CANCEL,
4928 struct ExpressPreferenceMessage,
4930 /* communication with core */
4931 GNUNET_MQ_hd_fixed_size (client_start,
4932 GNUNET_MESSAGE_TYPE_TRANSPORT_START,
4933 struct StartMessage,
4935 GNUNET_MQ_hd_var_size (client_send,
4936 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND,
4937 struct OutboundMessage,
4939 /* communication with communicators */
4940 GNUNET_MQ_hd_var_size (communicator_available,
4941 GNUNET_MESSAGE_TYPE_TRANSPORT_NEW_COMMUNICATOR,
4942 struct GNUNET_TRANSPORT_CommunicatorAvailableMessage,
4944 GNUNET_MQ_hd_var_size (communicator_backchannel,
4945 GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL,
4946 struct GNUNET_TRANSPORT_CommunicatorBackchannel,
4948 GNUNET_MQ_hd_var_size (add_address,
4949 GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS,
4950 struct GNUNET_TRANSPORT_AddAddressMessage,
4952 GNUNET_MQ_hd_fixed_size (del_address,
4953 GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_ADDRESS,
4954 struct GNUNET_TRANSPORT_DelAddressMessage,
4956 GNUNET_MQ_hd_var_size (incoming_msg,
4957 GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG,
4958 struct GNUNET_TRANSPORT_IncomingMessage,
/* both outcomes of a queue creation request share one response struct */
4960 GNUNET_MQ_hd_fixed_size (queue_create_ok,
4961 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_OK,
4962 struct GNUNET_TRANSPORT_CreateQueueResponse,
4964 GNUNET_MQ_hd_fixed_size (queue_create_fail,
4965 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_FAIL,
4966 struct GNUNET_TRANSPORT_CreateQueueResponse,
/* queue setup is handled by handle_add_queue_message() */
4968 GNUNET_MQ_hd_var_size (add_queue_message,
4969 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP,
4970 struct GNUNET_TRANSPORT_AddQueueMessage,
4972 GNUNET_MQ_hd_var_size (address_consider_verify,
4973 GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_CONSIDER_VERIFY,
4974 struct GNUNET_TRANSPORT_AddressToVerify,
/* queue teardown is handled by handle_del_queue_message() */
4976 GNUNET_MQ_hd_fixed_size (del_queue_message,
4977 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN,
4978 struct GNUNET_TRANSPORT_DelQueueMessage,
4980 GNUNET_MQ_hd_fixed_size (send_message_ack,
4981 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK,
4982 struct GNUNET_TRANSPORT_SendMessageToAck,
4984 /* communication with monitors */
4985 GNUNET_MQ_hd_fixed_size (monitor_start,
4986 GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_START,
4987 struct GNUNET_TRANSPORT_MonitorStart,
4989 GNUNET_MQ_handler_end ());
4992 /* end of file gnunet-service-tng.c */