merge
[oweals/gnunet.git] / src / transport / gnunet-service-tng.c
1 /*
2  This file is part of GNUnet.
3  Copyright (C) 2010-2016, 2018, 2019 GNUnet e.V.
4
5  GNUnet is free software: you can redistribute it and/or modify it
6  under the terms of the GNU Affero General Public License as published
7  by the Free Software Foundation, either version 3 of the License,
8  or (at your option) any later version.
9
10  GNUnet is distributed in the hope that it will be useful, but
11  WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  Affero General Public License for more details.
14
15  You should have received a copy of the GNU Affero General Public License
16  along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19  */
20 /**
21  * @file transport/gnunet-service-tng.c
22  * @brief main for gnunet-service-tng
23  * @author Christian Grothoff
24  *
25  * TODO:
26  * - figure out how to transmit (selective) ACKs in case of uni-directional
27  *   communicators (with/without core? DV-only?) When do we use ACKs?
28  *   => communicators use selective ACKs for flow control
29  *   => transport uses message-level ACKs for RTT, fragment confirmation
30  *   => integrate DV into transport, use neither core nor communicators
31  *      but rather give communicators transport-encapsulated messages
32  *      (which could be core-data, background-channel traffic, or
33  *       transport-to-transport traffic)
34  *
35  * Implement next:
36  * - address validation: what is our plan here?
37  *   #1 Peerstore only gets 'validated' addresses
38  *   #2 transport needs another API to "trigger" validation!
39  *      API may be used by core/application or communicators;
40  *      => use yet another lib/MQ/connection?
41  *   #3 transport should use validation to also establish
42  *      effective flow control (for uni-directional transports!)
43  *   #4 UDP broadcasting logic must be extended to use the new API
44  *   #5 only validated addresses go to ATS for scheduling; that
45  *      also ensures we know the RTT 
46  *   #6 to ensure flow control and RTT are OK, we always do the
47  *      'validation', even if address comes from PEERSTORE
48  *   #7 
49  * - ACK handling / retransmission
50  * - address verification
51  * - track RTT, distance, loss, etc.
52  * - DV data structures:
53  *   + learning
54  *   + forgetting
55  *   + using them!
56  * - routing of messages (using DV data structures!)
57  * - handling of DV-boxed messages that need to be forwarded
58  * - backchannel message encryption & decryption
59  * -
60  *
61  * Easy:
62  * - use ATS bandwidth allocation callback and schedule transmissions!
63  *
64  * Plan:
65  * - inform ATS about RTT, goodput/loss, overheads, etc. (GNUNET_ATS_session_update())
66  *
67  * Later:
68  * - change transport-core API to provide proper flow control in both
69  *   directions, allow multiple messages per peer simultaneously (tag
70  *   confirmations with unique message ID), and replace quota-out with
71  *   proper flow control;
72  * - if messages are below MTU, consider adding ACKs and other stuff
73  *   (requires planning at receiver, and additional MST-style demultiplex
74  *    at receiver!)
75  * - could avoid copying body of message into each fragment and keep
76  *   fragments as just pointers into the original message and only
77  *   fully build fragments just before transmission (optimization, should
78  *   reduce CPU and memory use)
79  *
80  * Design realizations / discussion:
81  * - communicators do flow control by calling MQ "notify sent"
82  *   when 'ready'. They determine flow implicitly (i.e. TCP blocking)
83  *   or explicitly via background channel FC ACKs.  As long as the
84  *   channel is not full, they may 'notify sent' even if the other
85  *   peer has not yet confirmed receipt. The other peer confirming
86  *   is _only_ for FC, not for more reliable transmission; reliable
87  *   transmission (i.e. of fragments) is left to _transport_.
88  * - ACKs sent back in uni-directional communicators are done via
89  *   the background channel API; here transport _may_ initially
90  *   broadcast (with bounded # hops) if no path is known;
91  * - transport should _integrate_ DV-routing and build a view of
92  *   the network; then background channel traffic can be
93  *   routed via DV as well as explicit "DV" traffic.
94  * - background channel is also used for ACKs and NAT traversal support
95  * - transport service is responsible for AEAD'ing the background
96  *   channel, timestamps and monotonic time are used against replay
97  *   of old messages -> peerstore needs to be supplied with
98  *   "latest timestamps seen" data
99  * - if transport implements DV, we likely need a 3rd peermap
100  *   in addition to ephemerals and (direct) neighbours
101  *   => in this data structure, we should track ATS metrics (distance, RTT, etc.)
102  *   as well as latest timestamps seen, goodput, fragments for transmission, etc.
103  *   ==> check if stuff needs to be moved out of "Neighbour"
104  * - transport should encapsualte core-level messages and do its
105  *   own ACKing for RTT/goodput/loss measurements _and_ fragment
106  *   for retransmission
107  */
108 #include "platform.h"
109 #include "gnunet_util_lib.h"
110 #include "gnunet_statistics_service.h"
111 #include "gnunet_transport_monitor_service.h"
112 #include "gnunet_peerstore_service.h"
113 #include "gnunet_hello_lib.h"
114 #include "gnunet_ats_transport_service.h"
115 #include "gnunet_signatures.h"
116 #include "transport.h"
117
118
119 /**
120  * What is the size we assume for a read operation in the
121  * absence of an MTU for the purpose of flow control?
122  */
123 #define IN_PACKET_SIZE_WITHOUT_MTU 128
124
125 /**
126  * If a queue delays the next message by more than this number
127  * of seconds we log a warning. Note: this is for testing,
128  * the value chosen here might be too aggressively low!
129  */
130 #define DELAY_WARN_THRESHOLD GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
131
132 /**
133  * How long are ephemeral keys valid?
134  */
135 #define EPHEMERAL_VALIDITY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 4)
136
137 /**
138  * How long do we keep partially reassembled messages around before giving up?
139  */
140 #define REASSEMBLY_EXPIRATION GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 4)
141
142 /**
143  * How many messages can we have pending for a given communicator
144  * process before we start to throttle that communicator?
145  *
146  * Used if a communicator might be CPU-bound and cannot handle the traffic.
147  */
148 #define COMMUNICATOR_TOTAL_QUEUE_LIMIT 512
149
150 /**
151  * How many messages can we have pending for a given session (queue to
152  * a particular peer via a communicator) process before we start to
153  * throttle that queue?
154  *
155  * Used if ATS assigns more bandwidth to a particular transmission
156  * method than that transmission method can right now handle. (Yes,
157  * ATS should eventually notice utilization below allocation and
158  * adjust, but we don't want to queue up tons of messages in the
159  * meantime). Must be significantly below
160  * #COMMUNICATOR_TOTAL_QUEUE_LIMIT.
161  */
162 #define SESSION_QUEUE_LIMIT 32
163
164
165 GNUNET_NETWORK_STRUCT_BEGIN
166
167 /**
168  * Outer layer of an encapsulated backchannel message.
169  */
170 struct TransportBackchannelEncapsulationMessage
171 {
172   /**
173    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION.
174    */
175   struct GNUNET_MessageHeader header;
176
177   /**
178    * Distance the backchannel message has traveled, to be updated at
179    * each hop.  Used to bound the number of hops in case a backchannel
180    * message is broadcast and thus travels without routing
181    * information (during initial backchannel discovery).
182    */
183   uint32_t distance;
184
185   /**
186    * Target's peer identity (as backchannels may be transmitted
187    * indirectly, or even be broadcast).
188    */
189   struct GNUNET_PeerIdentity target;
190
191   /**
192    * Ephemeral key setup by the sender for @e target, used
193    * to encrypt the payload.
194    */
195   struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;
196
197   // FIXME: probably should add random IV here as well,
198   // especially if we re-use ephemeral keys!
199
200   /**
201    * HMAC over the ciphertext of the encrypted, variable-size
202    * body that follows.  Verified via DH of @e target and
203    * @e ephemeral_key
204    */
205   struct GNUNET_HashCode hmac;
206
207   /* Followed by encrypted, variable-size payload */
208 };
209
210
211 /**
212  * Body by which a peer confirms that it is using an ephemeral key.
213  */
214 struct EphemeralConfirmation
215 {
216
217   /**
218    * Purpose is #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL
219    */
220   struct GNUNET_CRYPTO_EccSignaturePurpose purpose;
221
222   /**
223    * How long is this signature over the ephemeral key valid?
224    * Note that the receiver MUST IGNORE the absolute time, and
225    * only interpret the value as a mononic time and reject
226    * "older" values than the last one observed.  Even with this,
227    * there is no real guarantee against replay achieved here,
228    * as the latest timestamp is not persisted.  This is
229    * necessary as we do not want to require synchronized
230    * clocks and may not have a bidirectional communication
231    * channel.  Communicators must protect against replay
232    * attacks when using backchannel communication!
233    */
234   struct GNUNET_TIME_AbsoluteNBO ephemeral_validity;
235
236   /**
237    * Target's peer identity.
238    */
239   struct GNUNET_PeerIdentity target;
240
241   /**
242    * Ephemeral key setup by the sender for @e target, used
243    * to encrypt the payload.
244    */
245   struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;
246
247 };
248
249
250 /**
251  * Plaintext of the variable-size payload that is encrypted
252  * within a `struct TransportBackchannelEncapsulationMessage`
253  */
254 struct TransportBackchannelRequestPayload
255 {
256
257   /**
258    * Sender's peer identity.
259    */
260   struct GNUNET_PeerIdentity sender;
261
262   /**
263    * Signature of the sender over an
264    * #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL.
265    */
266   struct GNUNET_CRYPTO_EddsaSignature sender_sig;
267
268   /**
269    * How long is this signature over the ephemeral key
270    * valid?
271    */
272   struct GNUNET_TIME_AbsoluteNBO ephemeral_validity;
273
274   /**
275    * Current monotonic time of the sending transport service.  Used to
276    * detect replayed messages.  Note that the receiver should remember
277    * a list of the recently seen timestamps and only reject messages
278    * if the timestamp is in the list, or the list is "full" and the
279    * timestamp is smaller than the lowest in the list.  This list of
280    * timestamps per peer should be persisted to guard against replays
281    * after restarts.
282    */
283   struct GNUNET_TIME_AbsoluteNBO monotonic_time;
284
285   /* Followed by a `struct GNUNET_MessageHeader` with a message
286      for a communicator */
287
288   /* Followed by a 0-termianted string specifying the name of
289      the communicator which is to receive the message */
290
291 };
292
293
294 /**
295  * Outer layer of an encapsulated unfragmented application message sent
296  * over an unreliable channel.
297  */
298 struct TransportReliabilityBox
299 {
300   /**
301    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX
302    */
303   struct GNUNET_MessageHeader header;
304
305   /**
306    * Number of messages still to be sent before a commulative
307    * ACK is requested.  Zero if an ACK is requested immediately.
308    * In NBO.  Note that the receiver may send the ACK faster
309    * if it believes that is reasonable.
310    */
311   uint32_t ack_countdown GNUNET_PACKED;
312
313   /**
314    * Unique ID of the message used for signalling receipt of
315    * messages sent over possibly unreliable channels.  Should
316    * be a random.
317    */
318   struct GNUNET_ShortHashCode msg_uuid;
319 };
320
321
322 /**
323  * Confirmation that the receiver got a
324  * #GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX. Note that the
325  * confirmation may be transmitted over a completely different queue,
326  * so ACKs are identified by a combination of PID of sender and
327  * message UUID, without the queue playing any role!
328  */
329 struct TransportReliabilityAckMessage
330 {
331   /**
332    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK
333    */
334   struct GNUNET_MessageHeader header;
335
336   /**
337    * Reserved. Zero.
338    */
339   uint32_t reserved GNUNET_PACKED;
340
341   /**
342    * How long was the ACK delayed relative to the average time of
343    * receipt of the messages being acknowledged?  Used to calculate
344    * the average RTT by taking the receipt time of the ack minus the
345    * average transmission time of the sender minus this value.
346    */
347   struct GNUNET_TIME_RelativeNBO avg_ack_delay;
348
349   /* followed by any number of `struct GNUNET_ShortHashCode`
350      messages providing ACKs */
351 };
352
353
354 /**
355  * Outer layer of an encapsulated fragmented application message.
356  */
357 struct TransportFragmentBox
358 {
359   /**
360    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT
361    */
362   struct GNUNET_MessageHeader header;
363
364   /**
365    * Unique ID of this fragment (and fragment transmission!). Will
366    * change even if a fragement is retransmitted to make each
367    * transmission attempt unique! Should be incremented by one for
368    * each fragment transmission. If a client receives a duplicate
369    * fragment (same @e frag_off), it must send
370    * #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK immediately.
371    */
372   uint32_t frag_uuid GNUNET_PACKED;
373
374   /**
375    * Original message ID for of the message that all the1
376    * fragments belong to.  Must be the same for all fragments.
377    */
378   struct GNUNET_ShortHashCode msg_uuid;
379
380   /**
381    * Offset of this fragment in the overall message.
382    */
383   uint16_t frag_off GNUNET_PACKED;
384
385   /**
386    * Total size of the message that is being fragmented.
387    */
388   uint16_t msg_size GNUNET_PACKED;
389
390 };
391
392
393 /**
394  * Outer layer of an fragmented application message sent over a queue
395  * with finite MTU.  When a #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT is
396  * received, the receiver has two RTTs or 64 further fragments with
397  * the same basic message time to send an acknowledgement, possibly
398  * acknowledging up to 65 fragments in one ACK.  ACKs must also be
399  * sent immediately once all fragments were sent.
400  */
401 struct TransportFragmentAckMessage
402 {
403   /**
404    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK
405    */
406   struct GNUNET_MessageHeader header;
407
408   /**
409    * Unique ID of the lowest fragment UUID being acknowledged.
410    */
411   uint32_t frag_uuid GNUNET_PACKED;
412
413   /**
414    * Bitfield of up to 64 additional fragments following the
415    * @e msg_uuid being acknowledged by this message.
416    */
417   uint64_t extra_acks GNUNET_PACKED;
418
419   /**
420    * Original message ID for of the message that all the
421    * fragments belong to.
422    */
423   struct GNUNET_ShortHashCode msg_uuid;
424
425   /**
426    * How long was the ACK delayed relative to the average time of
427    * receipt of the fragments being acknowledged?  Used to calculate
428    * the average RTT by taking the receipt time of the ack minus the
429    * average transmission time of the sender minus this value.
430    */
431   struct GNUNET_TIME_RelativeNBO avg_ack_delay;
432
433   /**
434    * How long until the receiver will stop trying reassembly
435    * of this message?
436    */
437   struct GNUNET_TIME_RelativeNBO reassembly_timeout;
438 };
439
440
441 /**
442  * Internal message used by transport for distance vector learning.
443  * If @e num_hops does not exceed the threshold, peers should append
444  * themselves to the peer list and flood the message (possibly only
445  * to a subset of their neighbours to limit discoverability of the
446  * network topology).  To the extend that the @e bidirectional bits
447  * are set, peers may learn the inverse paths even if they did not
448  * initiate.
449  *
450  * Unless received on a bidirectional queue and @e num_hops just
451  * zero, peers that can forward to the initator should always try to
452  * forward to the initiator.
453  */
454 struct TransportDVLearn
455 {
456   /**
457    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN
458    */
459   struct GNUNET_MessageHeader header;
460
461   /**
462    * Number of hops this messages has travelled, in NBO. Zero if
463    * sent by initiator.
464    */
465   uint16_t num_hops GNUNET_PACKED;
466
467   /**
468    * Bitmask of the last 16 hops indicating whether they are confirmed
469    * available (without DV) in both directions or not, in NBO.  Used
470    * to possibly instantly learn a path in both directions.  Each peer
471    * should shift this value by one to the left, and then set the
472    * lowest bit IF the current sender can be reached from it (without
473    * DV routing).
474    */
475   uint16_t bidirectional GNUNET_PACKED;
476
477   /**
478    * Peers receiving this message and delaying forwarding to other
479    * peers for any reason should increment this value such as to
480    * enable the origin to determine the actual network-only delay
481    * in addition to the real-time delay (assuming the message loops
482    * back to the origin).
483    */
484   struct GNUNET_TIME_Relative cummulative_non_network_delay;
485
486   /**
487    * Identity of the peer that started this learning activity.
488    */
489   struct GNUNET_PeerIdentity initiator;
490
491   /* Followed by @e num_hops `struct GNUNET_PeerIdentity` values,
492      excluding the initiator of the DV trace; the last entry is the
493      current sender; the current peer must not be included. */
494
495 };
496
497
498 /**
499  * Outer layer of an encapsulated message send over multiple hops.
500  * The path given only includes the identities of the subsequent
501  * peers, i.e. it will be empty if we are the receiver. Each
502  * forwarding peer should scan the list from the end, and if it can,
503  * forward to the respective peer. The list should then be shortened
504  * by all the entries up to and including that peer.  Each hop should
505  * also increment @e total_hops to allow the receiver to get a precise
506  * estimate on the number of hops the message travelled.  Senders must
507  * provide a learned path that thus should work, but intermediaries
508  * know of a shortcut, they are allowed to send the message via that
509  * shortcut.
510  *
511  * If a peer finds itself still on the list, it must drop the message.
512  */
513 struct TransportDVBox
514 {
515   /**
516    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX
517    */
518   struct GNUNET_MessageHeader header;
519
520   /**
521    * Number of total hops this messages travelled. In NBO.
522    * @e origin sets this to zero, to be incremented at
523    * each hop.
524    */
525   uint16_t total_hops GNUNET_PACKED;
526
527   /**
528    * Number of hops this messages includes. In NBO.
529    */
530   uint16_t num_hops GNUNET_PACKED;
531
532   /**
533    * Identity of the peer that originated the message.
534    */
535   struct GNUNET_PeerIdentity origin;
536
537   /* Followed by @e num_hops `struct GNUNET_PeerIdentity` values;
538      excluding the @e origin and the current peer, the last must be
539      the ultimate target; if @e num_hops is zero, the receiver of this
540      message is the ultimate target. */
541
542   /* Followed by the actual message, which itself may be
543      another box, but not a DV_LEARN or DV_BOX message! */
544 };
545
546
547 GNUNET_NETWORK_STRUCT_END
548
549
550
551 /**
552  * What type of client is the `struct TransportClient` about?
553  */
554 enum ClientType
555 {
556   /**
557    * We do not know yet (client is fresh).
558    */
559   CT_NONE = 0,
560
561   /**
562    * Is the CORE service, we need to forward traffic to it.
563    */
564   CT_CORE = 1,
565
566   /**
567    * It is a monitor, forward monitor data.
568    */
569   CT_MONITOR = 2,
570
571   /**
572    * It is a communicator, use for communication.
573    */
574   CT_COMMUNICATOR = 3
575 };
576
577
578 /**
579  * Entry in our cache of ephemeral keys we currently use.
580  * This way, we only sign an ephemeral once per @e target,
581  * and then can re-use it over multiple
582  * #GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION
583  * messages (as signing is expensive).
584  */
585 struct EphemeralCacheEntry
586 {
587
588   /**
589    * Target's peer identity (we don't re-use ephemerals
590    * to limit linkability of messages).
591    */
592   struct GNUNET_PeerIdentity target;
593
594   /**
595    * Signature affirming @e ephemeral_key of type
596    * #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL
597    */
598   struct GNUNET_CRYPTO_EddsaSignature sender_sig;
599
600   /**
601    * How long is @e sender_sig valid
602    */
603   struct GNUNET_TIME_Absolute ephemeral_validity;
604
605   /**
606    * Our ephemeral key.
607    */
608   struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;
609
610   /**
611    * Our private ephemeral key.
612    */
613   struct GNUNET_CRYPTO_EcdhePrivateKey private_key;
614
615   /**
616    * Node in the ephemeral cache for this entry.
617    * Used for expiration.
618    */
619   struct GNUNET_CONTAINER_HeapNode *hn;
620 };
621
622
623 /**
624  * Client connected to the transport service.
625  */
626 struct TransportClient;
627
628
629 /**
630  * A neighbour that at least one communicator is connected to.
631  */
632 struct Neighbour;
633
634
635 /**
636  * Entry in our #dv_routes table, representing a (set of) distance
637  * vector routes to a particular peer.
638  */
639 struct DistanceVector;
640
641 /**
642  * One possible hop towards a DV target.
643  */
644 struct DistanceVectorHop
645 {
646
647   /**
648    * Kept in a MDLL, sorted by @e timeout.
649    */
650   struct DistanceVectorHop *next_dv;
651
652   /**
653    * Kept in a MDLL, sorted by @e timeout.
654    */
655   struct DistanceVectorHop *prev_dv;
656
657   /**
658    * Kept in a MDLL.
659    */
660   struct DistanceVectorHop *next_neighbour;
661
662   /**
663    * Kept in a MDLL.
664    */
665   struct DistanceVectorHop *prev_neighbour;
666
667   /**
668    * What would be the next hop to @e target?
669    */
670   struct Neighbour *next_hop;
671
672   /**
673    * Distance vector entry this hop belongs with.
674    */
675   struct DistanceVector *dv;
676
677   /**
678    * Array of @e distance hops to the target, excluding @e next_hop.
679    * NULL if the entire path is us to @e next_hop to `target`. Allocated
680    * at the end of this struct.
681    */
682   const struct GNUNET_PeerIdentity *path;
683
684   /**
685    * At what time do we forget about this path unless we see it again
686    * while learning?
687    */
688   struct GNUNET_TIME_Absolute timeout;
689
690   /**
691    * How many hops in total to the `target` (excluding @e next_hop and `target` itself),
692    * thus 0 still means a distance of 2 hops (to @e next_hop and then to `target`)?
693    */
694   unsigned int distance;
695 };
696
697
698 /**
699  * Entry in our #dv_routes table, representing a (set of) distance
700  * vector routes to a particular peer.
701  */
702 struct DistanceVector
703 {
704
705   /**
706    * To which peer is this a route?
707    */
708   struct GNUNET_PeerIdentity target;
709
710   /**
711    * Known paths to @e target.
712    */
713   struct DistanceVectorHop *dv_head;
714
715   /**
716    * Known paths to @e target.
717    */
718   struct DistanceVectorHop *dv_tail;
719
720   /**
721    * Task scheduled to purge expired paths from @e dv_head MDLL.
722    */
723   struct GNUNET_SCHEDULER_Task *timeout_task;
724 };
725
726
727 /**
728  * Entry identifying transmission in one of our `struct
729  * GNUNET_ATS_Sessions` which still awaits an ACK.  This is used to
730  * ensure we do not overwhelm a communicator and limit the number of
731  * messages outstanding per communicator (say in case communicator is
732  * CPU bound) and per queue (in case ATS bandwidth allocation exceeds
733  * what the communicator can actually provide towards a particular
734  * peer/target).
735  */
736 struct QueueEntry
737 {
738
739   /**
740    * Kept as a DLL.
741    */
742   struct QueueEntry *next;
743
744   /**
745    * Kept as a DLL.
746    */
747   struct QueueEntry *prev;
748
749   /**
750    * ATS session this entry is queued with.
751    */
752   struct GNUNET_ATS_Session *session;
753
754   /**
755    * Message ID used for this message with the queue used for transmission.
756    */
757   uint64_t mid;
758 };
759
760
761 /**
762  * An ATS session is a message queue provided by a communicator
763  * via which we can reach a particular neighbour.
764  */
765 struct GNUNET_ATS_Session
766 {
767   /**
768    * Kept in a MDLL.
769    */
770   struct GNUNET_ATS_Session *next_neighbour;
771
772   /**
773    * Kept in a MDLL.
774    */
775   struct GNUNET_ATS_Session *prev_neighbour;
776
777   /**
778    * Kept in a MDLL.
779    */
780   struct GNUNET_ATS_Session *prev_client;
781
782   /**
783    * Kept in a MDLL.
784    */
785   struct GNUNET_ATS_Session *next_client;
786
787   /**
788    * Head of DLL of unacked transmission requests.
789    */
790   struct QueueEntry *queue_head;
791
792   /**
793    * End of DLL of unacked transmission requests.
794    */
795   struct QueueEntry *queue_tail;
796
797   /**
798    * Which neighbour is this ATS session for?
799    */
800   struct Neighbour *neighbour;
801
802   /**
803    * Which communicator offers this ATS session?
804    */
805   struct TransportClient *tc;
806
807   /**
808    * Address served by the ATS session.
809    */
810   const char *address;
811
812   /**
813    * Handle by which we inform ATS about this queue.
814    */
815   struct GNUNET_ATS_SessionRecord *sr;
816
817   /**
818    * Task scheduled for the time when this queue can (likely) transmit the
819    * next message. Still needs to check with the @e tracker_out to be sure.
820    */
821   struct GNUNET_SCHEDULER_Task *transmit_task;
822
823   /**
824    * Our current RTT estimate for this ATS session.
825    */
826   struct GNUNET_TIME_Relative rtt;
827
828   /**
829    * Message ID generator for transmissions on this queue.
830    */
831   uint64_t mid_gen;
832
833   /**
834    * Unique identifier of this ATS session with the communicator.
835    */
836   uint32_t qid;
837
838   /**
839    * Maximum transmission unit supported by this ATS session.
840    */
841   uint32_t mtu;
842
843   /**
844    * Distance to the target of this ATS session.
845    */
846   uint32_t distance;
847
848   /**
849    * Messages pending.
850    */
851   uint32_t num_msg_pending;
852
853   /**
854    * Bytes pending.
855    */
856   uint32_t num_bytes_pending;
857
858   /**
859    * Length of the DLL starting at @e queue_head.
860    */
861   unsigned int queue_length;
862
863   /**
864    * Network type offered by this ATS session.
865    */
866   enum GNUNET_NetworkType nt;
867
868   /**
869    * Connection status for this ATS session.
870    */
871   enum GNUNET_TRANSPORT_ConnectionStatus cs;
872
873   /**
874    * How much outbound bandwidth do we have available for this session?
875    */
876   struct GNUNET_BANDWIDTH_Tracker tracker_out;
877
878   /**
879    * How much inbound bandwidth do we have available for this session?
880    */
881   struct GNUNET_BANDWIDTH_Tracker tracker_in;
882 };
883
884
885 /**
886  * Information we keep for a message that we are reassembling.
887  */
888 struct ReassemblyContext
889 {
890
891   /**
892    * Original message ID for of the message that all the
893    * fragments belong to.
894    */
895   struct GNUNET_ShortHashCode msg_uuid;
896
897   /**
898    * Which neighbour is this context for?
899    */
900   struct Neighbour *neighbour;
901
902   /**
903    * Entry in the reassembly heap (sorted by expiration).
904    */
905   struct GNUNET_CONTAINER_HeapNode *hn;
906
907   /**
908    * Bitfield with @e msg_size bits representing the positions
909    * where we have received fragments.  When we receive a fragment,
910    * we check the bits in @e bitfield before incrementing @e msg_missing.
911    *
912    * Allocated after the reassembled message.
913    */
914   uint8_t *bitfield;
915
916   /**
917    * Task for sending ACK. We may send ACKs either because of hitting
918    * the @e extra_acks limit, or based on time and @e num_acks.  This
919    * task is for the latter case.
920    */
921   struct GNUNET_SCHEDULER_Task *ack_task;
922
923   /**
924    * At what time will we give up reassembly of this message?
925    */
926   struct GNUNET_TIME_Absolute reassembly_timeout;
927
928   /**
929    * Average delay of all acks in @e extra_acks and @e frag_uuid.
930    * Should be reset to zero when @e num_acks is set to 0.
931    */
932   struct GNUNET_TIME_Relative avg_ack_delay;
933
934   /**
935    * Time we received the last fragment.  @e avg_ack_delay must be
936    * incremented by now - @e last_frag multiplied by @e num_acks.
937    */
938   struct GNUNET_TIME_Absolute last_frag;
939
940   /**
941    * Bitfield of up to 64 additional fragments following @e frag_uuid
942    * to be acknowledged in the next cummulative ACK.
943    */
944   uint64_t extra_acks;
945
946   /**
947    * Unique ID of the lowest fragment UUID to be acknowledged in the
948    * next cummulative ACK.  Only valid if @e num_acks > 0.
949    */
950   uint32_t frag_uuid;
951
952   /**
953    * Number of ACKs we have accumulated so far.  Reset to 0
954    * whenever we send a #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK.
955    */
956   unsigned int num_acks;
957
958   /**
959    * How big is the message we are reassembling in total?
960    */
961   uint16_t msg_size;
962
963   /**
964    * How many bytes of the message are still missing?  Defragmentation
965    * is complete when @e msg_missing == 0.
966    */
967   uint16_t msg_missing;
968
969   /* Followed by @e msg_size bytes of the (partially) defragmented original message */
970
971   /* Followed by @e bitfield data */
972 };
973
974
975 /**
976  * A neighbour that at least one communicator is connected to.
977  */
978 struct Neighbour
979 {
980
981   /**
982    * Which peer is this about?
983    */
984   struct GNUNET_PeerIdentity pid;
985
986   /**
987    * Map with `struct ReassemblyContext` structs for fragments under
988    * reassembly. May be NULL if we currently have no fragments from
989    * this @e pid (lazy initialization).
990    */
991   struct GNUNET_CONTAINER_MultiShortmap *reassembly_map;
992
993   /**
994    * Heap with `struct ReassemblyContext` structs for fragments under
995    * reassembly. May be NULL if we currently have no fragments from
996    * this @e pid (lazy initialization).
997    */
998   struct GNUNET_CONTAINER_Heap *reassembly_heap;
999
1000   /**
1001    * Task to free old entries from the @e reassembly_heap and @e reassembly_map.
1002    */
1003   struct GNUNET_SCHEDULER_Task *reassembly_timeout_task;
1004
1005   /**
1006    * Head of list of messages pending for this neighbour.
1007    */
1008   struct PendingMessage *pending_msg_head;
1009
1010   /**
1011    * Tail of list of messages pending for this neighbour.
1012    */
1013   struct PendingMessage *pending_msg_tail;
1014
1015   /**
1016    * Head of MDLL of DV hops that have this neighbour as next hop. Must be
1017    * purged if this neighbour goes down.
1018    */
1019   struct DistanceVectorHop *dv_head;
1020
1021   /**
1022    * Tail of MDLL of DV hops that have this neighbour as next hop. Must be
1023    * purged if this neighbour goes down.
1024    */
1025   struct DistanceVectorHop *dv_tail;
1026
1027   /**
1028    * Head of DLL of ATS sessions to this peer.
1029    */
1030   struct GNUNET_ATS_Session *session_head;
1031
1032   /**
1033    * Tail of DLL of ATS sessions to this peer.
1034    */
1035   struct GNUNET_ATS_Session *session_tail;
1036
1037   /**
1038    * Task run to cleanup pending messages that have exceeded their timeout.
1039    */
1040   struct GNUNET_SCHEDULER_Task *timeout_task;
1041
1042   /**
1043    * Quota at which CORE is allowed to transmit to this peer
1044    * according to ATS.
1045    *
1046    * FIXME: not yet used, tricky to get right given multiple queues!
1047    *        (=> Idea: let ATS set a quota per queue and we add them up here?)
1048    * FIXME: how do we set this value initially when we tell CORE?
1049    *    Options: start at a minimum value or at literally zero (before ATS?)
1050    *         (=> Current thought: clean would be zero!)
1051    */
1052   struct GNUNET_BANDWIDTH_Value32NBO quota_out;
1053
1054   /**
1055    * What is the earliest timeout of any message in @e pending_msg_tail?
1056    */
1057   struct GNUNET_TIME_Absolute earliest_timeout;
1058
1059 };
1060
1061
1062 /**
1063  * Types of different pending messages.
1064  */
1065 enum PendingMessageType
1066 {
1067
1068   /**
1069    * Ordinary message received from the CORE service.
1070    */
1071   PMT_CORE = 0,
1072
1073   /**
1074    * Fragment box.
1075    */
1076   PMT_FRAGMENT_BOX = 1,
1077
1078   /**
1079    * Reliability box.
1080    */
1081   PMT_RELIABILITY_BOX = 2,
1082
1083   /**
1084    * Any type of acknowledgement.
1085    */
1086   PMT_ACKNOWLEDGEMENT = 3
1087
1088
1089 };
1090
1091
1092 /**
1093  * Transmission request that is awaiting delivery.  The original
1094  * transmission requests from CORE may be too big for some queues.
1095  * In this case, a *tree* of fragments is created.  At each
1096  * level of the tree, fragments are kept in a DLL ordered by which
1097  * fragment should be sent next (at the head).  The tree is searched
1098  * top-down, with the original message at the root.
1099  *
1100  * To select a node for transmission, first it is checked if the
1101  * current node's message fits with the MTU.  If it does not, we
1102  * either calculate the next fragment (based on @e frag_off) from the
1103  * current node, or, if all fragments have already been created,
1104  * descend to the @e head_frag.  Even though the node was already
1105  * fragmented, the fragment may be too big if the fragment was
1106  * generated for a queue with a larger MTU. In this case, the node
1107  * may be fragmented again, thus creating a tree.
1108  *
1109  * When acknowledgements for fragments are received, the tree
1110  * must be pruned, removing those parts that were already
1111  * acknowledged.  When fragments are sent over a reliable
1112  * channel, they can be immediately removed.
1113  *
1114  * If a message is ever fragmented, then the original "full" message
1115  * is never again transmitted (even if it fits below the MTU), and
1116  * only (remaining) fragments are sent.
1117  */
1118 struct PendingMessage
1119 {
1120   /**
1121    * Kept in a MDLL of messages for this @a target.
1122    */
1123   struct PendingMessage *next_neighbour;
1124
1125   /**
1126    * Kept in a MDLL of messages for this @a target.
1127    */
1128   struct PendingMessage *prev_neighbour;
1129
1130   /**
1131    * Kept in a MDLL of messages from this @a client (if @e pmt is #PMT_CORE)
1132    */
1133   struct PendingMessage *next_client;
1134
1135   /**
1136    * Kept in a MDLL of messages from this @a client  (if @e pmt is #PMT_CORE)
1137    */
1138   struct PendingMessage *prev_client;
1139
1140   /**
1141    * Kept in a MDLL of messages from this @a cpm (if @e pmt is #PMT_FRAGMENT_BOx)
1142    */
1143   struct PendingMessage *next_frag;
1144
1145   /**
1146    * Kept in a MDLL of messages from this @a cpm  (if @e pmt is #PMT_FRAGMENT_BOX)
1147    */
1148   struct PendingMessage *prev_frag;
1149
1150   /**
1151    * This message, reliability boxed. Only possibly available if @e pmt is #PMT_CORE.
1152    */
1153   struct PendingMessage *bpm;
1154
1155   /**
1156    * Target of the request.
1157    */
1158   struct Neighbour *target;
1159
1160   /**
1161    * Client that issued the transmission request, if @e pmt is #PMT_CORE.
1162    */
1163   struct TransportClient *client;
1164
1165   /**
1166    * Head of a MDLL of fragments created for this core message.
1167    */
1168   struct PendingMessage *head_frag;
1169
1170   /**
1171    * Tail of a MDLL of fragments created for this core message.
1172    */
1173   struct PendingMessage *tail_frag;
1174
1175   /**
1176    * Our parent in the fragmentation tree.
1177    */
1178   struct PendingMessage *frag_parent;
1179
1180   /**
1181    * At what time should we give up on the transmission (and no longer retry)?
1182    */
1183   struct GNUNET_TIME_Absolute timeout;
1184
1185   /**
1186    * What is the earliest time for us to retry transmission of this message?
1187    */
1188   struct GNUNET_TIME_Absolute next_attempt;
1189
1190   /**
1191    * UUID to use for this message (used for reassembly of fragments, only
1192    * initialized if @e msg_uuid_set is #GNUNET_YES).
1193    */
1194   struct GNUNET_ShortHashCode msg_uuid;
1195
1196   /**
1197    * Counter incremented per generated fragment.
1198    */
1199   uint32_t frag_uuidgen;
1200
1201   /**
1202    * Type of the pending message.
1203    */
1204   enum PendingMessageType pmt;
1205
1206   /**
1207    * Size of the original message.
1208    */
1209   uint16_t bytes_msg;
1210
1211   /**
1212    * Offset at which we should generate the next fragment.
1213    */
1214   uint16_t frag_off;
1215
1216   /**
1217    * #GNUNET_YES once @e msg_uuid was initialized
1218    */
1219   int16_t msg_uuid_set;
1220
1221   /* Followed by @e bytes_msg to transmit */
1222 };
1223
1224
1225 /**
1226  * One of the addresses of this peer.
1227  */
1228 struct AddressListEntry
1229 {
1230
1231   /**
1232    * Kept in a DLL.
1233    */
1234   struct AddressListEntry *next;
1235
1236   /**
1237    * Kept in a DLL.
1238    */
1239   struct AddressListEntry *prev;
1240
1241   /**
1242    * Which communicator provides this address?
1243    */
1244   struct TransportClient *tc;
1245
1246   /**
1247    * The actual address.
1248    */
1249   const char *address;
1250
1251   /**
1252    * Current context for storing this address in the peerstore.
1253    */
1254   struct GNUNET_PEERSTORE_StoreContext *sc;
1255
1256   /**
1257    * Task to periodically do @e st operation.
1258    */
1259   struct GNUNET_SCHEDULER_Task *st;
1260
1261   /**
1262    * What is a typical lifetime the communicator expects this
1263    * address to have? (Always from now.)
1264    */
1265   struct GNUNET_TIME_Relative expiration;
1266
1267   /**
1268    * Address identifier used by the communicator.
1269    */
1270   uint32_t aid;
1271
1272   /**
1273    * Network type offered by this address.
1274    */
1275   enum GNUNET_NetworkType nt;
1276
1277 };
1278
1279
1280 /**
1281  * Client connected to the transport service.
1282  */
1283 struct TransportClient
1284 {
1285
1286   /**
1287    * Kept in a DLL.
1288    */
1289   struct TransportClient *next;
1290
1291   /**
1292    * Kept in a DLL.
1293    */
1294   struct TransportClient *prev;
1295
1296   /**
1297    * Handle to the client.
1298    */
1299   struct GNUNET_SERVICE_Client *client;
1300
1301   /**
1302    * Message queue to the client.
1303    */
1304   struct GNUNET_MQ_Handle *mq;
1305
1306   /**
1307    * What type of client is this?
1308    */
1309   enum ClientType type;
1310
1311   union
1312   {
1313
1314     /**
1315      * Information for @e type #CT_CORE.
1316      */
1317     struct {
1318
1319       /**
1320        * Head of list of messages pending for this client, sorted by
1321        * transmission time ("next_attempt" + possibly internal prioritization).
1322        */
1323       struct PendingMessage *pending_msg_head;
1324
1325       /**
1326        * Tail of list of messages pending for this client.
1327        */
1328       struct PendingMessage *pending_msg_tail;
1329
1330     } core;
1331
1332     /**
1333      * Information for @e type #CT_MONITOR.
1334      */
1335     struct {
1336
1337       /**
1338        * Peer identity to monitor the addresses of.
1339        * Zero to monitor all neighbours.  Valid if
1340        * @e type is #CT_MONITOR.
1341        */
1342       struct GNUNET_PeerIdentity peer;
1343
1344       /**
1345        * Is this a one-shot monitor?
1346        */
1347       int one_shot;
1348
1349     } monitor;
1350
1351
1352     /**
1353      * Information for @e type #CT_COMMUNICATOR.
1354      */
1355     struct {
1356       /**
1357        * If @e type is #CT_COMMUNICATOR, this communicator
1358        * supports communicating using these addresses.
1359        */
1360       char *address_prefix;
1361
1362       /**
1363        * Head of DLL of queues offered by this communicator.
1364        */
1365       struct GNUNET_ATS_Session *session_head;
1366
1367       /**
1368        * Tail of DLL of queues offered by this communicator.
1369        */
1370       struct GNUNET_ATS_Session *session_tail;
1371
1372       /**
1373        * Head of list of the addresses of this peer offered by this communicator.
1374        */
1375       struct AddressListEntry *addr_head;
1376
1377       /**
1378        * Tail of list of the addresses of this peer offered by this communicator.
1379        */
1380       struct AddressListEntry *addr_tail;
1381
1382       /**
1383        * Number of queue entries in all queues to this communicator. Used
1384        * throttle sending to a communicator if we see that the communicator
1385        * is globally unable to keep up.
1386        */
1387       unsigned int total_queue_length;
1388
1389       /**
1390        * Characteristics of this communicator.
1391        */
1392       enum GNUNET_TRANSPORT_CommunicatorCharacteristics cc;
1393
1394     } communicator;
1395
1396   } details;
1397
1398 };
1399
1400
1401 /**
1402  * Head of linked list of all clients to this service.
1403  */
1404 static struct TransportClient *clients_head;
1405
1406 /**
1407  * Tail of linked list of all clients to this service.
1408  */
1409 static struct TransportClient *clients_tail;
1410
1411 /**
1412  * Statistics handle.
1413  */
1414 static struct GNUNET_STATISTICS_Handle *GST_stats;
1415
1416 /**
1417  * Configuration handle.
1418  */
1419 static const struct GNUNET_CONFIGURATION_Handle *GST_cfg;
1420
1421 /**
1422  * Our public key.
1423  */
1424 static struct GNUNET_PeerIdentity GST_my_identity;
1425
1426 /**
1427  * Our private key.
1428  */
1429 static struct GNUNET_CRYPTO_EddsaPrivateKey *GST_my_private_key;
1430
1431 /**
1432  * Map from PIDs to `struct Neighbour` entries.  A peer is
1433  * a neighbour if we have an MQ to it from some communicator.
1434  */
1435 static struct GNUNET_CONTAINER_MultiPeerMap *neighbours;
1436
1437 /**
1438  * Map from PIDs to `struct DistanceVector` entries describing
1439  * known paths to the peer.
1440  */
1441 static struct GNUNET_CONTAINER_MultiPeerMap *dv_routes;
1442
1443 /**
1444  * Database for peer's HELLOs.
1445  */
1446 static struct GNUNET_PEERSTORE_Handle *peerstore;
1447
1448 /**
1449  * Heap sorting `struct EphemeralCacheEntry` by their
1450  * key/signature validity.
1451  */
1452 static struct GNUNET_CONTAINER_Heap *ephemeral_heap;
1453
1454 /**
1455  * Hash map for looking up `struct EphemeralCacheEntry`s
1456  * by peer identity. (We may have ephemerals in our
1457  * cache for which we do not have a neighbour entry,
1458  * and similar many neighbours may not need ephemerals,
1459  * so we use a second map.)
1460  */
1461 static struct GNUNET_CONTAINER_MultiPeerMap *ephemeral_map;
1462
1463 /**
1464  * Task to free expired ephemerals.
1465  */
1466 static struct GNUNET_SCHEDULER_Task *ephemeral_task;
1467
1468 /**
1469  * Our connection to ATS for allocation and bootstrapping.
1470  */
1471 static struct GNUNET_ATS_TransportHandle *ats;
1472
1473
1474 /**
1475  * Free cached ephemeral key.
1476  *
1477  * @param ece cached signature to free
1478  */
1479 static void
1480 free_ephemeral (struct EphemeralCacheEntry *ece)
1481 {
1482   GNUNET_CONTAINER_multipeermap_remove (ephemeral_map,
1483                                         &ece->target,
1484                                         ece);
1485   GNUNET_CONTAINER_heap_remove_node (ece->hn);
1486   GNUNET_free (ece);
1487 }
1488
1489
1490 /**
1491  * Lookup neighbour record for peer @a pid.
1492  *
1493  * @param pid neighbour to look for
1494  * @return NULL if we do not have this peer as a neighbour
1495  */
1496 static struct Neighbour *
1497 lookup_neighbour (const struct GNUNET_PeerIdentity *pid)
1498 {
1499   return GNUNET_CONTAINER_multipeermap_get (neighbours,
1500                                             pid);
1501 }
1502
1503
1504 /**
1505  * Details about what to notify monitors about.
1506  */
1507 struct MonitorEvent
1508 {
1509   /**
1510    * @deprecated To be discussed if we keep these...
1511    */
1512   struct GNUNET_TIME_Absolute last_validation;
1513   struct GNUNET_TIME_Absolute valid_until;
1514   struct GNUNET_TIME_Absolute next_validation;
1515
1516   /**
1517    * Current round-trip time estimate.
1518    */
1519   struct GNUNET_TIME_Relative rtt;
1520
1521   /**
1522    * Connection status.
1523    */
1524   enum GNUNET_TRANSPORT_ConnectionStatus cs;
1525
1526   /**
1527    * Messages pending.
1528    */
1529   uint32_t num_msg_pending;
1530
1531   /**
1532    * Bytes pending.
1533    */
1534   uint32_t num_bytes_pending;
1535
1536
1537 };
1538
1539
1540 /**
1541  * Free a @dvh, and if it is the last path to the `target`,also
1542  * free the associated DV entry in #dv_routes.
1543  *
1544  * @param dvh hop to free
1545  */
1546 static void
1547 free_distance_vector_hop (struct DistanceVectorHop *dvh)
1548 {
1549   struct Neighbour *n = dvh->next_hop;
1550   struct DistanceVector *dv = dvh->dv;
1551
1552   GNUNET_CONTAINER_MDLL_remove (neighbour,
1553                                 n->dv_head,
1554                                 n->dv_tail,
1555                                 dvh);
1556   GNUNET_CONTAINER_MDLL_remove (dv,
1557                                 dv->dv_head,
1558                                 dv->dv_tail,
1559                                 dvh);
1560   GNUNET_free (dvh);
1561   if (NULL == dv->dv_head)
1562   {
1563     GNUNET_assert (GNUNET_YES ==
1564                    GNUNET_CONTAINER_multipeermap_remove (dv_routes,
1565                                                          &dv->target,
1566                                                          dv));
1567     if (NULL != dv->timeout_task)
1568       GNUNET_SCHEDULER_cancel (dv->timeout_task);
1569     GNUNET_free (dv);
1570   }
1571 }
1572
1573
1574 /**
1575  * Free entry in #dv_routes.  First frees all hops to the target, and
1576  * the last target will implicitly free @a dv as well.
1577  *
1578  * @param dv route to free
1579  */
1580 static void
1581 free_dv_route (struct DistanceVector *dv)
1582 {
1583   struct DistanceVectorHop *dvh;
1584
1585   while (NULL != (dvh = dv->dv_head))
1586     free_distance_vector_hop (dvh);
1587 }
1588
1589
1590 /**
1591  * Notify monitor @a tc about an event.  That @a tc
1592  * cares about the event has already been checked.
1593  *
1594  * Send @a tc information in @a me about a @a peer's status with
1595  * respect to some @a address to all monitors that care.
1596  *
1597  * @param tc monitor to inform
1598  * @param peer peer the information is about
1599  * @param address address the information is about
1600  * @param nt network type associated with @a address
1601  * @param me detailed information to transmit
1602  */
1603 static void
1604 notify_monitor (struct TransportClient *tc,
1605                 const struct GNUNET_PeerIdentity *peer,
1606                 const char *address,
1607                 enum GNUNET_NetworkType nt,
1608                 const struct MonitorEvent *me)
1609 {
1610   struct GNUNET_MQ_Envelope *env;
1611   struct GNUNET_TRANSPORT_MonitorData *md;
1612   size_t addr_len = strlen (address) + 1;
1613
1614   env = GNUNET_MQ_msg_extra (md,
1615                              addr_len,
1616                              GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_DATA);
1617   md->nt = htonl ((uint32_t) nt);
1618   md->peer = *peer;
1619   md->last_validation = GNUNET_TIME_absolute_hton (me->last_validation);
1620   md->valid_until = GNUNET_TIME_absolute_hton (me->valid_until);
1621   md->next_validation = GNUNET_TIME_absolute_hton (me->next_validation);
1622   md->rtt = GNUNET_TIME_relative_hton (me->rtt);
1623   md->cs = htonl ((uint32_t) me->cs);
1624   md->num_msg_pending = htonl (me->num_msg_pending);
1625   md->num_bytes_pending = htonl (me->num_bytes_pending);
1626   memcpy (&md[1],
1627           address,
1628           addr_len);
1629   GNUNET_MQ_send (tc->mq,
1630                   env);
1631 }
1632
1633
1634 /**
1635  * Send information in @a me about a @a peer's status with respect
1636  * to some @a address to all monitors that care.
1637  *
1638  * @param peer peer the information is about
1639  * @param address address the information is about
1640  * @param nt network type associated with @a address
1641  * @param me detailed information to transmit
1642  */
1643 static void
1644 notify_monitors (const struct GNUNET_PeerIdentity *peer,
1645                  const char *address,
1646                  enum GNUNET_NetworkType nt,
1647                  const struct MonitorEvent *me)
1648 {
1649   static struct GNUNET_PeerIdentity zero;
1650
1651   for (struct TransportClient *tc = clients_head;
1652        NULL != tc;
1653        tc = tc->next)
1654   {
1655     if (CT_MONITOR != tc->type)
1656       continue;
1657     if (tc->details.monitor.one_shot)
1658       continue;
1659     if ( (0 != memcmp (&tc->details.monitor.peer,
1660                        &zero,
1661                        sizeof (zero))) &&
1662          (0 != memcmp (&tc->details.monitor.peer,
1663                        peer,
1664                        sizeof (*peer))) )
1665       continue;
1666     notify_monitor (tc,
1667                     peer,
1668                     address,
1669                     nt,
1670                     me);
1671   }
1672 }
1673
1674
1675 /**
1676  * Called whenever a client connects.  Allocates our
1677  * data structures associated with that client.
1678  *
1679  * @param cls closure, NULL
1680  * @param client identification of the client
1681  * @param mq message queue for the client
1682  * @return our `struct TransportClient`
1683  */
1684 static void *
1685 client_connect_cb (void *cls,
1686                    struct GNUNET_SERVICE_Client *client,
1687                    struct GNUNET_MQ_Handle *mq)
1688 {
1689   struct TransportClient *tc;
1690
1691   tc = GNUNET_new (struct TransportClient);
1692   tc->client = client;
1693   tc->mq = mq;
1694   GNUNET_CONTAINER_DLL_insert (clients_head,
1695                                clients_tail,
1696                                tc);
1697   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1698               "Client %p connected\n",
1699               tc);
1700   return tc;
1701 }
1702
1703
1704 /**
1705  * Free @a rc
1706  *
1707  * @param rc data structure to free
1708  */
1709 static void
1710 free_reassembly_context (struct ReassemblyContext *rc)
1711 {
1712   struct Neighbour *n = rc->neighbour;
1713
1714   GNUNET_assert (rc ==
1715                  GNUNET_CONTAINER_heap_remove_node (rc->hn));
1716   GNUNET_assert (GNUNET_OK ==
1717                  GNUNET_CONTAINER_multishortmap_remove (n->reassembly_map,
1718                                                         &rc->msg_uuid,
1719                                                         rc));
1720   GNUNET_free (rc);
1721 }
1722
1723
1724 /**
1725  * Task run to clean up reassembly context of a neighbour that have expired.
1726  *
1727  * @param cls a `struct Neighbour`
1728  */
1729 static void
1730 reassembly_cleanup_task (void *cls)
1731 {
1732   struct Neighbour *n = cls;
1733   struct ReassemblyContext *rc;
1734
1735   n->reassembly_timeout_task = NULL;
1736   while (NULL != (rc = GNUNET_CONTAINER_heap_peek (n->reassembly_heap)))
1737   {
1738     if (0 == GNUNET_TIME_absolute_get_remaining (rc->reassembly_timeout).rel_value_us)
1739     {
1740       free_reassembly_context (rc);
1741       continue;
1742     }
1743     GNUNET_assert (NULL == n->reassembly_timeout_task);
1744     n->reassembly_timeout_task = GNUNET_SCHEDULER_add_at (rc->reassembly_timeout,
1745                                                           &reassembly_cleanup_task,
1746                                                           n);
1747     return;
1748   }
1749 }
1750
1751
1752 /**
1753  * function called to #free_reassembly_context().
1754  *
1755  * @param cls NULL
1756  * @param key unused
1757  * @param value a `struct ReassemblyContext` to free
1758  * @return #GNUNET_OK (continue iteration)
1759  */
1760 static int
1761 free_reassembly_cb (void *cls,
1762                     const struct GNUNET_ShortHashCode *key,
1763                     void *value)
1764 {
1765   struct ReassemblyContext *rc = value;
1766   (void) cls;
1767   (void) key;
1768
1769   free_reassembly_context (rc);
1770   return GNUNET_OK;
1771 }
1772
1773
1774 /**
1775  * Release memory used by @a neighbour.
1776  *
1777  * @param neighbour neighbour entry to free
1778  */
1779 static void
1780 free_neighbour (struct Neighbour *neighbour)
1781 {
1782   struct DistanceVectorHop *dvh;
1783
1784   GNUNET_assert (NULL == neighbour->session_head);
1785   GNUNET_assert (GNUNET_YES ==
1786                  GNUNET_CONTAINER_multipeermap_remove (neighbours,
1787                                                        &neighbour->pid,
1788                                                        neighbour));
1789   if (NULL != neighbour->timeout_task)
1790     GNUNET_SCHEDULER_cancel (neighbour->timeout_task);
1791   if (NULL != neighbour->reassembly_map)
1792   {
1793     GNUNET_CONTAINER_multishortmap_iterate (neighbour->reassembly_map,
1794                                             &free_reassembly_cb,
1795                                             NULL);
1796     GNUNET_CONTAINER_multishortmap_destroy (neighbour->reassembly_map);
1797     neighbour->reassembly_map = NULL;
1798     GNUNET_CONTAINER_heap_destroy (neighbour->reassembly_heap);
1799     neighbour->reassembly_heap = NULL;
1800   }
1801   while (NULL != (dvh = neighbour->dv_head))
1802     free_distance_vector_hop (dvh);
1803   if (NULL != neighbour->reassembly_timeout_task)
1804     GNUNET_SCHEDULER_cancel (neighbour->reassembly_timeout_task);
1805   GNUNET_free (neighbour);
1806 }
1807
1808
1809 /**
1810  * Send message to CORE clients that we lost a connection.
1811  *
1812  * @param tc client to inform (must be CORE client)
1813  * @param pid peer the connection is for
1814  * @param quota_out current quota for the peer
1815  */
1816 static void
1817 core_send_connect_info (struct TransportClient *tc,
1818                         const struct GNUNET_PeerIdentity *pid,
1819                         struct GNUNET_BANDWIDTH_Value32NBO quota_out)
1820 {
1821   struct GNUNET_MQ_Envelope *env;
1822   struct ConnectInfoMessage *cim;
1823
1824   GNUNET_assert (CT_CORE == tc->type);
1825   env = GNUNET_MQ_msg (cim,
1826                        GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
1827   cim->quota_out = quota_out;
1828   cim->id = *pid;
1829   GNUNET_MQ_send (tc->mq,
1830                   env);
1831 }
1832
1833
1834 /**
1835  * Send message to CORE clients that we gained a connection
1836  *
1837  * @param pid peer the queue was for
1838  * @param quota_out current quota for the peer
1839  */
1840 static void
1841 cores_send_connect_info (const struct GNUNET_PeerIdentity *pid,
1842                          struct GNUNET_BANDWIDTH_Value32NBO quota_out)
1843 {
1844   for (struct TransportClient *tc = clients_head;
1845        NULL != tc;
1846        tc = tc->next)
1847   {
1848     if (CT_CORE != tc->type)
1849       continue;
1850     core_send_connect_info (tc,
1851                             pid,
1852                             quota_out);
1853   }
1854 }
1855
1856
1857 /**
1858  * Send message to CORE clients that we lost a connection.
1859  *
1860  * @param pid peer the connection was for
1861  */
1862 static void
1863 cores_send_disconnect_info (const struct GNUNET_PeerIdentity *pid)
1864 {
1865   for (struct TransportClient *tc = clients_head;
1866        NULL != tc;
1867        tc = tc->next)
1868   {
1869     struct GNUNET_MQ_Envelope *env;
1870     struct DisconnectInfoMessage *dim;
1871
1872     if (CT_CORE != tc->type)
1873       continue;
1874     env = GNUNET_MQ_msg (dim,
1875                          GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT);
1876     dim->peer = *pid;
1877     GNUNET_MQ_send (tc->mq,
1878                     env);
1879   }
1880 }
1881
1882
1883 /**
1884  * We believe we are ready to transmit a message on a queue. Double-checks
1885  * with the queue's "tracker_out" and then gives the message to the
1886  * communicator for transmission (updating the tracker, and re-scheduling
1887  * itself if applicable).
1888  *
1889  * @param cls the `struct GNUNET_ATS_Session` to process transmissions for
1890  */
1891 static void
1892 transmit_on_queue (void *cls);
1893
1894
1895 /**
1896  * Schedule next run of #transmit_on_queue().  Does NOTHING if
1897  * we should run immediately or if the message queue is empty.
1898  * Test for no task being added AND queue not being empty to
1899  * transmit immediately afterwards!  This function must only
1900  * be called if the message queue is non-empty!
1901  *
1902  * @param queue the queue to do scheduling for
1903  */
1904 static void
1905 schedule_transmit_on_queue (struct GNUNET_ATS_Session *queue)
1906 {
1907   struct Neighbour *n = queue->neighbour;
1908   struct PendingMessage *pm = n->pending_msg_head;
1909   struct GNUNET_TIME_Relative out_delay;
1910   unsigned int wsize;
1911
1912   GNUNET_assert (NULL != pm);
1913   if (queue->tc->details.communicator.total_queue_length >= COMMUNICATOR_TOTAL_QUEUE_LIMIT)
1914   {
1915     GNUNET_STATISTICS_update (GST_stats,
1916                               "# Transmission throttled due to communicator queue limit",
1917                               1,
1918                               GNUNET_NO);
1919     return;
1920   }
1921   if (queue->queue_length >= SESSION_QUEUE_LIMIT)
1922   {
1923     GNUNET_STATISTICS_update (GST_stats,
1924                               "# Transmission throttled due to session queue limit",
1925                               1,
1926                               GNUNET_NO);
1927     return;
1928   }
1929
1930   wsize = (0 == queue->mtu)
1931     ? pm->bytes_msg /* FIXME: add overheads? */
1932     : queue->mtu;
1933   out_delay = GNUNET_BANDWIDTH_tracker_get_delay (&queue->tracker_out,
1934                                                   wsize);
1935   out_delay = GNUNET_TIME_relative_max (GNUNET_TIME_absolute_get_remaining (pm->next_attempt),
1936                                         out_delay);
1937   if (0 == out_delay.rel_value_us)
1938     return; /* we should run immediately! */
1939   /* queue has changed since we were scheduled, reschedule again */
1940   queue->transmit_task = GNUNET_SCHEDULER_add_delayed (out_delay,
1941                                                        &transmit_on_queue,
1942                                                        queue);
1943   if (out_delay.rel_value_us > DELAY_WARN_THRESHOLD.rel_value_us)
1944     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1945                 "Next transmission on queue `%s' in %s (high delay)\n",
1946                 queue->address,
1947                 GNUNET_STRINGS_relative_time_to_string (out_delay,
1948                                                         GNUNET_YES));
1949   else
1950     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1951                 "Next transmission on queue `%s' in %s\n",
1952                 queue->address,
1953                 GNUNET_STRINGS_relative_time_to_string (out_delay,
1954                                                         GNUNET_YES));
1955 }
1956
1957
1958 /**
1959  * Free @a session.
1960  *
1961  * @param session the session to free
1962  */
1963 static void
1964 free_session (struct GNUNET_ATS_Session *session)
1965 {
1966   struct Neighbour *neighbour = session->neighbour;
1967   struct TransportClient *tc = session->tc;
1968   struct MonitorEvent me = {
1969     .cs = GNUNET_TRANSPORT_CS_DOWN,
1970     .rtt = GNUNET_TIME_UNIT_FOREVER_REL
1971   };
1972   struct QueueEntry *qe;
1973   int maxxed;
1974
1975   if (NULL != session->transmit_task)
1976   {
1977     GNUNET_SCHEDULER_cancel (session->transmit_task);
1978     session->transmit_task = NULL;
1979   }
1980   GNUNET_CONTAINER_MDLL_remove (neighbour,
1981                                 neighbour->session_head,
1982                                 neighbour->session_tail,
1983                                 session);
1984   GNUNET_CONTAINER_MDLL_remove (client,
1985                                 tc->details.communicator.session_head,
1986                                 tc->details.communicator.session_tail,
1987                                 session);
1988   maxxed = (COMMUNICATOR_TOTAL_QUEUE_LIMIT >= tc->details.communicator.total_queue_length);
1989   while (NULL != (qe = session->queue_head))
1990   {
1991     GNUNET_CONTAINER_DLL_remove (session->queue_head,
1992                                  session->queue_tail,
1993                                  qe);
1994     session->queue_length--;
1995     tc->details.communicator.total_queue_length--;
1996     GNUNET_free (qe);
1997   }
1998   GNUNET_assert (0 == session->queue_length);
1999   if ( (maxxed) &&
2000        (COMMUNICATOR_TOTAL_QUEUE_LIMIT < tc->details.communicator.total_queue_length) )
2001   {
2002     /* Communicator dropped below threshold, resume all queues */
2003     GNUNET_STATISTICS_update (GST_stats,
2004                               "# Transmission throttled due to communicator queue limit",
2005                               -1,
2006                               GNUNET_NO);
2007     for (struct GNUNET_ATS_Session *s = tc->details.communicator.session_head;
2008          NULL != s;
2009          s = s->next_client)
2010       schedule_transmit_on_queue (s);
2011   }
2012   notify_monitors (&neighbour->pid,
2013                    session->address,
2014                    session->nt,
2015                    &me);
2016   GNUNET_ATS_session_del (session->sr);
2017   GNUNET_BANDWIDTH_tracker_notification_stop (&session->tracker_in);
2018   GNUNET_BANDWIDTH_tracker_notification_stop (&session->tracker_out);
2019   GNUNET_free (session);
2020   if (NULL == neighbour->session_head)
2021   {
2022     cores_send_disconnect_info (&neighbour->pid);
2023     free_neighbour (neighbour);
2024   }
2025 }
2026
2027
2028 /**
2029  * Free @a ale
2030  *
2031  * @param ale address list entry to free
2032  */
2033 static void
2034 free_address_list_entry (struct AddressListEntry *ale)
2035 {
2036   struct TransportClient *tc = ale->tc;
2037
2038   GNUNET_CONTAINER_DLL_remove (tc->details.communicator.addr_head,
2039                                tc->details.communicator.addr_tail,
2040                                ale);
2041   if (NULL != ale->sc)
2042   {
2043     GNUNET_PEERSTORE_store_cancel (ale->sc);
2044     ale->sc = NULL;
2045   }
2046   if (NULL != ale->st)
2047   {
2048     GNUNET_SCHEDULER_cancel (ale->st);
2049     ale->st = NULL;
2050   }
2051   GNUNET_free (ale);
2052 }
2053
2054
2055 /**
2056  * Called whenever a client is disconnected.  Frees our
2057  * resources associated with that client.
2058  *
2059  * @param cls closure, NULL
2060  * @param client identification of the client
2061  * @param app_ctx our `struct TransportClient`
2062  */
2063 static void
2064 client_disconnect_cb (void *cls,
2065                       struct GNUNET_SERVICE_Client *client,
2066                       void *app_ctx)
2067 {
2068   struct TransportClient *tc = app_ctx;
2069
2070   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2071               "Client %p disconnected, cleaning up.\n",
2072               tc);
2073   GNUNET_CONTAINER_DLL_remove (clients_head,
2074                                clients_tail,
2075                                tc);
2076   switch (tc->type)
2077   {
2078   case CT_NONE:
2079     break;
2080   case CT_CORE:
2081     {
2082       struct PendingMessage *pm;
2083
2084       while (NULL != (pm = tc->details.core.pending_msg_head))
2085       {
2086         GNUNET_CONTAINER_MDLL_remove (client,
2087                                       tc->details.core.pending_msg_head,
2088                                       tc->details.core.pending_msg_tail,
2089                                       pm);
2090         pm->client = NULL;
2091       }
2092     }
2093     break;
2094   case CT_MONITOR:
2095     break;
2096   case CT_COMMUNICATOR:
2097     {
2098       struct GNUNET_ATS_Session *q;
2099       struct AddressListEntry *ale;
2100
2101       while (NULL != (q = tc->details.communicator.session_head))
2102         free_session (q);
2103       while (NULL != (ale = tc->details.communicator.addr_head))
2104         free_address_list_entry (ale);
2105       GNUNET_free (tc->details.communicator.address_prefix);
2106     }
2107     break;
2108   }
2109   GNUNET_free (tc);
2110 }
2111
2112
2113 /**
2114  * Iterator telling new CORE client about all existing
2115  * connections to peers.
2116  *
2117  * @param cls the new `struct TransportClient`
2118  * @param pid a connected peer
2119  * @param value the `struct Neighbour` with more information
2120  * @return #GNUNET_OK (continue to iterate)
2121  */
2122 static int
2123 notify_client_connect_info (void *cls,
2124                             const struct GNUNET_PeerIdentity *pid,
2125                             void *value)
2126 {
2127   struct TransportClient *tc = cls;
2128   struct Neighbour *neighbour = value;
2129
2130   core_send_connect_info (tc,
2131                           pid,
2132                           neighbour->quota_out);
2133   return GNUNET_OK;
2134 }
2135
2136
2137 /**
2138  * Initialize a "CORE" client.  We got a start message from this
2139  * client, so add it to the list of clients for broadcasting of
2140  * inbound messages.
2141  *
2142  * @param cls the client
2143  * @param start the start message that was sent
2144  */
2145 static void
2146 handle_client_start (void *cls,
2147                      const struct StartMessage *start)
2148 {
2149   struct TransportClient *tc = cls;
2150   uint32_t options;
2151
2152   options = ntohl (start->options);
2153   if ( (0 != (1 & options)) &&
2154        (0 !=
2155         memcmp (&start->self,
2156                 &GST_my_identity,
2157                 sizeof (struct GNUNET_PeerIdentity)) ) )
2158   {
2159     /* client thinks this is a different peer, reject */
2160     GNUNET_break (0);
2161     GNUNET_SERVICE_client_drop (tc->client);
2162     return;
2163   }
2164   if (CT_NONE != tc->type)
2165   {
2166     GNUNET_break (0);
2167     GNUNET_SERVICE_client_drop (tc->client);
2168     return;
2169   }
2170   tc->type = CT_CORE;
2171   GNUNET_CONTAINER_multipeermap_iterate (neighbours,
2172                                          &notify_client_connect_info,
2173                                          tc);
2174   GNUNET_SERVICE_client_continue (tc->client);
2175 }
2176
2177
2178 /**
2179  * Client asked for transmission to a peer.  Process the request.
2180  *
2181  * @param cls the client
2182  * @param obm the send message that was sent
2183  */
2184 static int
2185 check_client_send (void *cls,
2186                    const struct OutboundMessage *obm)
2187 {
2188   struct TransportClient *tc = cls;
2189   uint16_t size;
2190   const struct GNUNET_MessageHeader *obmm;
2191
2192   if (CT_CORE != tc->type)
2193   {
2194     GNUNET_break (0);
2195     return GNUNET_SYSERR;
2196   }
2197   size = ntohs (obm->header.size) - sizeof (struct OutboundMessage);
2198   if (size < sizeof (struct GNUNET_MessageHeader))
2199   {
2200     GNUNET_break (0);
2201     return GNUNET_SYSERR;
2202   }
2203   obmm = (const struct GNUNET_MessageHeader *) &obm[1];
2204   if (size != ntohs (obmm->size))
2205   {
2206     GNUNET_break (0);
2207     return GNUNET_SYSERR;
2208   }
2209   return GNUNET_OK;
2210 }
2211
2212
2213 /**
2214  * Free fragment tree below @e root, excluding @e root itself.
2215  *
2216  * @param root root of the tree to free
2217  */
2218 static void
2219 free_fragment_tree (struct PendingMessage *root)
2220 {
2221   struct PendingMessage *frag;
2222
2223   while (NULL != (frag = root->head_frag))
2224   {
2225     free_fragment_tree (frag);
2226     GNUNET_CONTAINER_MDLL_remove (frag,
2227                                   root->head_frag,
2228                                   root->tail_frag,
2229                                   frag);
2230     GNUNET_free (frag);
2231   }
2232 }
2233
2234
2235 /**
2236  * Release memory associated with @a pm and remove @a pm from associated
2237  * data structures.  @a pm must be a top-level pending message and not
2238  * a fragment in the tree.  The entire tree is freed (if applicable).
2239  *
2240  * @param pm the pending message to free
2241  */
2242 static void
2243 free_pending_message (struct PendingMessage *pm)
2244 {
2245   struct TransportClient *tc = pm->client;
2246   struct Neighbour *target = pm->target;
2247
2248   if (NULL != tc)
2249   {
2250     GNUNET_CONTAINER_MDLL_remove (client,
2251                                   tc->details.core.pending_msg_head,
2252                                   tc->details.core.pending_msg_tail,
2253                                   pm);
2254   }
2255   GNUNET_CONTAINER_MDLL_remove (neighbour,
2256                                 target->pending_msg_head,
2257                                 target->pending_msg_tail,
2258                                 pm);
2259   free_fragment_tree (pm);
2260   GNUNET_free_non_null (pm->bpm);
2261   GNUNET_free (pm);
2262 }
2263
2264
2265 /**
2266  * Send a response to the @a pm that we have processed a
2267  * "send" request with status @a success. We
2268  * transmitted @a bytes_physical on the actual wire.
2269  * Sends a confirmation to the "core" client responsible
2270  * for the original request and free's @a pm.
2271  *
2272  * @param pm handle to the original pending message
2273  * @param success status code, #GNUNET_OK on success, #GNUNET_SYSERR
2274  *          for transmission failure
2275  * @param bytes_physical amount of bandwidth consumed
2276  */
2277 static void
2278 client_send_response (struct PendingMessage *pm,
2279                       int success,
2280                       uint32_t bytes_physical)
2281 {
2282   struct TransportClient *tc = pm->client;
2283   struct Neighbour *target = pm->target;
2284   struct GNUNET_MQ_Envelope *env;
2285   struct SendOkMessage *som;
2286
2287   if (NULL != tc)
2288   {
2289     env = GNUNET_MQ_msg (som,
2290                          GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
2291     som->success = htonl ((uint32_t) success);
2292     som->bytes_msg = htons (pm->bytes_msg);
2293     som->bytes_physical = htonl (bytes_physical);
2294     som->peer = target->pid;
2295     GNUNET_MQ_send (tc->mq,
2296                     env);
2297   }
2298   free_pending_message (pm);
2299 }
2300
2301
2302 /**
2303  * Checks the message queue for a neighbour for messages that have timed
2304  * out and purges them.
2305  *
2306  * @param cls a `struct Neighbour`
2307  */
2308 static void
2309 check_queue_timeouts (void *cls)
2310 {
2311   struct Neighbour *n = cls;
2312   struct PendingMessage *pm;
2313   struct GNUNET_TIME_Absolute now;
2314   struct GNUNET_TIME_Absolute earliest_timeout;
2315
2316   n->timeout_task = NULL;
2317   earliest_timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
2318   now = GNUNET_TIME_absolute_get ();
2319   for (struct PendingMessage *pos = n->pending_msg_head;
2320        NULL != pos;
2321        pos = pm)
2322   {
2323     pm = pos->next_neighbour;
2324     if (pos->timeout.abs_value_us <= now.abs_value_us)
2325     {
2326       GNUNET_STATISTICS_update (GST_stats,
2327                                 "# messages dropped (timeout before confirmation)",
2328                                 1,
2329                                 GNUNET_NO);
2330       client_send_response (pm,
2331                             GNUNET_NO,
2332                             0);
2333       continue;
2334     }
2335     earliest_timeout = GNUNET_TIME_absolute_min (earliest_timeout,
2336                                                  pos->timeout);
2337   }
2338   n->earliest_timeout = earliest_timeout;
2339   if (NULL != n->pending_msg_head)
2340     n->timeout_task = GNUNET_SCHEDULER_add_at (earliest_timeout,
2341                                                &check_queue_timeouts,
2342                                                n);
2343 }
2344
2345
2346 /**
2347  * Client asked for transmission to a peer.  Process the request.
2348  *
2349  * @param cls the client
2350  * @param obm the send message that was sent
2351  */
2352 static void
2353 handle_client_send (void *cls,
2354                     const struct OutboundMessage *obm)
2355 {
2356   struct TransportClient *tc = cls;
2357   struct PendingMessage *pm;
2358   const struct GNUNET_MessageHeader *obmm;
2359   struct Neighbour *target;
2360   uint32_t bytes_msg;
2361
2362   GNUNET_assert (CT_CORE == tc->type);
2363   obmm = (const struct GNUNET_MessageHeader *) &obm[1];
2364   bytes_msg = ntohs (obmm->size);
2365   target = lookup_neighbour (&obm->peer);
2366   if (NULL == target)
2367   {
2368     /* Failure: don't have this peer as a neighbour (anymore).
2369        Might have gone down asynchronously, so this is NOT
2370        a protocol violation by CORE. Still count the event,
2371        as this should be rare. */
2372     struct GNUNET_MQ_Envelope *env;
2373     struct SendOkMessage *som;
2374
2375     env = GNUNET_MQ_msg (som,
2376                          GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
2377     som->success = htonl (GNUNET_SYSERR);
2378     som->bytes_msg = htonl (bytes_msg);
2379     som->bytes_physical = htonl (0);
2380     som->peer = obm->peer;
2381     GNUNET_MQ_send (tc->mq,
2382                     env);
2383     GNUNET_SERVICE_client_continue (tc->client);
2384     GNUNET_STATISTICS_update (GST_stats,
2385                               "# messages dropped (neighbour unknown)",
2386                               1,
2387                               GNUNET_NO);
2388     return;
2389   }
2390   pm = GNUNET_malloc (sizeof (struct PendingMessage) + bytes_msg);
2391   pm->client = tc;
2392   pm->target = target;
2393   pm->bytes_msg = bytes_msg;
2394   pm->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (obm->timeout));
2395   memcpy (&pm[1],
2396           &obm[1],
2397           bytes_msg);
2398   GNUNET_CONTAINER_MDLL_insert (neighbour,
2399                                 target->pending_msg_head,
2400                                 target->pending_msg_tail,
2401                                 pm);
2402   GNUNET_CONTAINER_MDLL_insert (client,
2403                                 tc->details.core.pending_msg_head,
2404                                 tc->details.core.pending_msg_tail,
2405                                 pm);
2406   if (target->earliest_timeout.abs_value_us > pm->timeout.abs_value_us)
2407   {
2408     target->earliest_timeout.abs_value_us = pm->timeout.abs_value_us;
2409     if (NULL != target->timeout_task)
2410       GNUNET_SCHEDULER_cancel (target->timeout_task);
2411     target->timeout_task
2412       = GNUNET_SCHEDULER_add_at (target->earliest_timeout,
2413                                  &check_queue_timeouts,
2414                                  target);
2415   }
2416 }
2417
2418
2419 /**
2420  * Communicator started.  Test message is well-formed.
2421  *
2422  * @param cls the client
2423  * @param cam the send message that was sent
2424  */
2425 static int
2426 check_communicator_available (void *cls,
2427                               const struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam)
2428 {
2429   struct TransportClient *tc = cls;
2430   uint16_t size;
2431
2432   if (CT_NONE != tc->type)
2433   {
2434     GNUNET_break (0);
2435     return GNUNET_SYSERR;
2436   }
2437   tc->type = CT_COMMUNICATOR;
2438   size = ntohs (cam->header.size) - sizeof (*cam);
2439   if (0 == size)
2440     return GNUNET_OK; /* receive-only communicator */
2441   GNUNET_MQ_check_zero_termination (cam);
2442   return GNUNET_OK;
2443 }
2444
2445
2446 /**
2447  * Communicator started.  Process the request.
2448  *
2449  * @param cls the client
2450  * @param cam the send message that was sent
2451  */
2452 static void
2453 handle_communicator_available (void *cls,
2454                                const struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam)
2455 {
2456   struct TransportClient *tc = cls;
2457   uint16_t size;
2458
2459   size = ntohs (cam->header.size) - sizeof (*cam);
2460   if (0 == size)
2461     return; /* receive-only communicator */
2462   tc->details.communicator.address_prefix
2463     = GNUNET_strdup ((const char *) &cam[1]);
2464   tc->details.communicator.cc
2465     = (enum GNUNET_TRANSPORT_CommunicatorCharacteristics) ntohl (cam->cc);
2466   GNUNET_SERVICE_client_continue (tc->client);
2467 }
2468
2469
2470 /**
2471  * Communicator requests backchannel transmission.  Check the request.
2472  *
2473  * @param cls the client
2474  * @param cb the send message that was sent
2475  * @return #GNUNET_OK if message is well-formed
2476  */
2477 static int
2478 check_communicator_backchannel (void *cls,
2479                                 const struct GNUNET_TRANSPORT_CommunicatorBackchannel *cb)
2480 {
2481   const struct GNUNET_MessageHeader *inbox;
2482   const char *is;
2483   uint16_t msize;
2484   uint16_t isize;
2485
2486   msize = ntohs (cb->header.size) - sizeof (*cb);
2487   if (UINT16_MAX - msize >
2488       sizeof (struct TransportBackchannelEncapsulationMessage) +
2489       sizeof (struct TransportBackchannelRequestPayload) )
2490   {
2491     GNUNET_break (0);
2492     return GNUNET_SYSERR;
2493   }
2494   inbox = (const struct GNUNET_MessageHeader *) &cb[1];
2495   isize = ntohs (inbox->size);
2496   if (isize >= msize)
2497   {
2498     GNUNET_break (0);
2499     return GNUNET_SYSERR;
2500   }
2501   is = (const char *) inbox;
2502   is += isize;
2503   msize -= isize;
2504   GNUNET_assert (msize > 0);
2505   if ('\0' != is[msize-1])
2506   {
2507     GNUNET_break (0);
2508     return GNUNET_SYSERR;
2509   }
2510   return GNUNET_OK;
2511 }
2512
2513
2514 /**
2515  * Remove memory used by expired ephemeral keys.
2516  *
2517  * @param cls NULL
2518  */
2519 static void
2520 expire_ephemerals (void *cls)
2521 {
2522   struct EphemeralCacheEntry *ece;
2523
2524   (void) cls;
2525   ephemeral_task = NULL;
2526   while (NULL != (ece = GNUNET_CONTAINER_heap_peek (ephemeral_heap)))
2527   {
2528     if (0 == GNUNET_TIME_absolute_get_remaining (ece->ephemeral_validity).rel_value_us)
2529     {
2530       free_ephemeral (ece);
2531       continue;
2532     }
2533     ephemeral_task = GNUNET_SCHEDULER_add_at (ece->ephemeral_validity,
2534                                               &expire_ephemerals,
2535                                               NULL);
2536     return;
2537   }
2538 }
2539
2540
2541 /**
2542  * Lookup ephemeral key in our #ephemeral_map. If no valid one exists, generate
2543  * one, cache it and return it.
2544  *
2545  * @param pid peer to look up ephemeral for
2546  * @param private_key[out] set to the private key
2547  * @param ephemeral_key[out] set to the key
2548  * @param ephemeral_sender_sig[out] set to the signature
2549  * @param ephemeral_validity[out] set to the validity expiration time
2550  */
2551 static void
2552 lookup_ephemeral (const struct GNUNET_PeerIdentity *pid,
2553                   struct GNUNET_CRYPTO_EcdhePrivateKey *private_key,
2554                   struct GNUNET_CRYPTO_EcdhePublicKey *ephemeral_key,
2555                   struct GNUNET_CRYPTO_EddsaSignature *ephemeral_sender_sig,
2556                   struct GNUNET_TIME_Absolute *ephemeral_validity)
2557 {
2558   struct EphemeralCacheEntry *ece;
2559   struct EphemeralConfirmation ec;
2560
2561   ece = GNUNET_CONTAINER_multipeermap_get (ephemeral_map,
2562                                            pid);
2563   if ( (NULL != ece) &&
2564        (0 == GNUNET_TIME_absolute_get_remaining (ece->ephemeral_validity).rel_value_us) )
2565   {
2566     free_ephemeral (ece);
2567     ece = NULL;
2568   }
2569   if (NULL == ece)
2570   {
2571     ece = GNUNET_new (struct EphemeralCacheEntry);
2572     ece->target = *pid;
2573     ece->ephemeral_validity = GNUNET_TIME_absolute_add (GNUNET_TIME_absolute_get_monotonic (GST_cfg),
2574                                                         EPHEMERAL_VALIDITY);
2575     GNUNET_assert (GNUNET_OK ==
2576                    GNUNET_CRYPTO_ecdhe_key_create2 (&ece->private_key));
2577     GNUNET_CRYPTO_ecdhe_key_get_public (&ece->private_key,
2578                                         &ece->ephemeral_key);
2579     ec.purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL);
2580     ec.purpose.size = htonl (sizeof (ec));
2581     ec.target = *pid;
2582     ec.ephemeral_key = ece->ephemeral_key;
2583     GNUNET_assert (GNUNET_OK ==
2584                    GNUNET_CRYPTO_eddsa_sign (GST_my_private_key,
2585                                              &ec.purpose,
2586                                              &ece->sender_sig));
2587     ece->hn = GNUNET_CONTAINER_heap_insert (ephemeral_heap,
2588                                             ece,
2589                                             ece->ephemeral_validity.abs_value_us);
2590     GNUNET_assert (GNUNET_OK ==
2591                    GNUNET_CONTAINER_multipeermap_put (ephemeral_map,
2592                                                       &ece->target,
2593                                                       ece,
2594                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
2595     if (NULL == ephemeral_task)
2596       ephemeral_task = GNUNET_SCHEDULER_add_at (ece->ephemeral_validity,
2597                                                 &expire_ephemerals,
2598                                                 NULL);
2599   }
2600   *private_key = ece->private_key;
2601   *ephemeral_key = ece->ephemeral_key;
2602   *ephemeral_sender_sig = ece->sender_sig;
2603   *ephemeral_validity = ece->ephemeral_validity;
2604 }
2605
2606
2607 /**
2608  * We need to transmit @a hdr to @a target.  If necessary, this may
2609  * involve DV routing or even broadcasting and fragmentation.
2610  *
2611  * @param target peer to receive @a hdr
2612  * @param hdr header of the message to route
2613  */
2614 static void
2615 route_message (const struct GNUNET_PeerIdentity *target,
2616                struct GNUNET_MessageHeader *hdr)
2617 {
2618   // FIXME: send hdr to target, free hdr (possibly using DV, possibly broadcasting)
2619   GNUNET_free (hdr);
2620 }
2621
2622
2623 /**
2624  * Communicator requests backchannel transmission.  Process the request.
2625  *
2626  * @param cls the client
2627  * @param cb the send message that was sent
2628  */
2629 static void
2630 handle_communicator_backchannel (void *cls,
2631                                  const struct GNUNET_TRANSPORT_CommunicatorBackchannel *cb)
2632 {
2633   struct TransportClient *tc = cls;
2634   struct GNUNET_CRYPTO_EcdhePrivateKey private_key;
2635   struct GNUNET_TIME_Absolute ephemeral_validity;
2636   struct TransportBackchannelEncapsulationMessage *enc;
2637   struct TransportBackchannelRequestPayload ppay;
2638   char *mpos;
2639   uint16_t msize;
2640
2641   /* encapsulate and encrypt message */
2642   msize = ntohs (cb->header.size) - sizeof (*cb) + sizeof (struct TransportBackchannelRequestPayload);
2643   enc = GNUNET_malloc (sizeof (*enc) + msize);
2644   enc->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION);
2645   enc->header.size = htons (sizeof (*enc) + msize);
2646   enc->target = cb->pid;
2647   lookup_ephemeral (&cb->pid,
2648                     &private_key,
2649                     &enc->ephemeral_key,
2650                     &ppay.sender_sig,
2651                     &ephemeral_validity);
2652   // FIXME: setup 'iv'
2653 #if FIXME
2654   dh_key_derive (&private_key,
2655                  &cb->pid,
2656                  &enc->iv,
2657                  &key);
2658 #endif
2659   ppay.ephemeral_validity = GNUNET_TIME_absolute_hton (ephemeral_validity);
2660   ppay.monotonic_time = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get_monotonic (GST_cfg));
2661   mpos = (char *) &enc[1];
2662 #if FIXME
2663   encrypt (key,
2664            &ppay,
2665            &mpos,
2666            sizeof (ppay));
2667   encrypt (key,
2668            &cb[1],
2669            &mpos,
2670            ntohs (cb->header.size) - sizeof (*cb));
2671   hmac (key,
2672         &enc->hmac);
2673 #endif
2674   route_message (&cb->pid,
2675                  &enc->header);
2676   GNUNET_SERVICE_client_continue (tc->client);
2677 }
2678
2679
2680 /**
2681  * Address of our peer added.  Test message is well-formed.
2682  *
2683  * @param cls the client
2684  * @param aam the send message that was sent
2685  * @return #GNUNET_OK if message is well-formed
2686  */
2687 static int
2688 check_add_address (void *cls,
2689                    const struct GNUNET_TRANSPORT_AddAddressMessage *aam)
2690 {
2691   struct TransportClient *tc = cls;
2692
2693   if (CT_COMMUNICATOR != tc->type)
2694   {
2695     GNUNET_break (0);
2696     return GNUNET_SYSERR;
2697   }
2698   GNUNET_MQ_check_zero_termination (aam);
2699   return GNUNET_OK;
2700 }
2701
2702
2703 /**
2704  * Ask peerstore to store our address.
2705  *
2706  * @param cls an `struct AddressListEntry *`
2707  */
2708 static void
2709 store_pi (void *cls);
2710
2711
2712 /**
2713  * Function called when peerstore is done storing our address.
2714  */
2715 static void
2716 peerstore_store_cb (void *cls,
2717                     int success)
2718 {
2719   struct AddressListEntry *ale = cls;
2720
2721   ale->sc = NULL;
2722   if (GNUNET_YES != success)
2723     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2724                 "Failed to store our own address `%s' in peerstore!\n",
2725                 ale->address);
2726   /* refresh period is 1/4 of expiration time, that should be plenty
2727      without being excessive. */
2728   ale->st = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_divide (ale->expiration,
2729                                                                        4ULL),
2730                                           &store_pi,
2731                                           ale);
2732 }
2733
2734
2735 /**
2736  * Ask peerstore to store our address.
2737  *
2738  * @param cls an `struct AddressListEntry *`
2739  */
2740 static void
2741 store_pi (void *cls)
2742 {
2743   struct AddressListEntry *ale = cls;
2744   void *addr;
2745   size_t addr_len;
2746   struct GNUNET_TIME_Absolute expiration;
2747
2748   ale->st = NULL;
2749   expiration = GNUNET_TIME_relative_to_absolute (ale->expiration);
2750   GNUNET_HELLO_sign_address (ale->address,
2751                              ale->nt,
2752                              expiration,
2753                              GST_my_private_key,
2754                              &addr,
2755                              &addr_len);
2756   ale->sc = GNUNET_PEERSTORE_store (peerstore,
2757                                     "transport",
2758                                     &GST_my_identity,
2759                                     GNUNET_HELLO_PEERSTORE_KEY,
2760                                     addr,
2761                                     addr_len,
2762                                     expiration,
2763                                     GNUNET_PEERSTORE_STOREOPTION_MULTIPLE,
2764                                     &peerstore_store_cb,
2765                                     ale);
2766   GNUNET_free (addr);
2767   if (NULL == ale->sc)
2768   {
2769     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2770                 "Failed to store our address `%s' with peerstore\n",
2771                 ale->address);
2772     ale->st = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
2773                                             &store_pi,
2774                                             ale);
2775   }
2776 }
2777
2778
2779 /**
2780  * Address of our peer added.  Process the request.
2781  *
2782  * @param cls the client
2783  * @param aam the send message that was sent
2784  */
2785 static void
2786 handle_add_address (void *cls,
2787                     const struct GNUNET_TRANSPORT_AddAddressMessage *aam)
2788 {
2789   struct TransportClient *tc = cls;
2790   struct AddressListEntry *ale;
2791   size_t slen;
2792
2793   slen = ntohs (aam->header.size) - sizeof (*aam);
2794   ale = GNUNET_malloc (sizeof (struct AddressListEntry) + slen);
2795   ale->tc = tc;
2796   ale->address = (const char *) &ale[1];
2797   ale->expiration = GNUNET_TIME_relative_ntoh (aam->expiration);
2798   ale->aid = aam->aid;
2799   ale->nt = (enum GNUNET_NetworkType) ntohl (aam->nt);
2800   memcpy (&ale[1],
2801           &aam[1],
2802           slen);
2803   GNUNET_CONTAINER_DLL_insert (tc->details.communicator.addr_head,
2804                                tc->details.communicator.addr_tail,
2805                                ale);
2806   ale->st = GNUNET_SCHEDULER_add_now (&store_pi,
2807                                       ale);
2808   GNUNET_SERVICE_client_continue (tc->client);
2809 }
2810
2811
2812 /**
2813  * Address of our peer deleted.  Process the request.
2814  *
2815  * @param cls the client
2816  * @param dam the send message that was sent
2817  */
2818 static void
2819 handle_del_address (void *cls,
2820                     const struct GNUNET_TRANSPORT_DelAddressMessage *dam)
2821 {
2822   struct TransportClient *tc = cls;
2823
2824   if (CT_COMMUNICATOR != tc->type)
2825   {
2826     GNUNET_break (0);
2827     GNUNET_SERVICE_client_drop (tc->client);
2828     return;
2829   }
2830   for (struct AddressListEntry *ale = tc->details.communicator.addr_head;
2831        NULL != ale;
2832        ale = ale->next)
2833   {
2834     if (dam->aid != ale->aid)
2835       continue;
2836     GNUNET_assert (ale->tc == tc);
2837     free_address_list_entry (ale);
2838     GNUNET_SERVICE_client_continue (tc->client);
2839   }
2840   GNUNET_break (0);
2841   GNUNET_SERVICE_client_drop (tc->client);
2842 }
2843
2844
2845 /**
2846  * Context from #handle_incoming_msg().  Closure for many
2847  * message handlers below.
2848  */
2849 struct CommunicatorMessageContext
2850 {
2851   /**
2852    * Which communicator provided us with the message.
2853    */
2854   struct TransportClient *tc;
2855
2856   /**
2857    * Additional information for flow control and about the sender.
2858    */
2859   struct GNUNET_TRANSPORT_IncomingMessage im;
2860
2861   /**
2862    * Number of hops the message has travelled (if DV-routed).
2863    * FIXME: make use of this in ACK handling!
2864    */
2865   uint16_t total_hops;
2866 };
2867
2868
2869 /**
2870  * Given an inbound message @a msg from a communicator @a cmc,
2871  * demultiplex it based on the type calling the right handler.
2872  *
2873  * @param cmc context for demultiplexing
2874  * @param msg message to demultiplex
2875  */
2876 static void
2877 demultiplex_with_cmc (struct CommunicatorMessageContext *cmc,
2878                       const struct GNUNET_MessageHeader *msg);
2879
2880
2881 /**
2882  * Send ACK to communicator (if requested) and free @a cmc.
2883  *
2884  * @param cmc context for which we are done handling the message
2885  */
2886 static void
2887 finish_cmc_handling (struct CommunicatorMessageContext *cmc)
2888 {
2889   if (0 != ntohl (cmc->im.fc_on))
2890   {
2891     /* send ACK when done to communicator for flow control! */
2892     struct GNUNET_MQ_Envelope *env;
2893     struct GNUNET_TRANSPORT_IncomingMessageAck *ack;
2894
2895     env = GNUNET_MQ_msg (ack,
2896                          GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK);
2897     ack->reserved = htonl (0);
2898     ack->fc_id = cmc->im.fc_id;
2899     ack->sender = cmc->im.sender;
2900     GNUNET_MQ_send (cmc->tc->mq,
2901                     env);
2902   }
2903   GNUNET_SERVICE_client_continue (cmc->tc->client);
2904   GNUNET_free (cmc);
2905 }
2906
2907
2908 /**
2909  * Communicator gave us an unencapsulated message to pass as-is to
2910  * CORE.  Process the request.
2911  *
2912  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
2913  * @param mh the message that was received
2914  */
2915 static void
2916 handle_raw_message (void *cls,
2917                     const struct GNUNET_MessageHeader *mh)
2918 {
2919   struct CommunicatorMessageContext *cmc = cls;
2920   uint16_t size = ntohs (mh->size);
2921
2922   if ( (size > UINT16_MAX - sizeof (struct InboundMessage)) ||
2923        (size < sizeof (struct GNUNET_MessageHeader)) )
2924   {
2925     struct GNUNET_SERVICE_Client *client = cmc->tc->client;
2926
2927     GNUNET_break (0);
2928     finish_cmc_handling (cmc);
2929     GNUNET_SERVICE_client_drop (client);
2930     return;
2931   }
2932   /* Forward to all CORE clients */
2933   for (struct TransportClient *tc = clients_head;
2934        NULL != tc;
2935        tc = tc->next)
2936   {
2937     struct GNUNET_MQ_Envelope *env;
2938     struct InboundMessage *im;
2939
2940     if (CT_CORE != tc->type)
2941       continue;
2942     env = GNUNET_MQ_msg_extra (im,
2943                                size,
2944                                GNUNET_MESSAGE_TYPE_TRANSPORT_RECV);
2945     im->peer = cmc->im.sender;
2946     memcpy (&im[1],
2947             mh,
2948             size);
2949     GNUNET_MQ_send (tc->mq,
2950                     env);
2951   }
2952   /* FIXME: consider doing this _only_ once the message
2953      was drained from the CORE MQs to extend flow control to CORE!
2954      (basically, increment counter in cmc, decrement on MQ send continuation! */
2955   finish_cmc_handling (cmc);
2956 }
2957
2958
2959 /**
2960  * Communicator gave us a fragment box.  Check the message.
2961  *
2962  * @param cls a `struct CommunicatorMessageContext`
2963  * @param fb the send message that was sent
2964  * @return #GNUNET_YES if message is well-formed
2965  */
2966 static int
2967 check_fragment_box (void *cls,
2968                     const struct TransportFragmentBox *fb)
2969 {
2970   uint16_t size = ntohs (fb->header.size);
2971   uint16_t bsize = size - sizeof (*fb);
2972
2973   if (0 == bsize)
2974   {
2975     GNUNET_break_op (0);
2976     return GNUNET_SYSERR;
2977   }
2978   if (bsize + ntohs (fb->frag_off) > ntohs (fb->msg_size))
2979   {
2980     GNUNET_break_op (0);
2981     return GNUNET_SYSERR;
2982   }
2983   if (ntohs (fb->frag_off) >= ntohs (fb->msg_size))
2984   {
2985     GNUNET_break_op (0);
2986     return GNUNET_SYSERR;
2987   }
2988   return GNUNET_YES;
2989 }
2990
2991
2992 /**
2993  * Generate a fragment acknowledgement for an @a rc.
2994  *
2995  * @param rc context to generate ACK for, @a rc ACK state is reset
2996  */
2997 static void
2998 send_fragment_ack (struct ReassemblyContext *rc)
2999 {
3000   struct TransportFragmentAckMessage *ack;
3001
3002   ack = GNUNET_new (struct TransportFragmentAckMessage);
3003   ack->header.size = htons (sizeof (struct TransportFragmentAckMessage));
3004   ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK);
3005   ack->frag_uuid = htonl (rc->frag_uuid);
3006   ack->extra_acks = GNUNET_htonll (rc->extra_acks);
3007   ack->msg_uuid = rc->msg_uuid;
3008   ack->avg_ack_delay = GNUNET_TIME_relative_hton (rc->avg_ack_delay);
3009   if (0 == rc->msg_missing)
3010     ack->reassembly_timeout
3011       = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_FOREVER_REL); /* signal completion */
3012   else
3013     ack->reassembly_timeout
3014       = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (rc->reassembly_timeout));
3015   route_message (&rc->neighbour->pid,
3016                  &ack->header);
3017   rc->avg_ack_delay = GNUNET_TIME_UNIT_ZERO;
3018   rc->num_acks = 0;
3019   rc->extra_acks = 0LLU;
3020 }
3021
3022
3023 /**
3024  * Communicator gave us a fragment.  Process the request.
3025  *
3026  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
3027  * @param fb the message that was received
3028  */
3029 static void
3030 handle_fragment_box (void *cls,
3031                      const struct TransportFragmentBox *fb)
3032 {
3033   struct CommunicatorMessageContext *cmc = cls;
3034   struct Neighbour *n;
3035   struct ReassemblyContext *rc;
3036   const struct GNUNET_MessageHeader *msg;
3037   uint16_t msize;
3038   uint16_t fsize;
3039   uint16_t frag_off;
3040   uint32_t frag_uuid;
3041   char *target;
3042   struct GNUNET_TIME_Relative cdelay;
3043   int ack_now;
3044
3045   n = GNUNET_CONTAINER_multipeermap_get (neighbours,
3046                                          &cmc->im.sender);
3047   if (NULL == n)
3048   {
3049     struct GNUNET_SERVICE_Client *client = cmc->tc->client;
3050
3051     GNUNET_break (0);
3052     finish_cmc_handling (cmc);
3053     GNUNET_SERVICE_client_drop (client);
3054     return;
3055   }
3056   if (NULL == n->reassembly_map)
3057   {
3058     n->reassembly_map = GNUNET_CONTAINER_multishortmap_create (8,
3059                                                                GNUNET_YES);
3060     n->reassembly_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
3061     n->reassembly_timeout_task = GNUNET_SCHEDULER_add_delayed (REASSEMBLY_EXPIRATION,
3062                                                                &reassembly_cleanup_task,
3063                                                                n);
3064   }
3065   msize = ntohs (fb->msg_size);
3066   rc = GNUNET_CONTAINER_multishortmap_get (n->reassembly_map,
3067                                            &fb->msg_uuid);
3068   if (NULL == rc)
3069   {
3070     rc = GNUNET_malloc (sizeof (*rc) +
3071                         msize + /* reassembly payload buffer */
3072                         (msize + 7) / 8 * sizeof (uint8_t) /* bitfield */);
3073     rc->msg_uuid = fb->msg_uuid;
3074     rc->neighbour = n;
3075     rc->msg_size = msize;
3076     rc->reassembly_timeout = GNUNET_TIME_relative_to_absolute (REASSEMBLY_EXPIRATION);
3077     rc->last_frag = GNUNET_TIME_absolute_get ();
3078     rc->hn = GNUNET_CONTAINER_heap_insert (n->reassembly_heap,
3079                                            rc,
3080                                            rc->reassembly_timeout.abs_value_us);
3081     GNUNET_assert (GNUNET_OK ==
3082                    GNUNET_CONTAINER_multishortmap_put (n->reassembly_map,
3083                                                        &rc->msg_uuid,
3084                                                        rc,
3085                                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
3086     target = (char *) &rc[1];
3087     rc->bitfield = (uint8_t *) (target + rc->msg_size);
3088     rc->msg_missing = rc->msg_size;
3089   }
3090   else
3091   {
3092     target = (char *) &rc[1];
3093   }
3094   if (msize != rc->msg_size)
3095   {
3096     GNUNET_break (0);
3097     finish_cmc_handling (cmc);
3098     return;
3099   }
3100
3101   /* reassemble */
3102   fsize = ntohs (fb->header.size) - sizeof (*fb);
3103   frag_off = ntohs (fb->frag_off);
3104   memcpy (&target[frag_off],
3105           &fb[1],
3106           fsize);
3107   /* update bitfield and msg_missing */
3108   for (unsigned int i=frag_off;i<frag_off+fsize;i++)
3109   {
3110     if (0 == (rc->bitfield[i / 8] & (1 << (i % 8))))
3111     {
3112       rc->bitfield[i / 8] |= (1 << (i % 8));
3113       rc->msg_missing--;
3114     }
3115   }
3116
3117   /* Compute cummulative ACK */
3118   frag_uuid = ntohl (fb->frag_uuid);
3119   cdelay = GNUNET_TIME_absolute_get_duration (rc->last_frag);
3120   cdelay = GNUNET_TIME_relative_multiply (cdelay,
3121                                           rc->num_acks);
3122   rc->last_frag = GNUNET_TIME_absolute_get ();
3123   rc->avg_ack_delay = GNUNET_TIME_relative_add (rc->avg_ack_delay,
3124                                                 cdelay);
3125   ack_now = GNUNET_NO;
3126   if (0 == rc->num_acks)
3127   {
3128     /* case one: first ack */
3129     rc->frag_uuid = frag_uuid;
3130     rc->extra_acks = 0LLU;
3131     rc->num_acks = 1;
3132   }
3133   else if ( (frag_uuid >= rc->frag_uuid) &&
3134             (frag_uuid <= rc->frag_uuid + 64) )
3135   {
3136     /* case two: ack fits after existing min UUID */
3137     if ( (frag_uuid == rc->frag_uuid) ||
3138          (0 != (rc->extra_acks & (1LLU << (frag_uuid - rc->frag_uuid - 1)))) )
3139     {
3140       /* duplicate fragment, ack now! */
3141       ack_now = GNUNET_YES;
3142     }
3143     else
3144     {
3145       rc->extra_acks |= (1LLU << (frag_uuid - rc->frag_uuid - 1));
3146       rc->num_acks++;
3147     }
3148   }
3149   else if ( (rc->frag_uuid > frag_uuid) &&
3150             ( ( (rc->frag_uuid == frag_uuid + 64) &&
3151                 (0 == rc->extra_acks) ) ||
3152               ( (rc->frag_uuid < frag_uuid + 64) &&
3153                 (rc->extra_acks == (rc->extra_acks & ~ ((1LLU << (64 - (rc->frag_uuid - frag_uuid))) - 1LLU))) ) ) )
3154   {
3155     /* can fit ack by shifting extra acks and starting at
3156        frag_uid, test above esured that the bits we will
3157        shift 'extra_acks' by are all zero. */
3158     rc->extra_acks <<= (rc->frag_uuid - frag_uuid);
3159     rc->extra_acks |= (1LLU << (rc->frag_uuid - frag_uuid - 1));
3160     rc->frag_uuid = frag_uuid;
3161     rc->num_acks++;
3162   }
3163   if (65 == rc->num_acks) /* FIXME: maybe use smaller threshold? This is very aggressive. */
3164     ack_now = GNUNET_YES; /* maximum acks received */
3165   // FIXME: possibly also ACK based on RTT (but for that we'd need to
3166   // determine the session used for the ACK first!)
3167
3168   /* is reassembly complete? */
3169   if (0 != rc->msg_missing)
3170   {
3171     if (ack_now)
3172       send_fragment_ack (rc);
3173     finish_cmc_handling (cmc);
3174     return;
3175   }
3176   /* reassembly is complete, verify result */
3177   msg = (const struct GNUNET_MessageHeader *) &rc[1];
3178   if (ntohs (msg->size) != rc->msg_size)
3179   {
3180     GNUNET_break (0);
3181     free_reassembly_context (rc);
3182     finish_cmc_handling (cmc);
3183     return;
3184   }
3185   /* successful reassembly */
3186   send_fragment_ack (rc);
3187   demultiplex_with_cmc (cmc,
3188                         msg);
3189   /* FIXME: really free here? Might be bad if fragments are still
3190      en-route and we forget that we finished this reassembly immediately!
3191      -> keep around until timeout?
3192      -> shorten timeout based on ACK? */
3193   free_reassembly_context (rc);
3194 }
3195
3196
3197 /**
3198  * Communicator gave us a fragment acknowledgement.  Process the request.
3199  *
3200  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
3201  * @param fa the message that was received
3202  */
3203 static void
3204 handle_fragment_ack (void *cls,
3205                      const struct TransportFragmentAckMessage *fa)
3206 {
3207   struct CommunicatorMessageContext *cmc = cls;
3208
3209   // FIXME: do work: identify original message; then identify fragments being acked;
3210   // remove those from the tree to prevent retransmission;
3211   // compute RTT
3212   // if entire message is ACKed, handle that as well.
3213   finish_cmc_handling (cmc);
3214 }
3215
3216
3217 /**
3218  * Communicator gave us a reliability box.  Check the message.
3219  *
3220  * @param cls a `struct CommunicatorMessageContext`
3221  * @param rb the send message that was sent
3222  * @return #GNUNET_YES if message is well-formed
3223  */
3224 static int
3225 check_reliability_box (void *cls,
3226                        const struct TransportReliabilityBox *rb)
3227 {
3228   GNUNET_MQ_check_boxed_message (rb);
3229   return GNUNET_YES;
3230 }
3231
3232
3233 /**
3234  * Communicator gave us a reliability box.  Process the request.
3235  *
3236  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
3237  * @param rb the message that was received
3238  */
3239 static void
3240 handle_reliability_box (void *cls,
3241                         const struct TransportReliabilityBox *rb)
3242 {
3243   struct CommunicatorMessageContext *cmc = cls;
3244   const struct GNUNET_MessageHeader *inbox = (const struct GNUNET_MessageHeader *) &rb[1];
3245
3246   if (0 == ntohl (rb->ack_countdown))
3247   {
3248     struct TransportReliabilityAckMessage *ack;
3249
3250     /* FIXME: implement cummulative ACKs and ack_countdown,
3251        then setting the avg_ack_delay field below: */
3252     ack = GNUNET_malloc (sizeof (*ack) +
3253                          sizeof (struct GNUNET_ShortHashCode));
3254     ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK);
3255     ack->header.size = htons (sizeof (*ack) +
3256                               sizeof (struct GNUNET_ShortHashCode));
3257     memcpy (&ack[1],
3258             &rb->msg_uuid,
3259             sizeof (struct GNUNET_ShortHashCode));
3260     route_message (&cmc->im.sender,
3261                    &ack->header);
3262   }
3263   /* continue with inner message */
3264   demultiplex_with_cmc (cmc,
3265                         inbox);
3266 }
3267
3268
3269 /**
3270  * Communicator gave us a reliability ack.  Process the request.
3271  *
3272  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
3273  * @param ra the message that was received
3274  */
3275 static void
3276 handle_reliability_ack (void *cls,
3277                         const struct TransportReliabilityAckMessage *ra)
3278 {
3279   struct CommunicatorMessageContext *cmc = cls;
3280
3281   // FIXME: do work: find message that was acknowledged, and
3282   // remove from transmission queue; update RTT.
3283   finish_cmc_handling (cmc);
3284 }
3285
3286
3287 /**
3288  * Communicator gave us a backchannel encapsulation.  Check the message.
3289  *
3290  * @param cls a `struct CommunicatorMessageContext`
3291  * @param be the send message that was sent
3292  * @return #GNUNET_YES if message is well-formed
3293  */
3294 static int
3295 check_backchannel_encapsulation (void *cls,
3296                                  const struct TransportBackchannelEncapsulationMessage *be)
3297 {
3298   uint16_t size = ntohs (be->header.size);
3299
3300   if (size - sizeof (*be) < sizeof (struct GNUNET_MessageHeader))
3301   {
3302     GNUNET_break_op (0);
3303     return GNUNET_SYSERR;
3304   }
3305   return GNUNET_YES;
3306 }
3307
3308
3309 /**
3310  * Communicator gave us a backchannel encapsulation.  Process the request.
3311  *
3312  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
3313  * @param be the message that was received
3314  */
3315 static void
3316 handle_backchannel_encapsulation (void *cls,
3317                                   const struct TransportBackchannelEncapsulationMessage *be)
3318 {
3319   struct CommunicatorMessageContext *cmc = cls;
3320
3321   if (0 != memcmp (&be->target,
3322                    &GST_my_identity,
3323                    sizeof (struct GNUNET_PeerIdentity)))
3324   {
3325     /* not for me, try to route to target */
3326     route_message (&be->target,
3327                    GNUNET_copy_message (&be->header));
3328     finish_cmc_handling (cmc);
3329     return;
3330   }
3331   // FIXME: compute shared secret
3332   // FIXME: check HMAC
3333   // FIXME: decrypt payload
3334   // FIXME: forward to specified communicator!
3335   // (using GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL_INCOMING)
3336   finish_cmc_handling (cmc);
3337 }
3338
3339
3340 /**
3341  * Communicator gave us a DV learn message.  Check the message.
3342  *
3343  * @param cls a `struct CommunicatorMessageContext`
3344  * @param dvl the send message that was sent
3345  * @return #GNUNET_YES if message is well-formed
3346  */
3347 static int
3348 check_dv_learn (void *cls,
3349                 const struct TransportDVLearn *dvl)
3350 {
3351   uint16_t size = ntohs (dvl->header.size);
3352   uint16_t num_hops = ntohs (dvl->num_hops);
3353   const struct GNUNET_PeerIdentity *hops = (const struct GNUNET_PeerIdentity *) &dvl[1];
3354
3355   if (size != sizeof (*dvl) + num_hops * sizeof (struct GNUNET_PeerIdentity))
3356   {
3357     GNUNET_break_op (0);
3358     return GNUNET_SYSERR;
3359   }
3360   for (unsigned int i=0;i<num_hops;i++)
3361   {
3362     if (0 == memcmp (&dvl->initiator,
3363                      &hops[i],
3364                      sizeof (struct GNUNET_PeerIdentity)))
3365     {
3366       GNUNET_break_op (0);
3367       return GNUNET_SYSERR;
3368     }
3369     if (0 == memcmp (&GST_my_identity,
3370                      &hops[i],
3371                      sizeof (struct GNUNET_PeerIdentity)))
3372     {
3373       GNUNET_break_op (0);
3374       return GNUNET_SYSERR;
3375     }
3376   }
3377   return GNUNET_YES;
3378 }
3379
3380
3381 /**
3382  * Communicator gave us a DV learn message.  Process the request.
3383  *
3384  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
3385  * @param dvl the message that was received
3386  */
3387 static void
3388 handle_dv_learn (void *cls,
3389                  const struct TransportDVLearn *dvl)
3390 {
3391   struct CommunicatorMessageContext *cmc = cls;
3392
3393   // FIXME: learn path from DV message (if bi-directional flags are set)
3394   // FIXME: expand DV message, forward on (unless path is getting too long)
3395   finish_cmc_handling (cmc);
3396 }
3397
3398
3399 /**
3400  * Communicator gave us a DV box.  Check the message.
3401  *
3402  * @param cls a `struct CommunicatorMessageContext`
3403  * @param dvb the send message that was sent
3404  * @return #GNUNET_YES if message is well-formed
3405  */
3406 static int
3407 check_dv_box (void *cls,
3408               const struct TransportDVBox *dvb)
3409 {
3410   uint16_t size = ntohs (dvb->header.size);
3411   uint16_t num_hops = ntohs (dvb->num_hops);
3412   const struct GNUNET_PeerIdentity *hops = (const struct GNUNET_PeerIdentity *) &dvb[1];
3413   const struct GNUNET_MessageHeader *inbox = (const struct GNUNET_MessageHeader *) &hops[num_hops];
3414   uint16_t isize;
3415   uint16_t itype;
3416
3417   if (size < sizeof (*dvb) + num_hops * sizeof (struct GNUNET_PeerIdentity) + sizeof (struct GNUNET_MessageHeader))
3418   {
3419     GNUNET_break_op (0);
3420     return GNUNET_SYSERR;
3421   }
3422   isize = ntohs (inbox->size);
3423   if (size != sizeof (*dvb) + num_hops * sizeof (struct GNUNET_PeerIdentity) + isize)
3424   {
3425     GNUNET_break_op (0);
3426     return GNUNET_SYSERR;
3427   }
3428   itype = ntohs (inbox->type);
3429   if ( (GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX == itype) ||
3430        (GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN == itype) )
3431   {
3432     GNUNET_break_op (0);
3433     return GNUNET_SYSERR;
3434   }
3435   return GNUNET_YES;
3436 }
3437
3438
3439 /**
3440  * Communicator gave us a DV box.  Process the request.
3441  *
3442  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
3443  * @param dvb the message that was received
3444  */
3445 static void
3446 handle_dv_box (void *cls,
3447                const struct TransportDVBox *dvb)
3448 {
3449   struct CommunicatorMessageContext *cmc = cls;
3450   uint16_t size = ntohs (dvb->header.size) - sizeof (*dvb);
3451   uint16_t num_hops = ntohs (dvb->num_hops);
3452   const struct GNUNET_PeerIdentity *hops = (const struct GNUNET_PeerIdentity *) &dvb[1];
3453   const struct GNUNET_MessageHeader *inbox = (const struct GNUNET_MessageHeader *) &hops[num_hops];
3454
3455   if (num_hops > 0)
3456   {
3457     // FIXME: if we are not the target, shorten path and forward along.
3458     // Try from the _end_ of hops array if we know the given
3459     // neighbour (shortening the path!).
3460     // NOTE: increment total_hops!
3461     finish_cmc_handling (cmc);
3462     return;
3463   }
3464   /* We are the target. Unbox and handle message. */
3465   cmc->im.sender = dvb->origin;
3466   cmc->total_hops = ntohs (dvb->total_hops);
3467   demultiplex_with_cmc (cmc,
3468                         inbox);
3469 }
3470
3471
3472 /**
3473  * Client notified us about transmission from a peer.  Process the request.
3474  *
3475  * @param cls a `struct TransportClient` which sent us the message
3476  * @param obm the send message that was sent
3477  * @return #GNUNET_YES if message is well-formed
3478  */
3479 static int
3480 check_incoming_msg (void *cls,
3481                     const struct GNUNET_TRANSPORT_IncomingMessage *im)
3482 {
3483   struct TransportClient *tc = cls;
3484
3485   if (CT_COMMUNICATOR != tc->type)
3486   {
3487     GNUNET_break (0);
3488     return GNUNET_SYSERR;
3489   }
3490   GNUNET_MQ_check_boxed_message (im);
3491   return GNUNET_OK;
3492 }
3493
3494
3495 /**
3496  * Incoming meessage.  Process the request.
3497  *
3498  * @param im the send message that was received
3499  */
3500 static void
3501 handle_incoming_msg (void *cls,
3502                      const struct GNUNET_TRANSPORT_IncomingMessage *im)
3503 {
3504   struct TransportClient *tc = cls;
3505   struct CommunicatorMessageContext *cmc = GNUNET_new (struct CommunicatorMessageContext);
3506
3507   cmc->tc = tc;
3508   cmc->im = *im;
3509   demultiplex_with_cmc (cmc,
3510                         (const struct GNUNET_MessageHeader *) &im[1]);
3511 }
3512
3513
3514 /**
3515  * Given an inbound message @a msg from a communicator @a cmc,
3516  * demultiplex it based on the type calling the right handler.
3517  *
3518  * @param cmc context for demultiplexing
3519  * @param msg message to demultiplex
3520  */
3521 static void
3522 demultiplex_with_cmc (struct CommunicatorMessageContext *cmc,
3523                       const struct GNUNET_MessageHeader *msg)
3524 {
3525   struct GNUNET_MQ_MessageHandler handlers[] = {
3526     GNUNET_MQ_hd_var_size (fragment_box,
3527                            GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT,
3528                            struct TransportFragmentBox,
3529                            &cmc),
3530     GNUNET_MQ_hd_fixed_size (fragment_ack,
3531                              GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK,
3532                              struct TransportFragmentAckMessage,
3533                              &cmc),
3534     GNUNET_MQ_hd_var_size (reliability_box,
3535                            GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX,
3536                            struct TransportReliabilityBox,
3537                            &cmc),
3538     GNUNET_MQ_hd_fixed_size (reliability_ack,
3539                              GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK,
3540                              struct TransportReliabilityAckMessage,
3541                              &cmc),
3542     GNUNET_MQ_hd_var_size (backchannel_encapsulation,
3543                            GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION,
3544                            struct TransportBackchannelEncapsulationMessage,
3545                            &cmc),
3546     GNUNET_MQ_hd_var_size (dv_learn,
3547                            GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN,
3548                            struct TransportDVLearn,
3549                            &cmc),
3550     GNUNET_MQ_hd_var_size (dv_box,
3551                            GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX,
3552                            struct TransportDVBox,
3553                            &cmc),
3554     GNUNET_MQ_handler_end()
3555   };
3556   int ret;
3557
3558   ret = GNUNET_MQ_handle_message (handlers,
3559                                   msg);
3560   if (GNUNET_SYSERR == ret)
3561   {
3562     GNUNET_break (0);
3563     GNUNET_SERVICE_client_drop (cmc->tc->client);
3564     GNUNET_free (cmc);
3565     return;
3566   }
3567   if (GNUNET_NO == ret)
3568   {
3569     /* unencapsulated 'raw' message */
3570     handle_raw_message (&cmc,
3571                         msg);
3572   }
3573 }
3574
3575
3576 /**
3577  * New queue became available.  Check message.
3578  *
3579  * @param cls the client
3580  * @param aqm the send message that was sent
3581  */
3582 static int
3583 check_add_queue_message (void *cls,
3584                          const struct GNUNET_TRANSPORT_AddQueueMessage *aqm)
3585 {
3586   struct TransportClient *tc = cls;
3587
3588   if (CT_COMMUNICATOR != tc->type)
3589   {
3590     GNUNET_break (0);
3591     return GNUNET_SYSERR;
3592   }
3593   GNUNET_MQ_check_zero_termination (aqm);
3594   return GNUNET_OK;
3595 }
3596
3597
3598 /**
3599  * Bandwidth tracker informs us that the delay until we should receive
3600  * more has changed.
3601  *
3602  * @param cls a `struct GNUNET_ATS_Session` for which the delay changed
3603  */
3604 static void
3605 tracker_update_in_cb (void *cls)
3606 {
3607   struct GNUNET_ATS_Session *queue = cls;
3608   struct GNUNET_TIME_Relative in_delay;
3609   unsigned int rsize;
3610
3611   rsize = (0 == queue->mtu) ? IN_PACKET_SIZE_WITHOUT_MTU : queue->mtu;
3612   in_delay = GNUNET_BANDWIDTH_tracker_get_delay (&queue->tracker_in,
3613                                                  rsize);
3614   // FIXME: how exactly do we do inbound flow control?
3615 }
3616
3617
3618 /**
3619  * If necessary, generates the UUID for a @a pm
3620  *
3621  * @param pm pending message to generate UUID for.
3622  */
3623 static void
3624 set_pending_message_uuid (struct PendingMessage *pm)
3625 {
3626   if (pm->msg_uuid_set)
3627     return;
3628   GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
3629                               &pm->msg_uuid,
3630                               sizeof (pm->msg_uuid));
3631   pm->msg_uuid_set = GNUNET_YES;
3632 }
3633
3634
3635 /**
3636  * Fragment the given @a pm to the given @a mtu.  Adds
3637  * additional fragments to the neighbour as well. If the
3638  * @a mtu is too small, generates and error for the @a pm
3639  * and returns NULL.
3640  *
3641  * @param pm pending message to fragment for transmission
3642  * @param mtu MTU to apply
3643  * @return new message to transmit
3644  */
3645 static struct PendingMessage *
3646 fragment_message (struct PendingMessage *pm,
3647                   uint16_t mtu)
3648 {
3649   struct PendingMessage *ff;
3650
3651   set_pending_message_uuid (pm);
3652
3653   /* This invariant is established in #handle_add_queue_message() */
3654   GNUNET_assert (mtu > sizeof (struct TransportFragmentBox));
3655
3656   /* select fragment for transmission, descending the tree if it has
3657      been expanded until we are at a leaf or at a fragment that is small enough */
3658   ff = pm;
3659   while ( ( (ff->bytes_msg > mtu) ||
3660             (pm == ff) ) &&
3661           (ff->frag_off == ff->bytes_msg) &&
3662           (NULL != ff->head_frag) )
3663   {
3664     ff = ff->head_frag; /* descent into fragmented fragments */
3665   }
3666
3667   if ( ( (ff->bytes_msg > mtu) ||
3668          (pm == ff) ) &&
3669        (pm->frag_off < pm->bytes_msg) )
3670   {
3671     /* Did not yet calculate all fragments, calculate next fragment */
3672     struct PendingMessage *frag;
3673     struct TransportFragmentBox tfb;
3674     const char *orig;
3675     char *msg;
3676     uint16_t fragmax;
3677     uint16_t fragsize;
3678     uint16_t msize;
3679     uint16_t xoff = 0;
3680
3681     orig = (const char *) &ff[1];
3682     msize = ff->bytes_msg;
3683     if (pm != ff)
3684     {
3685       const struct TransportFragmentBox *tfbo;
3686
3687       tfbo = (const struct TransportFragmentBox *) orig;
3688       orig += sizeof (struct TransportFragmentBox);
3689       msize -= sizeof (struct TransportFragmentBox);
3690       xoff = ntohs (tfbo->frag_off);
3691     }
3692     fragmax = mtu - sizeof (struct TransportFragmentBox);
3693     fragsize = GNUNET_MIN (msize - ff->frag_off,
3694                            fragmax);
3695     frag = GNUNET_malloc (sizeof (struct PendingMessage) +
3696                           sizeof (struct TransportFragmentBox) +
3697                           fragsize);
3698     frag->target = pm->target;
3699     frag->frag_parent = ff;
3700     frag->timeout = pm->timeout;
3701     frag->bytes_msg = sizeof (struct TransportFragmentBox) + fragsize;
3702     frag->pmt = PMT_FRAGMENT_BOX;
3703     msg = (char *) &frag[1];
3704     tfb.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT);
3705     tfb.header.size = htons (sizeof (struct TransportFragmentBox) +
3706                              fragsize);
3707     tfb.frag_uuid = htonl (pm->frag_uuidgen++);
3708     tfb.msg_uuid = pm->msg_uuid;
3709     tfb.frag_off = htons (ff->frag_off + xoff);
3710     tfb.msg_size = htons (pm->bytes_msg);
3711     memcpy (msg,
3712             &tfb,
3713             sizeof (tfb));
3714     memcpy (&msg[sizeof (tfb)],
3715             &orig[ff->frag_off],
3716             fragsize);
3717     GNUNET_CONTAINER_MDLL_insert (frag,
3718                                   ff->head_frag,
3719                                   ff->tail_frag,
3720                                   frag);
3721     ff->frag_off += fragsize;
3722     ff = frag;
3723   }
3724
3725   /* Move head to the tail and return it */
3726   GNUNET_CONTAINER_MDLL_remove (frag,
3727                                 ff->frag_parent->head_frag,
3728                                 ff->frag_parent->tail_frag,
3729                                 ff);
3730   GNUNET_CONTAINER_MDLL_insert_tail (frag,
3731                                      ff->frag_parent->head_frag,
3732                                      ff->frag_parent->tail_frag,
3733                                      ff);
3734   return ff;
3735 }
3736
3737
3738 /**
3739  * Reliability-box the given @a pm. On error (can there be any), NULL
3740  * may be returned, otherwise the "replacement" for @a pm (which
3741  * should then be added to the respective neighbour's queue instead of
3742  * @a pm).  If the @a pm is already fragmented or reliability boxed,
3743  * or itself an ACK, this function simply returns @a pm.
3744  *
3745  * @param pm pending message to box for transmission over unreliabile queue
3746  * @return new message to transmit
3747  */
3748 static struct PendingMessage *
3749 reliability_box_message (struct PendingMessage *pm)
3750 {
3751   struct TransportReliabilityBox rbox;
3752   struct PendingMessage *bpm;
3753   char *msg;
3754
3755   if (PMT_CORE != pm->pmt)
3756     return pm;  /* already fragmented or reliability boxed, or control message: do nothing */
3757   if (NULL != pm->bpm)
3758     return pm->bpm; /* already computed earlier: do nothing */
3759   GNUNET_assert (NULL == pm->head_frag);
3760   if (pm->bytes_msg + sizeof (rbox) > UINT16_MAX)
3761   {
3762     /* failed hard */
3763     GNUNET_break (0);
3764     client_send_response (pm,
3765                           GNUNET_NO,
3766                           0);
3767     return NULL;
3768   }
3769   bpm = GNUNET_malloc (sizeof (struct PendingMessage) +
3770                        sizeof (rbox) +
3771                        pm->bytes_msg);
3772   bpm->target = pm->target;
3773   bpm->frag_parent = pm;
3774   GNUNET_CONTAINER_MDLL_insert (frag,
3775                                 pm->head_frag,
3776                                 pm->tail_frag,
3777                                 bpm);
3778   bpm->timeout = pm->timeout;
3779   bpm->pmt = PMT_RELIABILITY_BOX;
3780   bpm->bytes_msg = pm->bytes_msg + sizeof (rbox);
3781   set_pending_message_uuid (bpm);
3782   rbox.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX);
3783   rbox.header.size = htons (sizeof (rbox) + pm->bytes_msg);
3784   rbox.ack_countdown = htonl (0); // FIXME: implement ACK countdown support
3785   rbox.msg_uuid = pm->msg_uuid;
3786   msg = (char *) &bpm[1];
3787   memcpy (msg,
3788           &rbox,
3789           sizeof (rbox));
3790   memcpy (&msg[sizeof (rbox)],
3791           &pm[1],
3792           pm->bytes_msg);
3793   pm->bpm = bpm;
3794   return bpm;
3795 }
3796
3797
3798 /**
3799  * We believe we are ready to transmit a message on a queue. Double-checks
3800  * with the queue's "tracker_out" and then gives the message to the
3801  * communicator for transmission (updating the tracker, and re-scheduling
3802  * itself if applicable).
3803  *
3804  * @param cls the `struct GNUNET_ATS_Session` to process transmissions for
3805  */
3806 static void
3807 transmit_on_queue (void *cls)
3808 {
3809   struct GNUNET_ATS_Session *queue = cls;
3810   struct Neighbour *n = queue->neighbour;
3811   struct QueueEntry *qe;
3812   struct PendingMessage *pm;
3813   struct PendingMessage *s;
3814   uint32_t overhead;
3815   struct GNUNET_TRANSPORT_SendMessageTo *smt;
3816   struct GNUNET_MQ_Envelope *env;
3817
3818   queue->transmit_task = NULL;
3819   if (NULL == (pm = n->pending_msg_head))
3820   {
3821     /* no message pending, nothing to do here! */
3822     return;
3823   }
3824   schedule_transmit_on_queue (queue);
3825   if (NULL != queue->transmit_task)
3826     return; /* do it later */
3827   overhead = 0;
3828   if (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc)
3829     overhead += sizeof (struct TransportReliabilityBox);
3830   s = pm;
3831   if ( ( (0 != queue->mtu) &&
3832          (pm->bytes_msg + overhead > queue->mtu) ) ||
3833        (pm->bytes_msg > UINT16_MAX - sizeof (struct GNUNET_TRANSPORT_SendMessageTo)) ||
3834        (NULL != pm->head_frag /* fragments already exist, should
3835                                  respect that even if MTU is 0 for
3836                                  this queue */) )
3837     s = fragment_message (s,
3838                           (0 == queue->mtu)
3839                           ? UINT16_MAX - sizeof (struct GNUNET_TRANSPORT_SendMessageTo)
3840                           : queue->mtu);
3841   if (NULL == s)
3842   {
3843     /* Fragmentation failed, try next message... */
3844     schedule_transmit_on_queue (queue);
3845     return;
3846   }
3847   if (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc)
3848     s = reliability_box_message (s);
3849   if (NULL == s)
3850   {
3851     /* Reliability boxing failed, try next message... */
3852     schedule_transmit_on_queue (queue);
3853     return;
3854   }
3855
3856   /* Pass 's' for transission to the communicator */
3857   qe = GNUNET_new (struct QueueEntry);
3858   qe->mid = queue->mid_gen++;
3859   qe->session = queue;
3860   // qe->pm = s; // FIXME: not so easy, reference management on 'free(s)'!
3861   GNUNET_CONTAINER_DLL_insert (queue->queue_head,
3862                                queue->queue_tail,
3863                                qe);
3864   env = GNUNET_MQ_msg_extra (smt,
3865                              s->bytes_msg,
3866                              GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG);
3867   smt->qid = queue->qid;
3868   smt->mid = qe->mid;
3869   smt->receiver = n->pid;
3870   memcpy (&smt[1],
3871           &s[1],
3872           s->bytes_msg);
3873   GNUNET_assert (CT_COMMUNICATOR == queue->tc->type);
3874   queue->queue_length++;
3875   queue->tc->details.communicator.total_queue_length++;
3876   GNUNET_MQ_send (queue->tc->mq,
3877                   env);
3878
3879   // FIXME: do something similar to the logic below
3880   // in defragmentation / reliability ACK handling!
3881
3882   /* Check if this transmission somehow conclusively finished handing 'pm'
3883      even without any explicit ACKs */
3884   if ( (PMT_CORE == s->pmt) &&
3885        (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc) )
3886   {
3887     /* Full message sent, and over reliabile channel */
3888     client_send_response (pm,
3889                           GNUNET_YES,
3890                           pm->bytes_msg);
3891   }
3892   else if ( (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc) &&
3893             (PMT_FRAGMENT_BOX == s->pmt) )
3894   {
3895     struct PendingMessage *pos;
3896
3897     /* Fragment sent over reliabile channel */
3898     free_fragment_tree (s);
3899     pos = s->frag_parent;
3900     GNUNET_CONTAINER_MDLL_remove (frag,
3901                                   pos->head_frag,
3902                                   pos->tail_frag,
3903                                   s);
3904     GNUNET_free (s);
3905     /* check if subtree is done */
3906     while ( (NULL == pos->head_frag) &&
3907             (pos->frag_off == pos->bytes_msg) &&
3908             (pos != pm) )
3909     {
3910       s = pos;
3911       pos = s->frag_parent;
3912       GNUNET_CONTAINER_MDLL_remove (frag,
3913                                     pos->head_frag,
3914                                     pos->tail_frag,
3915                                     s);
3916       GNUNET_free (s);
3917     }
3918
3919     /* Was this the last applicable fragmment? */
3920     if ( (NULL == pm->head_frag) &&
3921          (pm->frag_off == pm->bytes_msg) )
3922       client_send_response (pm,
3923                             GNUNET_YES,
3924                             pm->bytes_msg /* FIXME: calculate and add overheads! */);
3925   }
3926   else if (PMT_CORE != pm->pmt)
3927   {
3928     /* This was an acknowledgement of some type, always free */
3929     free_pending_message (pm);
3930   }
3931   else
3932   {
3933     /* message not finished, waiting for acknowledgement */
3934     struct Neighbour *neighbour = pm->target;
3935     /* Update time by which we might retransmit 's' based on queue
3936        characteristics (i.e. RTT); it takes one RTT for the message to
3937        arrive and the ACK to come back in the best case; but the other
3938        side is allowed to delay ACKs by 2 RTTs, so we use 4 RTT before
3939        retransmitting.  Note that in the future this heuristic should
3940        likely be improved further (measure RTT stability, consider
3941        message urgency and size when delaying ACKs, etc.) */
3942     s->next_attempt = GNUNET_TIME_relative_to_absolute
3943       (GNUNET_TIME_relative_multiply (queue->rtt,
3944                                       4));
3945     if (s == pm)
3946     {
3947       struct PendingMessage *pos;
3948
3949       /* re-insert sort in neighbour list */
3950       GNUNET_CONTAINER_MDLL_remove (neighbour,
3951                                     neighbour->pending_msg_head,
3952                                     neighbour->pending_msg_tail,
3953                                     pm);
3954       pos = neighbour->pending_msg_tail;
3955       while ( (NULL != pos) &&
3956               (pm->next_attempt.abs_value_us > pos->next_attempt.abs_value_us) )
3957         pos = pos->prev_neighbour;
3958       GNUNET_CONTAINER_MDLL_insert_after (neighbour,
3959                                           neighbour->pending_msg_head,
3960                                           neighbour->pending_msg_tail,
3961                                           pos,
3962                                           pm);
3963     }
3964     else
3965     {
3966       /* re-insert sort in fragment list */
3967       struct PendingMessage *fp = s->frag_parent;
3968       struct PendingMessage *pos;
3969
3970       GNUNET_CONTAINER_MDLL_remove (frag,
3971                                     fp->head_frag,
3972                                     fp->tail_frag,
3973                                     s);
3974       pos = fp->tail_frag;
3975       while ( (NULL != pos) &&
3976               (s->next_attempt.abs_value_us > pos->next_attempt.abs_value_us) )
3977         pos = pos->prev_frag;
3978       GNUNET_CONTAINER_MDLL_insert_after (frag,
3979                                           fp->head_frag,
3980                                           fp->tail_frag,
3981                                           pos,
3982                                           s);
3983     }
3984   }
3985
3986   /* finally, re-schedule queue transmission task itself */
3987   schedule_transmit_on_queue (queue);
3988 }
3989
3990
3991 /**
3992  * Bandwidth tracker informs us that the delay until we
3993  * can transmit again changed.
3994  *
3995  * @param cls a `struct GNUNET_ATS_Session` for which the delay changed
3996  */
3997 static void
3998 tracker_update_out_cb (void *cls)
3999 {
4000   struct GNUNET_ATS_Session *queue = cls;
4001   struct Neighbour *n = queue->neighbour;
4002
4003   if (NULL == n->pending_msg_head)
4004   {
4005     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4006                 "Bandwidth allocation updated for empty transmission queue `%s'\n",
4007                 queue->address);
4008     return; /* no message pending, nothing to do here! */
4009   }
4010   GNUNET_SCHEDULER_cancel (queue->transmit_task);
4011   queue->transmit_task = NULL;
4012   schedule_transmit_on_queue (queue);
4013 }
4014
4015
4016 /**
4017  * Bandwidth tracker informs us that excessive outbound bandwidth was
4018  * allocated which is not being used.
4019  *
4020  * @param cls a `struct GNUNET_ATS_Session` for which the excess was noted
4021  */
4022 static void
4023 tracker_excess_out_cb (void *cls)
4024 {
4025   /* FIXME: trigger excess bandwidth report to core? Right now,
4026      this is done internally within transport_api2_core already,
4027      but we probably want to change the logic and trigger it
4028      from here via a message instead! */
4029   /* TODO: maybe inform ATS at this point? */
4030   GNUNET_STATISTICS_update (GST_stats,
4031                             "# Excess outbound bandwidth reported",
4032                             1,
4033                             GNUNET_NO);
4034 }
4035
4036
4037
4038 /**
4039  * Bandwidth tracker informs us that excessive inbound bandwidth was allocated
4040  * which is not being used.
4041  *
4042  * @param cls a `struct GNUNET_ATS_Session` for which the excess was noted
4043  */
4044 static void
4045 tracker_excess_in_cb (void *cls)
4046 {
4047   /* TODO: maybe inform ATS at this point? */
4048   GNUNET_STATISTICS_update (GST_stats,
4049                             "# Excess inbound bandwidth reported",
4050                             1,
4051                             GNUNET_NO);
4052 }
4053
4054
4055 /**
4056  * New queue became available.  Process the request.
4057  *
4058  * @param cls the client
4059  * @param aqm the send message that was sent
4060  */
4061 static void
4062 handle_add_queue_message (void *cls,
4063                           const struct GNUNET_TRANSPORT_AddQueueMessage *aqm)
4064 {
4065   struct TransportClient *tc = cls;
4066   struct GNUNET_ATS_Session *queue;
4067   struct Neighbour *neighbour;
4068   const char *addr;
4069   uint16_t addr_len;
4070
4071   if (ntohl (aqm->mtu) <= sizeof (struct TransportFragmentBox))
4072   {
4073     /* MTU so small as to be useless for transmissions,
4074        required for #fragment_message()! */
4075     GNUNET_break_op (0);
4076     GNUNET_SERVICE_client_drop (tc->client);
4077     return;
4078   }
4079   neighbour = lookup_neighbour (&aqm->receiver);
4080   if (NULL == neighbour)
4081   {
4082     neighbour = GNUNET_new (struct Neighbour);
4083     neighbour->earliest_timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
4084     neighbour->pid = aqm->receiver;
4085     GNUNET_assert (GNUNET_OK ==
4086                    GNUNET_CONTAINER_multipeermap_put (neighbours,
4087                                                       &neighbour->pid,
4088                                                       neighbour,
4089                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
4090     cores_send_connect_info (&neighbour->pid,
4091                              GNUNET_BANDWIDTH_ZERO);
4092   }
4093   addr_len = ntohs (aqm->header.size) - sizeof (*aqm);
4094   addr = (const char *) &aqm[1];
4095
4096   queue = GNUNET_malloc (sizeof (struct GNUNET_ATS_Session) + addr_len);
4097   queue->tc = tc;
4098   queue->address = (const char *) &queue[1];
4099   queue->rtt = GNUNET_TIME_UNIT_FOREVER_REL;
4100   queue->qid = aqm->qid;
4101   queue->mtu = ntohl (aqm->mtu);
4102   queue->nt = (enum GNUNET_NetworkType) ntohl (aqm->nt);
4103   queue->cs = (enum GNUNET_TRANSPORT_ConnectionStatus) ntohl (aqm->cs);
4104   queue->neighbour = neighbour;
4105   GNUNET_BANDWIDTH_tracker_init2 (&queue->tracker_in,
4106                                   &tracker_update_in_cb,
4107                                   queue,
4108                                   GNUNET_BANDWIDTH_ZERO,
4109                                   GNUNET_CONSTANTS_MAX_BANDWIDTH_CARRY_S,
4110                                   &tracker_excess_in_cb,
4111                                   queue);
4112   GNUNET_BANDWIDTH_tracker_init2 (&queue->tracker_out,
4113                                   &tracker_update_out_cb,
4114                                   queue,
4115                                   GNUNET_BANDWIDTH_ZERO,
4116                                   GNUNET_CONSTANTS_MAX_BANDWIDTH_CARRY_S,
4117                                   &tracker_excess_out_cb,
4118                                   queue);
4119   memcpy (&queue[1],
4120           addr,
4121           addr_len);
4122   /* notify ATS about new queue */
4123   {
4124     struct GNUNET_ATS_Properties prop = {
4125       .delay = GNUNET_TIME_UNIT_FOREVER_REL,
4126       .mtu = queue->mtu,
4127       .nt = queue->nt,
4128       .cc = tc->details.communicator.cc
4129     };
4130
4131     queue->sr = GNUNET_ATS_session_add (ats,
4132                                         &neighbour->pid,
4133                                         queue->address,
4134                                         queue,
4135                                         &prop);
4136     if  (NULL == queue->sr)
4137     {
4138       /* This can only happen if the 'address' was way too long for ATS
4139          (approaching 64k in strlen()!). In this case, the communicator
4140          must be buggy and we drop it. */
4141       GNUNET_break (0);
4142       GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_in);
4143       GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_out);
4144       GNUNET_free (queue);
4145       if (NULL == neighbour->session_head)
4146       {
4147         cores_send_disconnect_info (&neighbour->pid);
4148         free_neighbour (neighbour);
4149       }
4150       GNUNET_SERVICE_client_drop (tc->client);
4151       return;
4152     }
4153   }
4154   /* notify monitors about new queue */
4155   {
4156     struct MonitorEvent me = {
4157       .rtt = queue->rtt,
4158       .cs = queue->cs
4159     };
4160
4161     notify_monitors (&neighbour->pid,
4162                      queue->address,
4163                      queue->nt,
4164                      &me);
4165   }
4166   GNUNET_CONTAINER_MDLL_insert (neighbour,
4167                                 neighbour->session_head,
4168                                 neighbour->session_tail,
4169                                 queue);
4170   GNUNET_CONTAINER_MDLL_insert (client,
4171                                 tc->details.communicator.session_head,
4172                                 tc->details.communicator.session_tail,
4173                                 queue);
4174   GNUNET_SERVICE_client_continue (tc->client);
4175 }
4176
4177
4178 /**
4179  * Queue to a peer went down.  Process the request.
4180  *
4181  * @param cls the client
4182  * @param dqm the send message that was sent
4183  */
4184 static void
4185 handle_del_queue_message (void *cls,
4186                           const struct GNUNET_TRANSPORT_DelQueueMessage *dqm)
4187 {
4188   struct TransportClient *tc = cls;
4189
4190   if (CT_COMMUNICATOR != tc->type)
4191   {
4192     GNUNET_break (0);
4193     GNUNET_SERVICE_client_drop (tc->client);
4194     return;
4195   }
4196   for (struct GNUNET_ATS_Session *session = tc->details.communicator.session_head;
4197        NULL != session;
4198        session = session->next_client)
4199   {
4200     struct Neighbour *neighbour = session->neighbour;
4201
4202     if ( (dqm->qid != session->qid) ||
4203          (0 != memcmp (&dqm->receiver,
4204                        &neighbour->pid,
4205                        sizeof (struct GNUNET_PeerIdentity))) )
4206       continue;
4207     free_session (session);
4208     GNUNET_SERVICE_client_continue (tc->client);
4209     return;
4210   }
4211   GNUNET_break (0);
4212   GNUNET_SERVICE_client_drop (tc->client);
4213 }
4214
4215
4216 /**
4217  * Message was transmitted.  Process the request.
4218  *
4219  * @param cls the client
4220  * @param sma the send message that was sent
4221  */
4222 static void
4223 handle_send_message_ack (void *cls,
4224                          const struct GNUNET_TRANSPORT_SendMessageToAck *sma)
4225 {
4226   struct TransportClient *tc = cls;
4227   struct QueueEntry *queue;
4228
4229   if (CT_COMMUNICATOR != tc->type)
4230   {
4231     GNUNET_break (0);
4232     GNUNET_SERVICE_client_drop (tc->client);
4233     return;
4234   }
4235
4236   /* find our queue entry matching the ACK */
4237   queue = NULL;
4238   for (struct GNUNET_ATS_Session *session = tc->details.communicator.session_head;
4239        NULL != session;
4240        session = session->next_client)
4241   {
4242     if (0 != memcmp (&session->neighbour->pid,
4243                      &sma->receiver,
4244                      sizeof (struct GNUNET_PeerIdentity)))
4245       continue;
4246     for (struct QueueEntry *qe = session->queue_head;
4247          NULL != qe;
4248          qe = qe->next)
4249     {
4250       if (qe->mid != sma->mid)
4251         continue;
4252       queue = qe;
4253       break;
4254     }
4255     break;
4256   }
4257   if (NULL == queue)
4258   {
4259     /* this should never happen */
4260     GNUNET_break (0);
4261     GNUNET_SERVICE_client_drop (tc->client);
4262     return;
4263   }
4264   GNUNET_CONTAINER_DLL_remove (queue->session->queue_head,
4265                                queue->session->queue_tail,
4266                                queue);
4267   queue->session->queue_length--;
4268   tc->details.communicator.total_queue_length--;
4269   GNUNET_SERVICE_client_continue (tc->client);
4270
4271   /* if applicable, resume transmissions that waited on ACK */
4272   if (COMMUNICATOR_TOTAL_QUEUE_LIMIT - 1 == tc->details.communicator.total_queue_length)
4273   {
4274     /* Communicator dropped below threshold, resume all queues */
4275     GNUNET_STATISTICS_update (GST_stats,
4276                               "# Transmission throttled due to communicator queue limit",
4277                               -1,
4278                               GNUNET_NO);
4279     for (struct GNUNET_ATS_Session *session = tc->details.communicator.session_head;
4280          NULL != session;
4281          session = session->next_client)
4282       schedule_transmit_on_queue (session);
4283   }
4284   else if (SESSION_QUEUE_LIMIT - 1 == queue->session->queue_length)
4285   {
4286     /* queue dropped below threshold; only resume this one queue */
4287     GNUNET_STATISTICS_update (GST_stats,
4288                               "# Transmission throttled due to session queue limit",
4289                               -1,
4290                               GNUNET_NO);
4291     schedule_transmit_on_queue (queue->session);
4292   }
4293
4294   /* TODO: we also should react on the status! */
4295   // FIXME: this probably requires queue->pm = s assignment!
4296   // FIXME: react to communicator status about transmission request. We got:
4297   sma->status; // OK success, SYSERR failure
4298
4299   GNUNET_free (queue);
4300 }
4301
4302
4303 /**
4304  * Iterator telling new MONITOR client about all existing
4305  * queues to peers.
4306  *
4307  * @param cls the new `struct TransportClient`
4308  * @param pid a connected peer
4309  * @param value the `struct Neighbour` with more information
4310  * @return #GNUNET_OK (continue to iterate)
4311  */
4312 static int
4313 notify_client_queues (void *cls,
4314                       const struct GNUNET_PeerIdentity *pid,
4315                       void *value)
4316 {
4317   struct TransportClient *tc = cls;
4318   struct Neighbour *neighbour = value;
4319
4320   GNUNET_assert (CT_MONITOR == tc->type);
4321   for (struct GNUNET_ATS_Session *q = neighbour->session_head;
4322        NULL != q;
4323        q = q->next_neighbour)
4324   {
4325     struct MonitorEvent me = {
4326       .rtt = q->rtt,
4327       .cs = q->cs,
4328       .num_msg_pending = q->num_msg_pending,
4329       .num_bytes_pending = q->num_bytes_pending
4330     };
4331
4332     notify_monitor (tc,
4333                     pid,
4334                     q->address,
4335                     q->nt,
4336                     &me);
4337   }
4338   return GNUNET_OK;
4339 }
4340
4341
4342 /**
4343  * Initialize a monitor client.
4344  *
4345  * @param cls the client
4346  * @param start the start message that was sent
4347  */
4348 static void
4349 handle_monitor_start (void *cls,
4350                       const struct GNUNET_TRANSPORT_MonitorStart *start)
4351 {
4352   struct TransportClient *tc = cls;
4353
4354   if (CT_NONE != tc->type)
4355   {
4356     GNUNET_break (0);
4357     GNUNET_SERVICE_client_drop (tc->client);
4358     return;
4359   }
4360   tc->type = CT_MONITOR;
4361   tc->details.monitor.peer = start->peer;
4362   tc->details.monitor.one_shot = ntohl (start->one_shot);
4363   GNUNET_CONTAINER_multipeermap_iterate (neighbours,
4364                                          &notify_client_queues,
4365                                          tc);
4366   GNUNET_SERVICE_client_mark_monitor (tc->client);
4367   GNUNET_SERVICE_client_continue (tc->client);
4368 }
4369
4370
4371 /**
4372  * Signature of a function called by ATS with the current bandwidth
4373  * allocation to be used as determined by ATS.
4374  *
4375  * @param cls closure, NULL
4376  * @param session session this is about
4377  * @param bandwidth_out assigned outbound bandwidth for the connection,
4378  *        0 to signal disconnect
4379  * @param bandwidth_in assigned inbound bandwidth for the connection,
4380  *        0 to signal disconnect
4381  */
4382 static void
4383 ats_allocation_cb (void *cls,
4384                    struct GNUNET_ATS_Session *session,
4385                    struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out,
4386                    struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in)
4387 {
4388   (void) cls;
4389   GNUNET_BANDWIDTH_tracker_update_quota (&session->tracker_out,
4390                                          bandwidth_out);
4391   GNUNET_BANDWIDTH_tracker_update_quota (&session->tracker_in,
4392                                          bandwidth_in);
4393 }
4394
4395
4396 /**
4397  * Find transport client providing communication service
4398  * for the protocol @a prefix.
4399  *
4400  * @param prefix communicator name
4401  * @return NULL if no such transport client is available
4402  */
4403 static struct TransportClient *
4404 lookup_communicator (const char *prefix)
4405 {
4406   for (struct TransportClient *tc = clients_head;
4407        NULL != tc;
4408        tc = tc->next)
4409   {
4410     if (CT_COMMUNICATOR != tc->type)
4411       continue;
4412     if (0 == strcmp (prefix,
4413                      tc->details.communicator.address_prefix))
4414       return tc;
4415   }
4416   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
4417               "ATS suggested use of communicator for `%s', but we do not have such a communicator!\n",
4418               prefix);
4419   return NULL;
4420 }
4421
4422
4423 /**
4424  * Signature of a function called by ATS suggesting transport to
4425  * try connecting with a particular address.
4426  *
4427  * @param cls closure, NULL
4428  * @param pid target peer
4429  * @param address the address to try
4430  */
4431 static void
4432 ats_suggestion_cb (void *cls,
4433                    const struct GNUNET_PeerIdentity *pid,
4434                    const char *address)
4435 {
4436   static uint32_t idgen;
4437   struct TransportClient *tc;
4438   char *prefix;
4439   struct GNUNET_TRANSPORT_CreateQueue *cqm;
4440   struct GNUNET_MQ_Envelope *env;
4441   size_t alen;
4442
4443   (void) cls;
4444   prefix = GNUNET_HELLO_address_to_prefix (address);
4445   if (NULL == prefix)
4446   {
4447     GNUNET_break (0); /* ATS gave invalid address!? */
4448     return;
4449   }
4450   tc = lookup_communicator (prefix);
4451   if (NULL == tc)
4452   {
4453     GNUNET_STATISTICS_update (GST_stats,
4454                               "# ATS suggestions ignored due to missing communicator",
4455                               1,
4456                               GNUNET_NO);
4457     return;
4458   }
4459   /* forward suggestion for queue creation to communicator */
4460   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4461               "Request #%u for `%s' communicator to create queue to `%s'\n",
4462               (unsigned int) idgen,
4463               prefix,
4464               address);
4465   alen = strlen (address) + 1;
4466   env = GNUNET_MQ_msg_extra (cqm,
4467                              alen,
4468                              GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE);
4469   cqm->request_id = htonl (idgen++);
4470   cqm->receiver = *pid;
4471   memcpy (&cqm[1],
4472           address,
4473           alen);
4474   GNUNET_MQ_send (tc->mq,
4475                   env);
4476 }
4477
4478
4479 /**
4480  * Communicator tells us that our request to create a queue "worked", that
4481  * is setting up the queue is now in process.
4482  *
4483  * @param cls the `struct TransportClient`
4484  * @param cqr confirmation message
4485  */
4486 static void
4487 handle_queue_create_ok (void *cls,
4488                         const struct GNUNET_TRANSPORT_CreateQueueResponse *cqr)
4489 {
4490   struct TransportClient *tc = cls;
4491
4492   if (CT_COMMUNICATOR != tc->type)
4493   {
4494     GNUNET_break (0);
4495     GNUNET_SERVICE_client_drop (tc->client);
4496     return;
4497   }
4498   GNUNET_STATISTICS_update (GST_stats,
4499                             "# ATS suggestions succeeded at communicator",
4500                             1,
4501                             GNUNET_NO);
4502   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4503               "Request #%u for communicator to create queue succeeded\n",
4504               (unsigned int) ntohs (cqr->request_id));
4505   GNUNET_SERVICE_client_continue (tc->client);
4506 }
4507
4508
4509 /**
4510  * Communicator tells us that our request to create a queue failed. This usually
4511  * indicates that the provided address is simply invalid or that the communicator's
4512  * resources are exhausted.
4513  *
4514  * @param cls the `struct TransportClient`
4515  * @param cqr failure message
4516  */
4517 static void
4518 handle_queue_create_fail (void *cls,
4519                           const struct GNUNET_TRANSPORT_CreateQueueResponse *cqr)
4520 {
4521   struct TransportClient *tc = cls;
4522
4523   if (CT_COMMUNICATOR != tc->type)
4524   {
4525     GNUNET_break (0);
4526     GNUNET_SERVICE_client_drop (tc->client);
4527     return;
4528   }
4529   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4530               "Request #%u for communicator to create queue failed\n",
4531               (unsigned int) ntohs (cqr->request_id));
4532   GNUNET_STATISTICS_update (GST_stats,
4533                             "# ATS suggestions failed in queue creation at communicator",
4534                             1,
4535                             GNUNET_NO);
4536   GNUNET_SERVICE_client_continue (tc->client);
4537 }
4538
4539
4540 /**
4541  * Check #GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_CONSIDER_VERIFY
4542  * messages. We do nothing here, real verification is done later.
4543  *
4544  * @param cls a `struct TransportClient *`
4545  * @param msg message to verify
4546  * @return #GNUNET_OK
4547  */
4548 static int
4549 check_address_consider_verify (void *cls,
4550                                const struct GNUNET_TRANSPORT_AddressToVerify *hdr)
4551 {
4552   (void) cls;
4553   (void) hdr;
4554   return GNUNET_OK;
4555 }
4556
4557
4558 /**
4559  * Given another peers address, consider checking it for validity
4560  * and then adding it to the Peerstore.
4561  *
4562  * @param cls a `struct TransportClient`
4563  * @param hdr message containing the raw address data and
4564  *        signature in the body, see #GNUNET_HELLO_extract_address()
4565  */
4566 static void
4567 handle_address_consider_verify (void *cls,
4568                                 const struct GNUNET_TRANSPORT_AddressToVerify *hdr)
4569 {
4570   char *address;
4571   enum GNUNET_NetworkType nt;
4572   struct GNUNET_TIME_Absolute expiration;
4573
4574   (void) cls;
4575   // FIXME: pre-check: do we know this address already?
4576   // FIXME: pre-check: rate-limit signature verification / validation!
4577   address = GNUNET_HELLO_extract_address (&hdr[1],
4578                                           ntohs (hdr->header.size) - sizeof (*hdr),
4579                                           &hdr->peer,
4580                                           &nt,
4581                                           &expiration);
4582   if (NULL == address)
4583   {
4584     GNUNET_break_op (0);
4585     return;
4586   }
4587   if (0 == GNUNET_TIME_absolute_get_remaining (expiration).rel_value_us)
4588     return; /* expired */
4589   // FIXME: do begin actual verification here!
4590   GNUNET_free (address);
4591 }
4592
4593
4594 /**
4595  * Free neighbour entry.
4596  *
4597  * @param cls NULL
4598  * @param pid unused
4599  * @param value a `struct Neighbour`
4600  * @return #GNUNET_OK (always)
4601  */
4602 static int
4603 free_neighbour_cb (void *cls,
4604                    const struct GNUNET_PeerIdentity *pid,
4605                    void *value)
4606 {
4607   struct Neighbour *neighbour = value;
4608
4609   (void) cls;
4610   (void) pid;
4611   GNUNET_break (0); // should this ever happen?
4612   free_neighbour (neighbour);
4613
4614   return GNUNET_OK;
4615 }
4616
4617
4618 /**
4619  * Free DV route entry.
4620  *
4621  * @param cls NULL
4622  * @param pid unused
4623  * @param value a `struct DistanceVector`
4624  * @return #GNUNET_OK (always)
4625  */
4626 static int
4627 free_dv_routes_cb (void *cls,
4628                    const struct GNUNET_PeerIdentity *pid,
4629                    void *value)
4630 {
4631   struct DistanceVector *dv = value;
4632
4633   (void) cls;
4634   (void) pid;
4635   free_dv_route (dv);
4636
4637   return GNUNET_OK;
4638 }
4639
4640
4641 /**
4642  * Free ephemeral entry.
4643  *
4644  * @param cls NULL
4645  * @param pid unused
4646  * @param value a `struct Neighbour`
4647  * @return #GNUNET_OK (always)
4648  */
4649 static int
4650 free_ephemeral_cb (void *cls,
4651                    const struct GNUNET_PeerIdentity *pid,
4652                    void *value)
4653 {
4654   struct EphemeralCacheEntry *ece = value;
4655
4656   (void) cls;
4657   (void) pid;
4658   free_ephemeral (ece);
4659   return GNUNET_OK;
4660 }
4661
4662
4663 /**
4664  * Function called when the service shuts down.  Unloads our plugins
4665  * and cancels pending validations.
4666  *
4667  * @param cls closure, unused
4668  */
4669 static void
4670 do_shutdown (void *cls)
4671 {
4672   (void) cls;
4673
4674   if (NULL != ephemeral_task)
4675   {
4676     GNUNET_SCHEDULER_cancel (ephemeral_task);
4677     ephemeral_task = NULL;
4678   }
4679   GNUNET_CONTAINER_multipeermap_iterate (neighbours,
4680                                          &free_neighbour_cb,
4681                                          NULL);
4682   if (NULL != ats)
4683   {
4684     GNUNET_ATS_transport_done (ats);
4685     ats = NULL;
4686   }
4687   if (NULL != peerstore)
4688   {
4689     GNUNET_PEERSTORE_disconnect (peerstore,
4690                                  GNUNET_NO);
4691     peerstore = NULL;
4692   }
4693   if (NULL != GST_stats)
4694   {
4695     GNUNET_STATISTICS_destroy (GST_stats,
4696                                GNUNET_NO);
4697     GST_stats = NULL;
4698   }
4699   if (NULL != GST_my_private_key)
4700   {
4701     GNUNET_free (GST_my_private_key);
4702     GST_my_private_key = NULL;
4703   }
4704   GNUNET_CONTAINER_multipeermap_destroy (neighbours);
4705   neighbours = NULL;
4706   GNUNET_CONTAINER_multipeermap_iterate (dv_routes,
4707                                          &free_dv_routes_cb,
4708                                          NULL);
4709   GNUNET_CONTAINER_multipeermap_destroy (dv_routes);
4710   dv_routes = NULL;
4711   GNUNET_CONTAINER_multipeermap_iterate (ephemeral_map,
4712                                          &free_ephemeral_cb,
4713                                          NULL);
4714   GNUNET_CONTAINER_multipeermap_destroy (ephemeral_map);
4715   ephemeral_map = NULL;
4716   GNUNET_CONTAINER_heap_destroy (ephemeral_heap);
4717   ephemeral_heap = NULL;
4718 }
4719
4720
4721 /**
4722  * Initiate transport service.
4723  *
4724  * @param cls closure
4725  * @param c configuration to use
4726  * @param service the initialized service
4727  */
4728 static void
4729 run (void *cls,
4730      const struct GNUNET_CONFIGURATION_Handle *c,
4731      struct GNUNET_SERVICE_Handle *service)
4732 {
4733   (void) cls;
4734   /* setup globals */
4735   GST_cfg = c;
4736   neighbours = GNUNET_CONTAINER_multipeermap_create (1024,
4737                                                      GNUNET_YES);
4738   dv_routes = GNUNET_CONTAINER_multipeermap_create (1024,
4739                                                     GNUNET_YES);
4740   ephemeral_map = GNUNET_CONTAINER_multipeermap_create (32,
4741                                                         GNUNET_YES);
4742   ephemeral_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
4743   GST_my_private_key = GNUNET_CRYPTO_eddsa_key_create_from_configuration (GST_cfg);
4744   if (NULL == GST_my_private_key)
4745   {
4746     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
4747                 _("Transport service is lacking key configuration settings. Exiting.\n"));
4748     GNUNET_SCHEDULER_shutdown ();
4749     return;
4750   }
4751   GNUNET_CRYPTO_eddsa_key_get_public (GST_my_private_key,
4752                                       &GST_my_identity.public_key);
4753   GNUNET_log(GNUNET_ERROR_TYPE_INFO,
4754              "My identity is `%s'\n",
4755              GNUNET_i2s_full (&GST_my_identity));
4756   GST_stats = GNUNET_STATISTICS_create ("transport",
4757                                         GST_cfg);
4758   GNUNET_SCHEDULER_add_shutdown (&do_shutdown,
4759                                  NULL);
4760   peerstore = GNUNET_PEERSTORE_connect (GST_cfg);
4761   if (NULL == peerstore)
4762   {
4763     GNUNET_break (0);
4764     GNUNET_SCHEDULER_shutdown ();
4765     return;
4766   }
4767   ats = GNUNET_ATS_transport_init (GST_cfg,
4768                                    &ats_allocation_cb,
4769                                    NULL,
4770                                    &ats_suggestion_cb,
4771                                    NULL);
4772   if (NULL == ats)
4773   {
4774     GNUNET_break (0);
4775     GNUNET_SCHEDULER_shutdown ();
4776     return;
4777   }
4778 }
4779
4780
4781 /**
4782  * Define "main" method using service macro.
4783  */
4784 GNUNET_SERVICE_MAIN
4785 ("transport",
4786  GNUNET_SERVICE_OPTION_SOFT_SHUTDOWN,
4787  &run,
4788  &client_connect_cb,
4789  &client_disconnect_cb,
4790  NULL,
4791  /* communication with core */
4792  GNUNET_MQ_hd_fixed_size (client_start,
4793                           GNUNET_MESSAGE_TYPE_TRANSPORT_START,
4794                           struct StartMessage,
4795                           NULL),
4796  GNUNET_MQ_hd_var_size (client_send,
4797                         GNUNET_MESSAGE_TYPE_TRANSPORT_SEND,
4798                         struct OutboundMessage,
4799                         NULL),
4800  /* communication with communicators */
4801  GNUNET_MQ_hd_var_size (communicator_available,
4802                         GNUNET_MESSAGE_TYPE_TRANSPORT_NEW_COMMUNICATOR,
4803                         struct GNUNET_TRANSPORT_CommunicatorAvailableMessage,
4804                         NULL),
4805  GNUNET_MQ_hd_var_size (communicator_backchannel,
4806                         GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL,
4807                         struct GNUNET_TRANSPORT_CommunicatorBackchannel,
4808                         NULL),
4809  GNUNET_MQ_hd_var_size (add_address,
4810                         GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS,
4811                         struct GNUNET_TRANSPORT_AddAddressMessage,
4812                         NULL),
4813  GNUNET_MQ_hd_fixed_size (del_address,
4814                           GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_ADDRESS,
4815                           struct GNUNET_TRANSPORT_DelAddressMessage,
4816                           NULL),
4817  GNUNET_MQ_hd_var_size (incoming_msg,
4818                         GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG,
4819                         struct GNUNET_TRANSPORT_IncomingMessage,
4820                         NULL),
4821  GNUNET_MQ_hd_fixed_size (queue_create_ok,
4822                           GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_OK,
4823                           struct GNUNET_TRANSPORT_CreateQueueResponse,
4824                           NULL),
4825  GNUNET_MQ_hd_fixed_size (queue_create_fail,
4826                           GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_FAIL,
4827                           struct GNUNET_TRANSPORT_CreateQueueResponse,
4828                           NULL),
4829  GNUNET_MQ_hd_var_size (add_queue_message,
4830                         GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP,
4831                         struct GNUNET_TRANSPORT_AddQueueMessage,
4832                         NULL),
4833  GNUNET_MQ_hd_var_size (address_consider_verify,
4834                         GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_CONSIDER_VERIFY,
4835                         struct GNUNET_TRANSPORT_AddressToVerify,
4836                         NULL),
4837  GNUNET_MQ_hd_fixed_size (del_queue_message,
4838                           GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN,
4839                           struct GNUNET_TRANSPORT_DelQueueMessage,
4840                           NULL),
4841  GNUNET_MQ_hd_fixed_size (send_message_ack,
4842                           GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK,
4843                           struct GNUNET_TRANSPORT_SendMessageToAck,
4844                           NULL),
4845  /* communication with monitors */
4846  GNUNET_MQ_hd_fixed_size (monitor_start,
4847                           GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_START,
4848                           struct GNUNET_TRANSPORT_MonitorStart,
4849                           NULL),
4850  GNUNET_MQ_handler_end ());
4851
4852
4853 /* end of file gnunet-service-transport.c */