tng: towards communicator flow control:
[oweals/gnunet.git] / src / transport / gnunet-service-tng.c
1 /*
2  This file is part of GNUnet.
3  Copyright (C) 2010-2016, 2018, 2019 GNUnet e.V.
4
5  GNUnet is free software: you can redistribute it and/or modify it
6  under the terms of the GNU Affero General Public License as published
7  by the Free Software Foundation, either version 3 of the License,
8  or (at your option) any later version.
9
10  GNUnet is distributed in the hope that it will be useful, but
11  WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  Affero General Public License for more details.
14
15  You should have received a copy of the GNU Affero General Public License
16  along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19  */
20 /**
21  * @file transport/gnunet-service-tng.c
22  * @brief main for gnunet-service-tng
23  * @author Christian Grothoff
24  *
25  * TODO:
26  * - figure out how to transmit (selective) ACKs in case of uni-directional
27  *   communicators (with/without core? DV-only?) When do we use ACKs?
28  *   => communicators use selective ACKs for flow control
29  *   => transport uses message-level ACKs for RTT, fragment confirmation
30  *   => integrate DV into transport, use neither core nor communicators
31  *      but rather give communicators transport-encapsulated messages
32  *      (which could be core-data, background-channel traffic, or
33  *       transport-to-transport traffic)
34  *
35  * Implement:
36  * - manage fragmentation/defragmentation, retransmission, track RTT, loss, etc.
37  *
38  * Easy:
39  * - use ATS bandwidth allocation callback and schedule transmissions!
40  *
41  * Plan:
42  * - inform ATS about RTT, goodput/loss, overheads, etc. (GNUNET_ATS_session_update())
43  *
44  * Later:
45  * - change transport-core API to provide proper flow control in both
46  *   directions, allow multiple messages per peer simultaneously (tag
47  *   confirmations with unique message ID), and replace quota-out with
48  *   proper flow control;
49  *
50  * Design realizations / discussion:
51  * - communicators do flow control by calling MQ "notify sent"
52  *   when 'ready'. They determine flow implicitly (i.e. TCP blocking)
53  *   or explicitly via background channel FC ACKs.  As long as the
54  *   channel is not full, they may 'notify sent' even if the other
55  *   peer has not yet confirmed receipt. The other peer confirming
56  *   is _only_ for FC, not for more reliable transmission; reliable
57  *   transmission (i.e. of fragments) is left to _transport_.
58  * - ACKs sent back in uni-directional communicators are done via
59  *   the background channel API; here transport _may_ initially
60  *   broadcast (with bounded # hops) if no path is known;
61  * - transport should _integrate_ DV-routing and build a view of
62  *   the network; then background channel traffic can be
63  *   routed via DV as well as explicit "DV" traffic.
64  * - background channel is also used for ACKs and NAT traversal support
65  * - transport service is responsible for AEAD'ing the background
66  *   channel, timestamps and monotonic time are used against replay
67  *   of old messages -> peerstore needs to be supplied with
68  *   "latest timestamps seen" data
69  * - if transport implements DV, we likely need a 3rd peermap
70  *   in addition to ephemerals and (direct) neighbours
71  *   => in this data structure, we should track ATS metrics (distance, RTT, etc.)
72  *   as well as latest timestamps seen, goodput, fragments for transmission, etc.
73  *   ==> check if stuff needs to be moved out of "Neighbour"
74  * - transport should encapsualte core-level messages and do its
75  *   own ACKing for RTT/goodput/loss measurements _and_ fragment
76  *   for retransmission
77  */
78 #include "platform.h"
79 #include "gnunet_util_lib.h"
80 #include "gnunet_statistics_service.h"
81 #include "gnunet_transport_monitor_service.h"
82 #include "gnunet_peerstore_service.h"
83 #include "gnunet_hello_lib.h"
84 #include "gnunet_ats_transport_service.h"
85 #include "transport.h"
86
87
88 /**
89  * What is the size we assume for a read operation in the
90  * absence of an MTU for the purpose of flow control?
91  */
92 #define IN_PACKET_SIZE_WITHOUT_MTU 128
93
94 /**
95  * If a queue delays the next message by more than this number
96  * of seconds we log a warning. Note: this is for testing,
97  * the value chosen here might be too aggressively low!
98  */
99 #define DELAY_WARN_THRESHOLD GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
100
101 /**
102  * How many messages can we have pending for a given communicator
103  * process before we start to throttle that communicator?
104  * 
105  * Used if a communicator might be CPU-bound and cannot handle the traffic.
106  */
107 #define COMMUNICATOR_TOTAL_QUEUE_LIMIT 512
108
109 /**
110  * How many messages can we have pending for a given session (queue to
111  * a particular peer via a communicator) process before we start to
112  * throttle that queue?
113  * 
114  * Used if ATS assigns more bandwidth to a particular transmission
115  * method than that transmission method can right now handle. (Yes,
116  * ATS should eventually notice utilization below allocation and
117  * adjust, but we don't want to queue up tons of messages in the
118  * meantime). Must be significantly below
119  * #COMMUNICATOR_TOTAL_QUEUE_LIMIT.
120  */
121 #define SESSION_QUEUE_LIMIT 32
122
123
124 GNUNET_NETWORK_STRUCT_BEGIN
125
126 /**
127  * Outer layer of an encapsulated backchannel message.
128  */
129 struct TransportBackchannelEncapsulationMessage
130 {
131   /**
132    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION.
133    */
134   struct GNUNET_MessageHeader header;
135
136   /**
137    * Distance the backchannel message has traveled, to be updated at
138    * each hop.  Used to bound the number of hops in case a backchannel
139    * message is broadcast and thus travels without routing
140    * information (during initial backchannel discovery).
141    */
142   uint32_t distance;
143
144   /**
145    * Target's peer identity (as backchannels may be transmitted
146    * indirectly, or even be broadcast).
147    */
148   struct GNUNET_PeerIdentity target;
149
150   /**
151    * Ephemeral key setup by the sender for @e target, used
152    * to encrypt the payload.
153    */
154   struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;
155
156   /**
157    * HMAC over the ciphertext of the encrypted, variable-size
158    * body that follows.  Verified via DH of @e target and
159    * @e ephemeral_key
160    */
161   struct GNUNET_HashCode hmac;
162
163   /* Followed by encrypted, variable-size payload */
164 };
165
166
167 /**
168  * Message by which a peer confirms that it is using an
169  * ephemeral key.
170  */
171 struct EphemeralConfirmation
172 {
173
174   /**
175    * Purpose is #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL
176    */
177   struct GNUNET_CRYPTO_EccSignaturePurpose purpose;
178
179   /**
180    * How long is this signature over the ephemeral key
181    * valid?
182    */
183   struct GNUNET_TIME_AbsoluteNBO ephemeral_validity;
184
185   /**
186    * Ephemeral key setup by the sender for @e target, used
187    * to encrypt the payload.
188    */
189   struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;
190
191 };
192
193
194 /**
195  * Plaintext of the variable-size payload that is encrypted
196  * within a `struct TransportBackchannelEncapsulationMessage`
197  */
198 struct TransportBackchannelRequestPayload
199 {
200
201   /**
202    * Sender's peer identity.
203    */
204   struct GNUNET_PeerIdentity sender;
205
206   /**
207    * Signature of the sender over an
208    * #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL.
209    */
210   struct GNUNET_CRYPTO_EddsaSignature sender_sig;
211
212   /**
213    * How long is this signature over the ephemeral key
214    * valid?
215    */
216   struct GNUNET_TIME_AbsoluteNBO ephemeral_validity;
217
218   /**
219    * Current monotonic time of the sending transport service.  Used to
220    * detect replayed messages.  Note that the receiver should remember
221    * a list of the recently seen timestamps and only reject messages
222    * if the timestamp is in the list, or the list is "full" and the
223    * timestamp is smaller than the lowest in the list.  This list of
224    * timestamps per peer should be persisted to guard against replays
225    * after restarts.
226    */
227   struct GNUNET_TIME_AbsoluteNBO monotonic_time;
228
229   /* Followed by a `struct GNUNET_MessageHeader` with a message
230      for a communicator */
231
232   /* Followed by a 0-termianted string specifying the name of
233      the communicator which is to receive the message */
234
235 };
236
237
238 /**
239  * Outer layer of an encapsulated unfragmented application message sent
240  * over an unreliable channel.
241  */
242 struct TransportReliabilityBox
243 {
244   /**
245    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX
246    */
247   struct GNUNET_MessageHeader header;
248
249   /**
250    * Number of messages still to be sent before a commulative
251    * ACK is requested.  Zero if an ACK is requested immediately.
252    * In NBO.  Note that the receiver may send the ACK faster
253    * if it believes that is reasonable.
254    */
255   uint32_t ack_countdown GNUNET_PACKED;
256
257   /**
258    * Unique ID of the message used for signalling receipt of
259    * messages sent over possibly unreliable channels.  Should
260    * be a random.
261    */
262   struct GNUNET_ShortHashCode msg_uuid;
263 };
264
265
266 /**
267  * Confirmation that the receiver got a
268  * #GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX. Note that the
269  * confirmation may be transmitted over a completely different queue,
270  * so ACKs are identified by a combination of PID of sender and
271  * message UUID, without the queue playing any role!
272  */
273 struct TransportReliabilityAckMessage
274 {
275   /**
276    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK
277    */
278   struct GNUNET_MessageHeader header;
279
280   /**
281    * Reserved. Zero.
282    */
283   uint32_t reserved GNUNET_PACKED;
284
285   /**
286    * How long was the ACK delayed relative to the average time of
287    * receipt of the messages being acknowledged?  Used to calculate
288    * the average RTT by taking the receipt time of the ack minus the
289    * average transmission time of the sender minus this value.
290    */
291   struct GNUNET_TIME_RelativeNBO avg_ack_delay;
292
293   /* followed by any number of `struct GNUNET_ShortHashCode`
294      messages providing ACKs */
295 };
296
297
298 /**
299  * Outer layer of an encapsulated fragmented application message.
300  */
301 struct TransportFragmentBox
302 {
303   /**
304    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT
305    */
306   struct GNUNET_MessageHeader header;
307
308   /**
309    * Unique ID of this fragment (and fragment transmission!). Will
310    * change even if a fragement is retransmitted to make each
311    * transmission attempt unique! Should be incremented by one for
312    * each fragment transmission. If a client receives a duplicate
313    * fragment (same @e frag_off), it must send
314    * #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK immediately.
315    */
316   uint32_t frag_uuid GNUNET_PACKED;
317
318   /**
319    * Original message ID for of the message that all the1
320    * fragments belong to.  Must be the same for all fragments.
321    */ 
322   struct GNUNET_ShortHashCode msg_uuid;
323
324   /**
325    * Offset of this fragment in the overall message.
326    */ 
327   uint16_t frag_off GNUNET_PACKED;
328
329   /**
330    * Total size of the message that is being fragmented.
331    */ 
332   uint16_t msg_size GNUNET_PACKED;
333
334 };
335
336
337 /**
338  * Outer layer of an fragmented application message sent over a queue
339  * with finite MTU.  When a #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT is
340  * received, the receiver has two RTTs or 64 further fragments with
341  * the same basic message time to send an acknowledgement, possibly
342  * acknowledging up to 65 fragments in one ACK.  ACKs must also be
343  * sent immediately once all fragments were sent.
344  */
345 struct TransportFragmentAckMessage
346 {
347   /**
348    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK
349    */
350   struct GNUNET_MessageHeader header;
351
352   /**
353    * Unique ID of the lowest fragment UUID being acknowledged.
354    */
355   uint32_t frag_uuid GNUNET_PACKED;
356
357   /**
358    * Bitfield of up to 64 additional fragments following the
359    * @e msg_uuid being acknowledged by this message.
360    */ 
361   uint64_t extra_acks GNUNET_PACKED;
362
363   /**
364    * Original message ID for of the message that all the
365    * fragments belong to.
366    */ 
367   struct GNUNET_ShortHashCode msg_uuid;
368
369   /**
370    * How long was the ACK delayed relative to the average time of
371    * receipt of the fragments being acknowledged?  Used to calculate
372    * the average RTT by taking the receipt time of the ack minus the
373    * average transmission time of the sender minus this value.
374    */
375   struct GNUNET_TIME_RelativeNBO avg_ack_delay;
376 };
377
378
379 /**
380  * Internal message used by transport for distance vector learning.
381  * If @e num_hops does not exceed the threshold, peers should append
382  * themselves to the peer list and flood the message (possibly only
383  * to a subset of their neighbours to limit discoverability of the
384  * network topology).  To the extend that the @e bidirectional bits
385  * are set, peers may learn the inverse paths even if they did not
386  * initiate. 
387  *
388  * Unless received on a bidirectional queue and @e num_hops just
389  * zero, peers that can forward to the initator should always try to
390  * forward to the initiator.
391  */
392 struct TransportDVLearn
393 {
394   /**
395    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN
396    */
397   struct GNUNET_MessageHeader header;
398
399   /**
400    * Number of hops this messages has travelled, in NBO. Zero if
401    * sent by initiator.
402    */
403   uint16_t num_hops GNUNET_PACKED;
404
405   /**
406    * Bitmask of the last 16 hops indicating whether they are confirmed
407    * available (without DV) in both directions or not, in NBO.  Used
408    * to possibly instantly learn a path in both directions.  Each peer
409    * should shift this value by one to the left, and then set the
410    * lowest bit IF the current sender can be reached from it (without
411    * DV routing).  
412    */ 
413   uint16_t bidirectional GNUNET_PACKED;
414
415   /**
416    * Peers receiving this message and delaying forwarding to other
417    * peers for any reason should increment this value such as to 
418    * enable the origin to determine the actual network-only delay
419    * in addition to the real-time delay (assuming the message loops
420    * back to the origin).
421    */
422   struct GNUNET_TIME_Relative cummulative_non_network_delay;
423
424   /**
425    * Identity of the peer that started this learning activity.
426    */
427   struct GNUNET_PeerIdentity initiator;
428   
429   /* Followed by @e num_hops `struct GNUNET_PeerIdentity` values,
430      excluding the initiator of the DV trace; the last entry is the
431      current sender; the current peer must not be included except if
432      it is the sender. */
433   
434 };
435
436
437 /**
438  * Outer layer of an encapsulated message send over multiple hops.
439  * The path given only includes the identities of the subsequent
440  * peers, i.e. it will be empty if we are the receiver. Each
441  * forwarding peer should scan the list from the end, and if it can,
442  * forward to the respective peer. The list should then be shortened
443  * by all the entries up to and including that peer.  Each hop should
444  * also increment @e total_hops to allow the receiver to get a precise
445  * estimate on the number of hops the message travelled.  Senders must
446  * provide a learned path that thus should work, but intermediaries
447  * know of a shortcut, they are allowed to send the message via that
448  * shortcut.
449  *
450  * If a peer finds itself still on the list, it must drop the message.
451  */
452 struct TransportDVBox
453 {
454   /**
455    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX
456    */
457   struct GNUNET_MessageHeader header;
458
459   /**
460    * Number of total hops this messages travelled. In NBO.
461    * @e origin sets this to zero, to be incremented at
462    * each hop.
463    */
464   uint16_t total_hops GNUNET_PACKED;
465
466   /**
467    * Number of hops this messages includes. In NBO.
468    */
469   uint16_t num_hops GNUNET_PACKED;
470   
471   /**
472    * Identity of the peer that originated the message.
473    */
474   struct GNUNET_PeerIdentity origin;
475
476   /* Followed by @e num_hops `struct GNUNET_PeerIdentity` values;
477      excluding the @e origin and the current peer, the last must be
478      the ultimate target; if @e num_hops is zero, the receiver of this
479      message is the ultimate target. */
480
481   /* Followed by the actual message, which itself may be
482      another box, but not a DV_LEARN or DV_BOX message! */
483 };
484
485
486 GNUNET_NETWORK_STRUCT_END
487
488
489
490 /**
491  * What type of client is the `struct TransportClient` about?
492  */
493 enum ClientType
494 {
495   /**
496    * We do not know yet (client is fresh).
497    */
498   CT_NONE = 0,
499
500   /**
501    * Is the CORE service, we need to forward traffic to it.
502    */
503   CT_CORE = 1,
504
505   /**
506    * It is a monitor, forward monitor data.
507    */
508   CT_MONITOR = 2,
509
510   /**
511    * It is a communicator, use for communication.
512    */
513   CT_COMMUNICATOR = 3
514 };
515
516
517 /**
518  * Entry in our cache of ephemeral keys we currently use.
519  */
520 struct EphemeralCacheEntry
521 {
522
523   /**
524    * Target's peer identity (we don't re-use ephemerals
525    * to limit linkability of messages).
526    */
527   struct GNUNET_PeerIdentity target;
528
529   /**
530    * Signature affirming @e ephemeral_key of type
531    * #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL
532    */
533   struct GNUNET_CRYPTO_EddsaSignature sender_sig;
534
535   /**
536    * How long is @e sender_sig valid
537    */
538   struct GNUNET_TIME_Absolute ephemeral_validity;
539
540   /**
541    * Our ephemeral key.
542    */
543   struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;
544
545   /**
546    * Node in the ephemeral cache for this entry.
547    * Used for expiration.
548    */
549   struct GNUNET_CONTAINER_HeapNode *hn;
550 };
551
552
553 /**
554  * Client connected to the transport service.
555  */
556 struct TransportClient;
557
558
559 /**
560  * A neighbour that at least one communicator is connected to.
561  */
562 struct Neighbour;
563
564
565 /**
566  * Entry identifying transmission in one of our `struct
567  * GNUNET_ATS_Sessions` which still awaits an ACK.  This is used to
568  * ensure we do not overwhelm a communicator and limit the number of
569  * messages outstanding per communicator (say in case communicator is
570  * CPU bound) and per queue (in case ATS bandwidth allocation exceeds
571  * what the communicator can actually provide towards a particular
572  * peer/target).
573  */
574 struct QueueEntry
575 {
576
577   /**
578    * Kept as a DLL.
579    */ 
580   struct QueueEntry *next;
581
582   /**
583    * Kept as a DLL.
584    */ 
585   struct QueueEntry *prev;
586
587   /**
588    * ATS session this entry is queued with.
589    */
590   struct GNUNET_ATS_Session *session;
591   
592   /**
593    * Message ID used for this message with the queue used for transmission.
594    */
595   uint64_t mid;
596 };
597
598
599 /**
600  * An ATS session is a message queue provided by a communicator
601  * via which we can reach a particular neighbour.
602  */
603 struct GNUNET_ATS_Session
604 {
605   /**
606    * Kept in a MDLL.
607    */
608   struct GNUNET_ATS_Session *next_neighbour;
609
610   /**
611    * Kept in a MDLL.
612    */
613   struct GNUNET_ATS_Session *prev_neighbour;
614
615   /**
616    * Kept in a MDLL.
617    */
618   struct GNUNET_ATS_Session *prev_client;
619
620   /**
621    * Kept in a MDLL.
622    */
623   struct GNUNET_ATS_Session *next_client;
624
625   /**
626    * Head of DLL of unacked transmission requests.
627    */ 
628   struct QueueEntry *queue_head;
629
630   /**
631    * End of DLL of unacked transmission requests.
632    */ 
633   struct QueueEntry *queue_tail;
634
635   /**
636    * Which neighbour is this ATS session for?
637    */
638   struct Neighbour *neighbour;
639
640   /**
641    * Which communicator offers this ATS session?
642    */
643   struct TransportClient *tc;
644
645   /**
646    * Address served by the ATS session.
647    */
648   const char *address;
649
650   /**
651    * Handle by which we inform ATS about this queue.
652    */
653   struct GNUNET_ATS_SessionRecord *sr;
654
655   /**
656    * Task scheduled for the time when this queue can (likely) transmit the
657    * next message. Still needs to check with the @e tracker_out to be sure.
658    */ 
659   struct GNUNET_SCHEDULER_Task *transmit_task;
660   
661   /**
662    * Our current RTT estimate for this ATS session.
663    */
664   struct GNUNET_TIME_Relative rtt;
665
666   /**
667    * Message ID generator for transmissions on this queue.
668    */ 
669   uint64_t mid_gen;
670   
671   /**
672    * Unique identifier of this ATS session with the communicator.
673    */
674   uint32_t qid;
675
676   /**
677    * Maximum transmission unit supported by this ATS session.
678    */
679   uint32_t mtu;
680
681   /**
682    * Distance to the target of this ATS session.
683    */
684   uint32_t distance;
685
686   /**
687    * Messages pending.
688    */
689   uint32_t num_msg_pending;
690
691   /**
692    * Bytes pending.
693    */
694   uint32_t num_bytes_pending;
695
696   /**
697    * Length of the DLL starting at @e queue_head.
698    */
699   unsigned int queue_length;
700   
701   /**
702    * Network type offered by this ATS session.
703    */
704   enum GNUNET_NetworkType nt;
705
706   /**
707    * Connection status for this ATS session.
708    */
709   enum GNUNET_TRANSPORT_ConnectionStatus cs;
710
711   /**
712    * How much outbound bandwidth do we have available for this session?
713    */
714   struct GNUNET_BANDWIDTH_Tracker tracker_out;
715
716   /**
717    * How much inbound bandwidth do we have available for this session?
718    */
719   struct GNUNET_BANDWIDTH_Tracker tracker_in;
720 };
721
722
723 /**
724  * A neighbour that at least one communicator is connected to.
725  */
726 struct Neighbour
727 {
728
729   /**
730    * Which peer is this about?
731    */
732   struct GNUNET_PeerIdentity pid;
733
734   /**
735    * Head of list of messages pending for this neighbour.
736    */
737   struct PendingMessage *pending_msg_head;
738
739   /**
740    * Tail of list of messages pending for this neighbour.
741    */
742   struct PendingMessage *pending_msg_tail;
743
744   /**
745    * Head of DLL of ATS sessions to this peer.
746    */
747   struct GNUNET_ATS_Session *session_head;
748
749   /**
750    * Tail of DLL of ATS sessions to this peer.
751    */
752   struct GNUNET_ATS_Session *session_tail;
753
754   /**
755    * Task run to cleanup pending messages that have exceeded their timeout.
756    */  
757   struct GNUNET_SCHEDULER_Task *timeout_task;
758
759   /**
760    * Quota at which CORE is allowed to transmit to this peer
761    * according to ATS.
762    *
763    * FIXME: not yet used, tricky to get right given multiple queues!
764    *        (=> Idea: let ATS set a quota per queue and we add them up here?)
765    * FIXME: how do we set this value initially when we tell CORE?
766    *    Options: start at a minimum value or at literally zero (before ATS?)
767    *         (=> Current thought: clean would be zero!)
768    */
769   struct GNUNET_BANDWIDTH_Value32NBO quota_out;
770
771   /**
772    * What is the earliest timeout of any message in @e pending_msg_tail?
773    */ 
774   struct GNUNET_TIME_Absolute earliest_timeout;
775   
776 };
777
778
779 /**
780  * Types of different pending messages.
781  */ 
782 enum PendingMessageType
783 {
784
785   /**
786    * Ordinary message received from the CORE service.
787    */
788   PMT_CORE = 0,
789
790   /**
791    * Fragment box.
792    */
793   PMT_FRAGMENT_BOX = 1,
794
795   /**
796    * Reliability box.
797    */
798   PMT_RELIABILITY_BOX = 2,
799
800   /**
801    * Any type of acknowledgement.
802    */
803   PMT_ACKNOWLEDGEMENT = 3
804
805   
806 };
807
808
809 /**
810  * Transmission request that is awaiting delivery.  The original
811  * transmission requests from CORE may be too big for some queues.
812  * In this case, a *tree* of fragments is created.  At each
813  * level of the tree, fragments are kept in a DLL ordered by which
814  * fragment should be sent next (at the head).  The tree is searched
815  * top-down, with the original message at the root.
816  *
817  * To select a node for transmission, first it is checked if the
818  * current node's message fits with the MTU.  If it does not, we
819  * either calculate the next fragment (based on @e frag_off) from the
820  * current node, or, if all fragments have already been created,
821  * descend to the @e head_frag.  Even though the node was already
822  * fragmented, the fragment may be too big if the fragment was 
823  * generated for a queue with a larger MTU. In this case, the node
824  * may be fragmented again, thus creating a tree.
825  *
826  * When acknowledgements for fragments are received, the tree
827  * must be pruned, removing those parts that were already 
828  * acknowledged.  When fragments are sent over a reliable 
829  * channel, they can be immediately removed.
830  *
831  * If a message is ever fragmented, then the original "full" message
832  * is never again transmitted (even if it fits below the MTU), and
833  * only (remaining) fragments are sent.
834  */
835 struct PendingMessage
836 {
837   /**
838    * Kept in a MDLL of messages for this @a target.
839    */
840   struct PendingMessage *next_neighbour;
841
842   /**
843    * Kept in a MDLL of messages for this @a target.
844    */
845   struct PendingMessage *prev_neighbour;
846
847   /**
848    * Kept in a MDLL of messages from this @a client (if @e pmt is #PMT_CORE)
849    */
850   struct PendingMessage *next_client;
851   
852   /**
853    * Kept in a MDLL of messages from this @a client  (if @e pmt is #PMT_CORE)
854    */
855   struct PendingMessage *prev_client;
856
857   /**
858    * Kept in a MDLL of messages from this @a cpm (if @e pmt is #PMT_FRAGMENT_BOx)
859    */
860   struct PendingMessage *next_frag;
861   
862   /**
863    * Kept in a MDLL of messages from this @a cpm  (if @e pmt is #PMT_FRAGMENT_BOX)
864    */
865   struct PendingMessage *prev_frag;
866     
867   /**
868    * Target of the request.
869    */
870   struct Neighbour *target;
871       
872   /**
873    * Client that issued the transmission request, if @e pmt is #PMT_CORE.
874    */
875   struct TransportClient *client;
876   
877   /**
878    * Head of a MDLL of fragments created for this core message.
879    */
880   struct PendingMessage *head_frag;
881   
882   /**
883    * Tail of a MDLL of fragments created for this core message.
884    */
885   struct PendingMessage *tail_frag;
886
887   /**
888    * Our parent in the fragmentation tree.
889    */
890   struct PendingMessage *frag_parent;
891     
892   /**
893    * At what time should we give up on the transmission (and no longer retry)?
894    */
895   struct GNUNET_TIME_Absolute timeout;
896
897   /**
898    * What is the earliest time for us to retry transmission of this message?
899    */
900   struct GNUNET_TIME_Absolute next_attempt;
901
902   /**
903    * UUID to use for this message (used for reassembly of fragments, only
904    * initialized if @e msg_uuid_set is #GNUNET_YES).
905    */
906   struct GNUNET_ShortHashCode msg_uuid;
907   
908   /**
909    * Counter incremented per generated fragment.
910    */ 
911   uint32_t frag_uuidgen;
912   
913   /**
914    * Type of the pending message.
915    */
916   enum PendingMessageType pmt;
917
918   /**
919    * Size of the original message.
920    */
921   uint16_t bytes_msg;
922
923   /**
924    * Offset at which we should generate the next fragment.
925    */ 
926   uint16_t frag_off;
927
928   /**
929    * #GNUNET_YES once @e msg_uuid was initialized
930    */
931   int16_t msg_uuid_set;
932   
933   /* Followed by @e bytes_msg to transmit */
934 };
935
936
937 /**
938  * One of the addresses of this peer.
939  */
940 struct AddressListEntry
941 {
942
943   /**
944    * Kept in a DLL.
945    */
946   struct AddressListEntry *next;
947
948   /**
949    * Kept in a DLL.
950    */
951   struct AddressListEntry *prev;
952
953   /**
954    * Which communicator provides this address?
955    */
956   struct TransportClient *tc;
957
958   /**
959    * The actual address.
960    */
961   const char *address;
962
963   /**
964    * Current context for storing this address in the peerstore.
965    */
966   struct GNUNET_PEERSTORE_StoreContext *sc;
967
968   /**
969    * Task to periodically do @e st operation.
970    */
971   struct GNUNET_SCHEDULER_Task *st;
972
973   /**
974    * What is a typical lifetime the communicator expects this
975    * address to have? (Always from now.)
976    */
977   struct GNUNET_TIME_Relative expiration;
978
979   /**
980    * Address identifier used by the communicator.
981    */
982   uint32_t aid;
983
984   /**
985    * Network type offered by this address.
986    */
987   enum GNUNET_NetworkType nt;
988
989 };
990
991
992 /**
993  * Client connected to the transport service.
994  */
995 struct TransportClient
996 {
997
998   /**
999    * Kept in a DLL.
1000    */
1001   struct TransportClient *next;
1002
1003   /**
1004    * Kept in a DLL.
1005    */
1006   struct TransportClient *prev;
1007
1008   /**
1009    * Handle to the client.
1010    */
1011   struct GNUNET_SERVICE_Client *client;
1012
1013   /**
1014    * Message queue to the client.
1015    */
1016   struct GNUNET_MQ_Handle *mq;
1017
1018   /**
1019    * What type of client is this?
1020    */
1021   enum ClientType type;
1022
1023   union
1024   {
1025
1026     /**
1027      * Information for @e type #CT_CORE.
1028      */
1029     struct {
1030
1031       /**
1032        * Head of list of messages pending for this client, sorted by 
1033        * transmission time ("next_attempt" + possibly internal prioritization).
1034        */
1035       struct PendingMessage *pending_msg_head;
1036
1037       /**
1038        * Tail of list of messages pending for this client.
1039        */
1040       struct PendingMessage *pending_msg_tail;
1041
1042     } core;
1043
1044     /**
1045      * Information for @e type #CT_MONITOR.
1046      */
1047     struct {
1048
1049       /**
1050        * Peer identity to monitor the addresses of.
1051        * Zero to monitor all neighbours.  Valid if
1052        * @e type is #CT_MONITOR.
1053        */
1054       struct GNUNET_PeerIdentity peer;
1055
1056       /**
1057        * Is this a one-shot monitor?
1058        */
1059       int one_shot;
1060
1061     } monitor;
1062
1063
1064     /**
1065      * Information for @e type #CT_COMMUNICATOR.
1066      */
1067     struct {
1068       /**
1069        * If @e type is #CT_COMMUNICATOR, this communicator
1070        * supports communicating using these addresses.
1071        */
1072       char *address_prefix;
1073
1074       /**
1075        * Head of DLL of queues offered by this communicator.
1076        */
1077       struct GNUNET_ATS_Session *session_head;
1078
1079       /**
1080        * Tail of DLL of queues offered by this communicator.
1081        */
1082       struct GNUNET_ATS_Session *session_tail;
1083
1084       /**
1085        * Head of list of the addresses of this peer offered by this communicator.
1086        */
1087       struct AddressListEntry *addr_head;
1088
1089       /**
1090        * Tail of list of the addresses of this peer offered by this communicator.
1091        */
1092       struct AddressListEntry *addr_tail;
1093
1094       /**
1095        * Number of queue entries in all queues to this communicator. Used
1096        * throttle sending to a communicator if we see that the communicator
1097        * is globally unable to keep up.
1098        */
1099       unsigned int total_queue_length;
1100       
1101       /**
1102        * Characteristics of this communicator.
1103        */
1104       enum GNUNET_TRANSPORT_CommunicatorCharacteristics cc;
1105
1106     } communicator;
1107
1108   } details;
1109
1110 };
1111
1112
1113 /**
1114  * Head of linked list of all clients to this service.
1115  */
1116 static struct TransportClient *clients_head;
1117
1118 /**
1119  * Tail of linked list of all clients to this service.
1120  */
1121 static struct TransportClient *clients_tail;
1122
1123 /**
1124  * Statistics handle.
1125  */
1126 static struct GNUNET_STATISTICS_Handle *GST_stats;
1127
1128 /**
1129  * Configuration handle.
1130  */
1131 static const struct GNUNET_CONFIGURATION_Handle *GST_cfg;
1132
1133 /**
1134  * Our public key.
1135  */
1136 static struct GNUNET_PeerIdentity GST_my_identity;
1137
1138 /**
1139  * Our private key.
1140  */
1141 static struct GNUNET_CRYPTO_EddsaPrivateKey *GST_my_private_key;
1142
1143 /**
1144  * Map from PIDs to `struct Neighbour` entries.  A peer is
1145  * a neighbour if we have an MQ to it from some communicator.
1146  */
1147 static struct GNUNET_CONTAINER_MultiPeerMap *neighbours;
1148
1149 /**
1150  * Database for peer's HELLOs.
1151  */
1152 static struct GNUNET_PEERSTORE_Handle *peerstore;
1153
1154 /**
1155  * Heap sorting `struct EphemeralCacheEntry` by their
1156  * key/signature validity.
1157  */
1158 static struct GNUNET_CONTAINER_Heap *ephemeral_heap;
1159
1160 /**
1161  * Hash map for looking up `struct EphemeralCacheEntry`s
1162  * by peer identity. (We may have ephemerals in our
1163  * cache for which we do not have a neighbour entry,
1164  * and similar many neighbours may not need ephemerals,
1165  * so we use a second map.)
1166  */
1167 static struct GNUNET_CONTAINER_MultiPeerMap *ephemeral_map;
1168
1169 /**
1170  * Our connection to ATS for allocation and bootstrapping.
1171  */
1172 static struct GNUNET_ATS_TransportHandle *ats;
1173
1174
1175 /**
1176  * Free cached ephemeral key.
1177  *
1178  * @param ece cached signature to free
1179  */
1180 static void
1181 free_ephemeral (struct EphemeralCacheEntry *ece)
1182 {
1183   GNUNET_CONTAINER_multipeermap_remove (ephemeral_map,
1184                                         &ece->target,
1185                                         ece);
1186   GNUNET_CONTAINER_heap_remove_node (ece->hn);
1187   GNUNET_free (ece);
1188 }
1189
1190
1191 /**
1192  * Lookup neighbour record for peer @a pid.
1193  *
1194  * @param pid neighbour to look for
1195  * @return NULL if we do not have this peer as a neighbour
1196  */
1197 static struct Neighbour *
1198 lookup_neighbour (const struct GNUNET_PeerIdentity *pid)
1199 {
1200   return GNUNET_CONTAINER_multipeermap_get (neighbours,
1201                                             pid);
1202 }
1203
1204
1205 /**
1206  * Details about what to notify monitors about.
1207  */
1208 struct MonitorEvent
1209 {
1210   /**
1211    * @deprecated To be discussed if we keep these...
1212    */
1213   struct GNUNET_TIME_Absolute last_validation;
1214   struct GNUNET_TIME_Absolute valid_until;
1215   struct GNUNET_TIME_Absolute next_validation;
1216
1217   /**
1218    * Current round-trip time estimate.
1219    */
1220   struct GNUNET_TIME_Relative rtt;
1221
1222   /**
1223    * Connection status.
1224    */
1225   enum GNUNET_TRANSPORT_ConnectionStatus cs;
1226
1227   /**
1228    * Messages pending.
1229    */
1230   uint32_t num_msg_pending;
1231
1232   /**
1233    * Bytes pending.
1234    */
1235   uint32_t num_bytes_pending;
1236
1237
1238 };
1239
1240
1241 /**
1242  * Notify monitor @a tc about an event.  That @a tc
1243  * cares about the event has already been checked.
1244  *
1245  * Send @a tc information in @a me about a @a peer's status with
1246  * respect to some @a address to all monitors that care.
1247  *
1248  * @param tc monitor to inform
1249  * @param peer peer the information is about
1250  * @param address address the information is about
1251  * @param nt network type associated with @a address
1252  * @param me detailed information to transmit
1253  */
1254 static void
1255 notify_monitor (struct TransportClient *tc,
1256                 const struct GNUNET_PeerIdentity *peer,
1257                 const char *address,
1258                 enum GNUNET_NetworkType nt,
1259                 const struct MonitorEvent *me)
1260 {
1261   struct GNUNET_MQ_Envelope *env;
1262   struct GNUNET_TRANSPORT_MonitorData *md;
1263   size_t addr_len = strlen (address) + 1;
1264
1265   env = GNUNET_MQ_msg_extra (md,
1266                              addr_len,
1267                              GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_DATA);
1268   md->nt = htonl ((uint32_t) nt);
1269   md->peer = *peer;
1270   md->last_validation = GNUNET_TIME_absolute_hton (me->last_validation);
1271   md->valid_until = GNUNET_TIME_absolute_hton (me->valid_until);
1272   md->next_validation = GNUNET_TIME_absolute_hton (me->next_validation);
1273   md->rtt = GNUNET_TIME_relative_hton (me->rtt);
1274   md->cs = htonl ((uint32_t) me->cs);
1275   md->num_msg_pending = htonl (me->num_msg_pending);
1276   md->num_bytes_pending = htonl (me->num_bytes_pending);
1277   memcpy (&md[1],
1278           address,
1279           addr_len);
1280   GNUNET_MQ_send (tc->mq,
1281                   env);
1282 }
1283
1284
1285 /**
1286  * Send information in @a me about a @a peer's status with respect
1287  * to some @a address to all monitors that care.
1288  *
1289  * @param peer peer the information is about
1290  * @param address address the information is about
1291  * @param nt network type associated with @a address
1292  * @param me detailed information to transmit
1293  */
1294 static void
1295 notify_monitors (const struct GNUNET_PeerIdentity *peer,
1296                  const char *address,
1297                  enum GNUNET_NetworkType nt,
1298                  const struct MonitorEvent *me)
1299 {
1300   static struct GNUNET_PeerIdentity zero;
1301
1302   for (struct TransportClient *tc = clients_head;
1303        NULL != tc;
1304        tc = tc->next)
1305   {
1306     if (CT_MONITOR != tc->type)
1307       continue;
1308     if (tc->details.monitor.one_shot)
1309       continue;
1310     if ( (0 != memcmp (&tc->details.monitor.peer,
1311                        &zero,
1312                        sizeof (zero))) &&
1313          (0 != memcmp (&tc->details.monitor.peer,
1314                        peer,
1315                        sizeof (*peer))) )
1316       continue;
1317     notify_monitor (tc,
1318                     peer,
1319                     address,
1320                     nt,
1321                     me);
1322   }
1323 }
1324
1325
1326 /**
1327  * Called whenever a client connects.  Allocates our
1328  * data structures associated with that client.
1329  *
1330  * @param cls closure, NULL
1331  * @param client identification of the client
1332  * @param mq message queue for the client
1333  * @return our `struct TransportClient`
1334  */
1335 static void *
1336 client_connect_cb (void *cls,
1337                    struct GNUNET_SERVICE_Client *client,
1338                    struct GNUNET_MQ_Handle *mq)
1339 {
1340   struct TransportClient *tc;
1341
1342   tc = GNUNET_new (struct TransportClient);
1343   tc->client = client;
1344   tc->mq = mq;
1345   GNUNET_CONTAINER_DLL_insert (clients_head,
1346                                clients_tail,
1347                                tc);
1348   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1349               "Client %p connected\n",
1350               tc);
1351   return tc;
1352 }
1353
1354
1355 /**
1356  * Release memory used by @a neighbour.
1357  *
1358  * @param neighbour neighbour entry to free
1359  */
1360 static void
1361 free_neighbour (struct Neighbour *neighbour)
1362 {
1363   GNUNET_assert (NULL == neighbour->session_head);
1364   GNUNET_assert (GNUNET_YES ==
1365                  GNUNET_CONTAINER_multipeermap_remove (neighbours,
1366                                                        &neighbour->pid,
1367                                                        neighbour));
1368   if (NULL != neighbour->timeout_task)
1369     GNUNET_SCHEDULER_cancel (neighbour->timeout_task);
1370   GNUNET_free (neighbour);
1371 }
1372
1373
1374 /**
1375  * Send message to CORE clients that we lost a connection.
1376  *
1377  * @param tc client to inform (must be CORE client)
1378  * @param pid peer the connection is for
1379  * @param quota_out current quota for the peer
1380  */
1381 static void
1382 core_send_connect_info (struct TransportClient *tc,
1383                         const struct GNUNET_PeerIdentity *pid,
1384                         struct GNUNET_BANDWIDTH_Value32NBO quota_out)
1385 {
1386   struct GNUNET_MQ_Envelope *env;
1387   struct ConnectInfoMessage *cim;
1388
1389   GNUNET_assert (CT_CORE == tc->type);
1390   env = GNUNET_MQ_msg (cim,
1391                        GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
1392   cim->quota_out = quota_out;
1393   cim->id = *pid;
1394   GNUNET_MQ_send (tc->mq,
1395                   env);
1396 }
1397
1398
1399 /**
1400  * Send message to CORE clients that we gained a connection
1401  *
1402  * @param pid peer the queue was for
1403  * @param quota_out current quota for the peer
1404  */
1405 static void
1406 cores_send_connect_info (const struct GNUNET_PeerIdentity *pid,
1407                          struct GNUNET_BANDWIDTH_Value32NBO quota_out)
1408 {
1409   for (struct TransportClient *tc = clients_head;
1410        NULL != tc;
1411        tc = tc->next)
1412   {
1413     if (CT_CORE != tc->type)
1414       continue;
1415     core_send_connect_info (tc,
1416                             pid,
1417                             quota_out);
1418   }
1419 }
1420
1421
1422 /**
1423  * Send message to CORE clients that we lost a connection.
1424  *
1425  * @param pid peer the connection was for
1426  */
1427 static void
1428 cores_send_disconnect_info (const struct GNUNET_PeerIdentity *pid)
1429 {
1430   for (struct TransportClient *tc = clients_head;
1431        NULL != tc;
1432        tc = tc->next)
1433   {
1434     struct GNUNET_MQ_Envelope *env;
1435     struct DisconnectInfoMessage *dim;
1436
1437     if (CT_CORE != tc->type)
1438       continue;
1439     env = GNUNET_MQ_msg (dim,
1440                          GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT);
1441     dim->peer = *pid;
1442     GNUNET_MQ_send (tc->mq,
1443                     env);
1444   }
1445 }
1446
1447
1448 /**
1449  * We believe we are ready to transmit a message on a queue. Double-checks
1450  * with the queue's "tracker_out" and then gives the message to the 
1451  * communicator for transmission (updating the tracker, and re-scheduling
1452  * itself if applicable).  
1453  *
1454  * @param cls the `struct GNUNET_ATS_Session` to process transmissions for
1455  */ 
1456 static void
1457 transmit_on_queue (void *cls);
1458
1459
1460 /**
1461  * Schedule next run of #transmit_on_queue().  Does NOTHING if 
1462  * we should run immediately or if the message queue is empty.
1463  * Test for no task being added AND queue not being empty to
1464  * transmit immediately afterwards!  This function must only
1465  * be called if the message queue is non-empty!
1466  *
1467  * @param queue the queue to do scheduling for
1468  */ 
1469 static void
1470 schedule_transmit_on_queue (struct GNUNET_ATS_Session *queue)
1471 {
1472   struct Neighbour *n = queue->neighbour;
1473   struct PendingMessage *pm = n->pending_msg_head;
1474   struct GNUNET_TIME_Relative out_delay;
1475   unsigned int wsize;
1476
1477   GNUNET_assert (NULL != pm);
1478   if (queue->tc->details.communicator.total_queue_length >= COMMUNICATOR_TOTAL_QUEUE_LIMIT)
1479   {
1480     GNUNET_STATISTICS_update (GST_stats,
1481                               "# Transmission throttled due to communicator queue limit",
1482                               1,
1483                               GNUNET_NO);    
1484     return;
1485   }
1486   if (queue->queue_length >= SESSION_QUEUE_LIMIT)
1487   {
1488     GNUNET_STATISTICS_update (GST_stats,
1489                               "# Transmission throttled due to session queue limit",
1490                               1,
1491                               GNUNET_NO);    
1492     return;
1493   }
1494       
1495   wsize = (0 == queue->mtu)
1496     ? pm->bytes_msg /* FIXME: add overheads? */
1497     : queue->mtu;
1498   out_delay = GNUNET_BANDWIDTH_tracker_get_delay (&queue->tracker_out,
1499                                                   wsize);
1500   out_delay = GNUNET_TIME_relative_max (GNUNET_TIME_absolute_get_remaining (pm->next_attempt),
1501                                         out_delay);
1502   if (0 == out_delay.rel_value_us)
1503     return; /* we should run immediately! */
1504   /* queue has changed since we were scheduled, reschedule again */
1505   queue->transmit_task = GNUNET_SCHEDULER_add_delayed (out_delay,
1506                                                        &transmit_on_queue,
1507                                                        queue);
1508   if (out_delay.rel_value_us > DELAY_WARN_THRESHOLD.rel_value_us)
1509     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1510                 "Next transmission on queue `%s' in %s (high delay)\n",
1511                 queue->address,
1512                 GNUNET_STRINGS_relative_time_to_string (out_delay,
1513                                                         GNUNET_YES));
1514   else
1515     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1516                 "Next transmission on queue `%s' in %s\n",
1517                 queue->address,
1518                 GNUNET_STRINGS_relative_time_to_string (out_delay,
1519                                                         GNUNET_YES));
1520 }
1521
1522
1523 /**
1524  * Free @a session.
1525  *
1526  * @param session the session to free
1527  */
1528 static void
1529 free_session (struct GNUNET_ATS_Session *session)
1530 {
1531   struct Neighbour *neighbour = session->neighbour;
1532   struct TransportClient *tc = session->tc;
1533   struct MonitorEvent me = {
1534     .cs = GNUNET_TRANSPORT_CS_DOWN,
1535     .rtt = GNUNET_TIME_UNIT_FOREVER_REL
1536   };
1537   struct QueueEntry *qe;
1538   int maxxed;
1539
1540   if (NULL != session->transmit_task)
1541   {
1542     GNUNET_SCHEDULER_cancel (session->transmit_task);
1543     session->transmit_task = NULL;
1544   }
1545   GNUNET_CONTAINER_MDLL_remove (neighbour,
1546                                 neighbour->session_head,
1547                                 neighbour->session_tail,
1548                                 session);
1549   GNUNET_CONTAINER_MDLL_remove (client,
1550                                 tc->details.communicator.session_head,
1551                                 tc->details.communicator.session_tail,
1552                                 session);
1553   maxxed = (COMMUNICATOR_TOTAL_QUEUE_LIMIT >= tc->details.communicator.total_queue_length);
1554   while (NULL != (qe = session->queue_head))
1555   {
1556     GNUNET_CONTAINER_DLL_remove (session->queue_head,
1557                                  session->queue_tail,
1558                                  qe);
1559     session->queue_length--;
1560     tc->details.communicator.total_queue_length--;
1561     GNUNET_free (qe);
1562   }
1563   GNUNET_assert (0 == session->queue_length);
1564   if ( (maxxed) &&
1565        (COMMUNICATOR_TOTAL_QUEUE_LIMIT < tc->details.communicator.total_queue_length) )
1566   {
1567     /* Communicator dropped below threshold, resume all queues */
1568     GNUNET_STATISTICS_update (GST_stats,
1569                               "# Transmission throttled due to communicator queue limit",
1570                               -1,
1571                               GNUNET_NO);
1572     for (struct GNUNET_ATS_Session *s = tc->details.communicator.session_head;
1573          NULL != s;
1574          s = s->next_client)
1575       schedule_transmit_on_queue (s);
1576   }
1577   notify_monitors (&neighbour->pid,
1578                    session->address,
1579                    session->nt,
1580                    &me);
1581   GNUNET_ATS_session_del (session->sr);
1582   GNUNET_BANDWIDTH_tracker_notification_stop (&session->tracker_in);
1583   GNUNET_BANDWIDTH_tracker_notification_stop (&session->tracker_out);
1584   GNUNET_free (session);
1585   if (NULL == neighbour->session_head)
1586   {
1587     cores_send_disconnect_info (&neighbour->pid);
1588     free_neighbour (neighbour);
1589   }
1590 }
1591
1592
1593 /**
1594  * Free @a ale
1595  *
1596  * @param ale address list entry to free
1597  */
1598 static void
1599 free_address_list_entry (struct AddressListEntry *ale)
1600 {
1601   struct TransportClient *tc = ale->tc;
1602
1603   GNUNET_CONTAINER_DLL_remove (tc->details.communicator.addr_head,
1604                                tc->details.communicator.addr_tail,
1605                                ale);
1606   if (NULL != ale->sc)
1607   {
1608     GNUNET_PEERSTORE_store_cancel (ale->sc);
1609     ale->sc = NULL;
1610   }
1611   if (NULL != ale->st)
1612   {
1613     GNUNET_SCHEDULER_cancel (ale->st);
1614     ale->st = NULL;
1615   }
1616   GNUNET_free (ale);
1617 }
1618
1619
1620 /**
1621  * Called whenever a client is disconnected.  Frees our
1622  * resources associated with that client.
1623  *
1624  * @param cls closure, NULL
1625  * @param client identification of the client
1626  * @param app_ctx our `struct TransportClient`
1627  */
1628 static void
1629 client_disconnect_cb (void *cls,
1630                       struct GNUNET_SERVICE_Client *client,
1631                       void *app_ctx)
1632 {
1633   struct TransportClient *tc = app_ctx;
1634
1635   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1636               "Client %p disconnected, cleaning up.\n",
1637               tc);
1638   GNUNET_CONTAINER_DLL_remove (clients_head,
1639                                clients_tail,
1640                                tc);
1641   switch (tc->type)
1642   {
1643   case CT_NONE:
1644     break;
1645   case CT_CORE:
1646     {
1647       struct PendingMessage *pm;
1648
1649       while (NULL != (pm = tc->details.core.pending_msg_head))
1650       {
1651         GNUNET_CONTAINER_MDLL_remove (client,
1652                                       tc->details.core.pending_msg_head,
1653                                       tc->details.core.pending_msg_tail,
1654                                       pm);
1655         pm->client = NULL;
1656       }
1657     }
1658     break;
1659   case CT_MONITOR:
1660     break;
1661   case CT_COMMUNICATOR:
1662     {
1663       struct GNUNET_ATS_Session *q;
1664       struct AddressListEntry *ale;
1665
1666       while (NULL != (q = tc->details.communicator.session_head))
1667         free_session (q);
1668       while (NULL != (ale = tc->details.communicator.addr_head))
1669         free_address_list_entry (ale);
1670       GNUNET_free (tc->details.communicator.address_prefix);
1671     }
1672     break;
1673   }
1674   GNUNET_free (tc);
1675 }
1676
1677
1678 /**
1679  * Iterator telling new CORE client about all existing
1680  * connections to peers.
1681  *
1682  * @param cls the new `struct TransportClient`
1683  * @param pid a connected peer
1684  * @param value the `struct Neighbour` with more information
1685  * @return #GNUNET_OK (continue to iterate)
1686  */
1687 static int
1688 notify_client_connect_info (void *cls,
1689                             const struct GNUNET_PeerIdentity *pid,
1690                             void *value)
1691 {
1692   struct TransportClient *tc = cls;
1693   struct Neighbour *neighbour = value;
1694
1695   core_send_connect_info (tc,
1696                           pid,
1697                           neighbour->quota_out);
1698   return GNUNET_OK;
1699 }
1700
1701
1702 /**
1703  * Initialize a "CORE" client.  We got a start message from this
1704  * client, so add it to the list of clients for broadcasting of
1705  * inbound messages.
1706  *
1707  * @param cls the client
1708  * @param start the start message that was sent
1709  */
1710 static void
1711 handle_client_start (void *cls,
1712                      const struct StartMessage *start)
1713 {
1714   struct TransportClient *tc = cls;
1715   uint32_t options;
1716
1717   options = ntohl (start->options);
1718   if ( (0 != (1 & options)) &&
1719        (0 !=
1720         memcmp (&start->self,
1721                 &GST_my_identity,
1722                 sizeof (struct GNUNET_PeerIdentity)) ) )
1723   {
1724     /* client thinks this is a different peer, reject */
1725     GNUNET_break (0);
1726     GNUNET_SERVICE_client_drop (tc->client);
1727     return;
1728   }
1729   if (CT_NONE != tc->type)
1730   {
1731     GNUNET_break (0);
1732     GNUNET_SERVICE_client_drop (tc->client);
1733     return;
1734   }
1735   tc->type = CT_CORE;
1736   GNUNET_CONTAINER_multipeermap_iterate (neighbours,
1737                                          &notify_client_connect_info,
1738                                          tc);
1739   GNUNET_SERVICE_client_continue (tc->client);
1740 }
1741
1742
1743 /**
1744  * Client asked for transmission to a peer.  Process the request.
1745  *
1746  * @param cls the client
1747  * @param obm the send message that was sent
1748  */
1749 static int
1750 check_client_send (void *cls,
1751                    const struct OutboundMessage *obm)
1752 {
1753   struct TransportClient *tc = cls;
1754   uint16_t size;
1755   const struct GNUNET_MessageHeader *obmm;
1756
1757   if (CT_CORE != tc->type)
1758   {
1759     GNUNET_break (0);
1760     return GNUNET_SYSERR;
1761   }
1762   size = ntohs (obm->header.size) - sizeof (struct OutboundMessage);
1763   if (size < sizeof (struct GNUNET_MessageHeader))
1764   {
1765     GNUNET_break (0);
1766     return GNUNET_SYSERR;
1767   }
1768   obmm = (const struct GNUNET_MessageHeader *) &obm[1];
1769   if (size != ntohs (obmm->size))
1770   {
1771     GNUNET_break (0);
1772     return GNUNET_SYSERR;
1773   }
1774   return GNUNET_OK;
1775 }
1776
1777
1778 /**
1779  * Free fragment tree below @e root, excluding @e root itself.
1780  *
1781  * @param root root of the tree to free
1782  */  
1783 static void
1784 free_fragment_tree (struct PendingMessage *root)
1785 {
1786   struct PendingMessage *frag;
1787
1788   while (NULL != (frag = root->head_frag))
1789   {
1790     free_fragment_tree (frag);
1791     GNUNET_CONTAINER_MDLL_remove (frag,
1792                                   root->head_frag,
1793                                   root->tail_frag,
1794                                   frag);
1795     GNUNET_free (frag);
1796   }
1797 }
1798
1799
1800 /**
1801  * Send a response to the @a pm that we have processed a
1802  * "send" request with status @a success. We
1803  * transmitted @a bytes_physical on the actual wire.
1804  * Sends a confirmation to the "core" client responsible
1805  * for the original request and free's @a pm.
1806  *
1807  * @param pm handle to the original pending message
1808  * @param success status code, #GNUNET_OK on success, #GNUNET_SYSERR
1809  *          for transmission failure
1810  * @param bytes_physical amount of bandwidth consumed
1811  */
1812 static void
1813 client_send_response (struct PendingMessage *pm,
1814                       int success,
1815                       uint32_t bytes_physical)
1816 {
1817   struct TransportClient *tc = pm->client;
1818   struct Neighbour *target = pm->target;
1819   struct GNUNET_MQ_Envelope *env;
1820   struct SendOkMessage *som;
1821
1822   if (NULL != tc)
1823   {
1824     env = GNUNET_MQ_msg (som,
1825                          GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
1826     som->success = htonl ((uint32_t) success);
1827     som->bytes_msg = htons (pm->bytes_msg);
1828     som->bytes_physical = htonl (bytes_physical);
1829     som->peer = target->pid;
1830     GNUNET_MQ_send (tc->mq,
1831                     env);
1832     GNUNET_CONTAINER_MDLL_remove (client,
1833                                   tc->details.core.pending_msg_head,
1834                                   tc->details.core.pending_msg_tail,
1835                                   pm);
1836   }
1837   GNUNET_CONTAINER_MDLL_remove (neighbour,
1838                                 target->pending_msg_head,
1839                                 target->pending_msg_tail,
1840                                 pm);
1841   free_fragment_tree (pm);
1842   GNUNET_free (pm);
1843 }
1844
1845
1846 /**
1847  * Checks the message queue for a neighbour for messages that have timed
1848  * out and purges them.
1849  *
1850  * @param cls a `struct Neighbour`
1851  */
1852 static void
1853 check_queue_timeouts (void *cls)
1854 {
1855   struct Neighbour *n = cls;
1856   struct PendingMessage *pm;
1857   struct GNUNET_TIME_Absolute now;
1858   struct GNUNET_TIME_Absolute earliest_timeout;
1859
1860   n->timeout_task = NULL;
1861   earliest_timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
1862   now = GNUNET_TIME_absolute_get ();
1863   for (struct PendingMessage *pos = n->pending_msg_head;
1864        NULL != pos;
1865        pos = pm)
1866   {
1867     pm = pos->next_neighbour;
1868     if (pos->timeout.abs_value_us <= now.abs_value_us)
1869     {
1870       GNUNET_STATISTICS_update (GST_stats,
1871                                 "# messages dropped (timeout before confirmation)",
1872                                 1,
1873                                 GNUNET_NO);
1874       client_send_response (pm,
1875                             GNUNET_NO,
1876                             0);      
1877       continue;
1878     }
1879     earliest_timeout = GNUNET_TIME_absolute_min (earliest_timeout,
1880                                                  pos->timeout);
1881   }
1882   n->earliest_timeout = earliest_timeout;
1883   if (NULL != n->pending_msg_head)
1884     n->timeout_task = GNUNET_SCHEDULER_add_at (earliest_timeout,
1885                                                &check_queue_timeouts,
1886                                                n);
1887 }
1888
1889
1890 /**
1891  * Client asked for transmission to a peer.  Process the request.
1892  *
1893  * @param cls the client
1894  * @param obm the send message that was sent
1895  */
1896 static void
1897 handle_client_send (void *cls,
1898                     const struct OutboundMessage *obm)
1899 {
1900   struct TransportClient *tc = cls;
1901   struct PendingMessage *pm;
1902   const struct GNUNET_MessageHeader *obmm;
1903   struct Neighbour *target;
1904   uint32_t bytes_msg;
1905
1906   GNUNET_assert (CT_CORE == tc->type);
1907   obmm = (const struct GNUNET_MessageHeader *) &obm[1];
1908   bytes_msg = ntohs (obmm->size);
1909   target = lookup_neighbour (&obm->peer);
1910   if (NULL == target)
1911   {
1912     /* Failure: don't have this peer as a neighbour (anymore).
1913        Might have gone down asynchronously, so this is NOT
1914        a protocol violation by CORE. Still count the event,
1915        as this should be rare. */
1916     struct GNUNET_MQ_Envelope *env;
1917     struct SendOkMessage *som;
1918
1919     env = GNUNET_MQ_msg (som,
1920                          GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
1921     som->success = htonl (GNUNET_SYSERR);
1922     som->bytes_msg = htonl (bytes_msg);
1923     som->bytes_physical = htonl (0);
1924     som->peer = obm->peer;
1925     GNUNET_MQ_send (tc->mq,
1926                     env);
1927     GNUNET_SERVICE_client_continue (tc->client);
1928     GNUNET_STATISTICS_update (GST_stats,
1929                               "# messages dropped (neighbour unknown)",
1930                               1,
1931                               GNUNET_NO);
1932     return;
1933   }
1934   pm = GNUNET_malloc (sizeof (struct PendingMessage) + bytes_msg);
1935   pm->client = tc;
1936   pm->target = target;
1937   pm->bytes_msg = bytes_msg;
1938   pm->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (obm->timeout));
1939   memcpy (&pm[1],
1940           &obm[1],
1941           bytes_msg);
1942   GNUNET_CONTAINER_MDLL_insert (neighbour,
1943                                 target->pending_msg_head,
1944                                 target->pending_msg_tail,
1945                                 pm);
1946   GNUNET_CONTAINER_MDLL_insert (client,
1947                                 tc->details.core.pending_msg_head,
1948                                 tc->details.core.pending_msg_tail,
1949                                 pm);
1950   if (target->earliest_timeout.abs_value_us > pm->timeout.abs_value_us)
1951   {
1952     target->earliest_timeout.abs_value_us = pm->timeout.abs_value_us;
1953     if (NULL != target->timeout_task)
1954       GNUNET_SCHEDULER_cancel (target->timeout_task);
1955     target->timeout_task 
1956       = GNUNET_SCHEDULER_add_at (target->earliest_timeout,
1957                                  &check_queue_timeouts,
1958                                  target);
1959   }
1960 }
1961
1962
1963 /**
1964  * Communicator started.  Test message is well-formed.
1965  *
1966  * @param cls the client
1967  * @param cam the send message that was sent
1968  */
1969 static int
1970 check_communicator_available (void *cls,
1971                               const struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam)
1972 {
1973   struct TransportClient *tc = cls;
1974   uint16_t size;
1975
1976   if (CT_NONE != tc->type)
1977   {
1978     GNUNET_break (0);
1979     return GNUNET_SYSERR;
1980   }
1981   tc->type = CT_COMMUNICATOR;
1982   size = ntohs (cam->header.size) - sizeof (*cam);
1983   if (0 == size)
1984     return GNUNET_OK; /* receive-only communicator */
1985   GNUNET_MQ_check_zero_termination (cam);
1986   return GNUNET_OK;
1987 }
1988
1989
1990 /**
1991  * Communicator started.  Process the request.
1992  *
1993  * @param cls the client
1994  * @param cam the send message that was sent
1995  */
1996 static void
1997 handle_communicator_available (void *cls,
1998                                const struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam)
1999 {
2000   struct TransportClient *tc = cls;
2001   uint16_t size;
2002
2003   size = ntohs (cam->header.size) - sizeof (*cam);
2004   if (0 == size)
2005     return; /* receive-only communicator */
2006   tc->details.communicator.address_prefix
2007     = GNUNET_strdup ((const char *) &cam[1]);
2008   tc->details.communicator.cc
2009     = (enum GNUNET_TRANSPORT_CommunicatorCharacteristics) ntohl (cam->cc);
2010   GNUNET_SERVICE_client_continue (tc->client);
2011 }
2012
2013
2014 /**
2015  * Address of our peer added.  Test message is well-formed.
2016  *
2017  * @param cls the client
2018  * @param aam the send message that was sent
2019  */
2020 static int
2021 check_add_address (void *cls,
2022                    const struct GNUNET_TRANSPORT_AddAddressMessage *aam)
2023 {
2024   struct TransportClient *tc = cls;
2025
2026   if (CT_COMMUNICATOR != tc->type)
2027   {
2028     GNUNET_break (0);
2029     return GNUNET_SYSERR;
2030   }
2031   GNUNET_MQ_check_zero_termination (aam);
2032   return GNUNET_OK;
2033 }
2034
2035
2036 /**
2037  * Ask peerstore to store our address.
2038  *
2039  * @param cls an `struct AddressListEntry *`
2040  */
2041 static void
2042 store_pi (void *cls);
2043
2044
2045 /**
2046  * Function called when peerstore is done storing our address.
2047  */
2048 static void
2049 peerstore_store_cb (void *cls,
2050                     int success)
2051 {
2052   struct AddressListEntry *ale = cls;
2053
2054   ale->sc = NULL;
2055   if (GNUNET_YES != success)
2056     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2057                 "Failed to store our own address `%s' in peerstore!\n",
2058                 ale->address);
2059   /* refresh period is 1/4 of expiration time, that should be plenty
2060      without being excessive. */
2061   ale->st = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_divide (ale->expiration,
2062                                                                        4ULL),
2063                                           &store_pi,
2064                                           ale);
2065 }
2066
2067
2068 /**
2069  * Ask peerstore to store our address.
2070  *
2071  * @param cls an `struct AddressListEntry *`
2072  */
2073 static void
2074 store_pi (void *cls)
2075 {
2076   struct AddressListEntry *ale = cls;
2077   void *addr;
2078   size_t addr_len;
2079   struct GNUNET_TIME_Absolute expiration;
2080
2081   ale->st = NULL;
2082   expiration = GNUNET_TIME_relative_to_absolute (ale->expiration);
2083   GNUNET_HELLO_sign_address (ale->address,
2084                              ale->nt,
2085                              expiration,
2086                              GST_my_private_key,
2087                              &addr,
2088                              &addr_len);
2089   ale->sc = GNUNET_PEERSTORE_store (peerstore,
2090                                     "transport",
2091                                     &GST_my_identity,
2092                                     GNUNET_HELLO_PEERSTORE_KEY,
2093                                     addr,
2094                                     addr_len,
2095                                     expiration,
2096                                     GNUNET_PEERSTORE_STOREOPTION_MULTIPLE,
2097                                     &peerstore_store_cb,
2098                                     ale);
2099   GNUNET_free (addr);
2100   if (NULL == ale->sc)
2101   {
2102     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2103                 "Failed to store our address `%s' with peerstore\n",
2104                 ale->address);
2105     ale->st = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
2106                                             &store_pi,
2107                                             ale);
2108   }
2109 }
2110
2111
2112 /**
2113  * Address of our peer added.  Process the request.
2114  *
2115  * @param cls the client
2116  * @param aam the send message that was sent
2117  */
2118 static void
2119 handle_add_address (void *cls,
2120                     const struct GNUNET_TRANSPORT_AddAddressMessage *aam)
2121 {
2122   struct TransportClient *tc = cls;
2123   struct AddressListEntry *ale;
2124   size_t slen;
2125
2126   slen = ntohs (aam->header.size) - sizeof (*aam);
2127   ale = GNUNET_malloc (sizeof (struct AddressListEntry) + slen);
2128   ale->tc = tc;
2129   ale->address = (const char *) &ale[1];
2130   ale->expiration = GNUNET_TIME_relative_ntoh (aam->expiration);
2131   ale->aid = aam->aid;
2132   ale->nt = (enum GNUNET_NetworkType) ntohl (aam->nt);
2133   memcpy (&ale[1],
2134           &aam[1],
2135           slen);
2136   GNUNET_CONTAINER_DLL_insert (tc->details.communicator.addr_head,
2137                                tc->details.communicator.addr_tail,
2138                                ale);
2139   ale->st = GNUNET_SCHEDULER_add_now (&store_pi,
2140                                       ale);
2141   GNUNET_SERVICE_client_continue (tc->client);
2142 }
2143
2144
2145 /**
2146  * Address of our peer deleted.  Process the request.
2147  *
2148  * @param cls the client
2149  * @param dam the send message that was sent
2150  */
2151 static void
2152 handle_del_address (void *cls,
2153                     const struct GNUNET_TRANSPORT_DelAddressMessage *dam)
2154 {
2155   struct TransportClient *tc = cls;
2156
2157   if (CT_COMMUNICATOR != tc->type)
2158   {
2159     GNUNET_break (0);
2160     GNUNET_SERVICE_client_drop (tc->client);
2161     return;
2162   }
2163   for (struct AddressListEntry *ale = tc->details.communicator.addr_head;
2164        NULL != ale;
2165        ale = ale->next)
2166   {
2167     if (dam->aid != ale->aid)
2168       continue;
2169     GNUNET_assert (ale->tc == tc);
2170     free_address_list_entry (ale);
2171     GNUNET_SERVICE_client_continue (tc->client);
2172   }
2173   GNUNET_break (0);
2174   GNUNET_SERVICE_client_drop (tc->client);
2175 }
2176
2177
2178 /**
2179  * Client notified us about transmission from a peer.  Process the request.
2180  *
2181  * @param cls the client
2182  * @param obm the send message that was sent
2183  */
2184 static int
2185 check_incoming_msg (void *cls,
2186                     const struct GNUNET_TRANSPORT_IncomingMessage *im)
2187 {
2188   struct TransportClient *tc = cls;
2189   uint16_t size;
2190   const struct GNUNET_MessageHeader *obmm;
2191
2192   if (CT_COMMUNICATOR != tc->type)
2193   {
2194     GNUNET_break (0);
2195     return GNUNET_SYSERR;
2196   }
2197   size = ntohs (im->header.size) - sizeof (*im);
2198   if (size < sizeof (struct GNUNET_MessageHeader))
2199   {
2200     GNUNET_break (0);
2201     return GNUNET_SYSERR;
2202   }
2203   obmm = (const struct GNUNET_MessageHeader *) &im[1];
2204   if (size != ntohs (obmm->size))
2205   {
2206     GNUNET_break (0);
2207     return GNUNET_SYSERR;
2208   }
2209   return GNUNET_OK;
2210 }
2211
2212
2213 /**
2214  * Incoming meessage.  Process the request.
2215  *
2216  * @param cls the client
2217  * @param im the send message that was received
2218  */
2219 static void
2220 handle_incoming_msg (void *cls,
2221                      const struct GNUNET_TRANSPORT_IncomingMessage *im)
2222 {
2223   struct TransportClient *tc = cls;
2224
2225   GNUNET_SERVICE_client_continue (tc->client);
2226 }
2227
2228
2229 /**
2230  * New queue became available.  Check message.
2231  *
2232  * @param cls the client
2233  * @param aqm the send message that was sent
2234  */
2235 static int
2236 check_add_queue_message (void *cls,
2237                          const struct GNUNET_TRANSPORT_AddQueueMessage *aqm)
2238 {
2239   struct TransportClient *tc = cls;
2240
2241   if (CT_COMMUNICATOR != tc->type)
2242   {
2243     GNUNET_break (0);
2244     return GNUNET_SYSERR;
2245   }
2246   GNUNET_MQ_check_zero_termination (aqm);
2247   return GNUNET_OK;
2248 }
2249
2250
2251 /**
2252  * Bandwidth tracker informs us that the delay until we should receive
2253  * more has changed.
2254  *
2255  * @param cls a `struct GNUNET_ATS_Session` for which the delay changed
2256  */
2257 static void
2258 tracker_update_in_cb (void *cls)
2259 {
2260   struct GNUNET_ATS_Session *queue = cls;
2261   struct GNUNET_TIME_Relative in_delay;
2262   unsigned int rsize;
2263   
2264   rsize = (0 == queue->mtu) ? IN_PACKET_SIZE_WITHOUT_MTU : queue->mtu;
2265   in_delay = GNUNET_BANDWIDTH_tracker_get_delay (&queue->tracker_in,
2266                                                  rsize);
2267   // FIXME: how exactly do we do inbound flow control?
2268 }
2269
2270
2271 /**
2272  * Fragment the given @a pm to the given @a mtu.  Adds 
2273  * additional fragments to the neighbour as well. If the
2274  * @a mtu is too small, generates and error for the @a pm
2275  * and returns NULL.
2276  *
2277  * @param pm pending message to fragment for transmission
2278  * @param mtu MTU to apply
2279  * @return new message to transmit
2280  */
2281 static struct PendingMessage *
2282 fragment_message (struct PendingMessage *pm,
2283                   uint16_t mtu)
2284 {
2285   struct PendingMessage *ff;
2286
2287   if (GNUNET_NO == pm->msg_uuid_set)
2288   {
2289     GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
2290                                 &pm->msg_uuid,
2291                                 sizeof (pm->msg_uuid));
2292     pm->msg_uuid_set = GNUNET_YES;
2293   }
2294   
2295   /* This invariant is established in #handle_add_queue_message() */
2296   GNUNET_assert (mtu > sizeof (struct TransportFragmentBox));
2297
2298   /* select fragment for transmission, descending the tree if it has
2299      been expanded until we are at a leaf or at a fragment that is small enough */
2300   ff = pm;
2301   while ( ( (ff->bytes_msg > mtu) ||
2302             (pm == ff) ) &&
2303           (ff->frag_off == ff->bytes_msg) &&
2304           (NULL != ff->head_frag) )
2305   {
2306     ff = ff->head_frag; /* descent into fragmented fragments */
2307   }
2308
2309   if ( ( (ff->bytes_msg > mtu) ||
2310          (pm == ff) ) &&
2311        (pm->frag_off < pm->bytes_msg) )
2312   {
2313     /* Did not yet calculate all fragments, calculate next fragment */
2314     struct PendingMessage *frag;
2315     struct TransportFragmentBox tfb;
2316     const char *orig;
2317     char *msg;
2318     uint16_t fragmax;
2319     uint16_t fragsize;
2320     uint16_t msize;
2321     uint16_t xoff = 0;
2322
2323     orig = (const char *) &ff[1];
2324     msize = ff->bytes_msg;
2325     if (pm != ff)
2326     {
2327       const struct TransportFragmentBox *tfbo;
2328
2329       tfbo = (const struct TransportFragmentBox *) orig;
2330       orig += sizeof (struct TransportFragmentBox);
2331       msize -= sizeof (struct TransportFragmentBox);
2332       xoff = ntohs (tfbo->frag_off);
2333     }
2334     fragmax = mtu - sizeof (struct TransportFragmentBox);
2335     fragsize = GNUNET_MIN (msize - ff->frag_off,
2336                            fragmax);
2337     frag = GNUNET_malloc (sizeof (struct PendingMessage) +
2338                           sizeof (struct TransportFragmentBox) +
2339                           fragsize);
2340     frag->target = pm->target;
2341     frag->frag_parent = ff;
2342     frag->timeout = pm->timeout;
2343     frag->bytes_msg = sizeof (struct TransportFragmentBox) + fragsize;
2344     frag->pmt = PMT_FRAGMENT_BOX;
2345     msg = (char *) &frag[1];
2346     tfb.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT);
2347     tfb.header.size = htons (sizeof (struct TransportFragmentBox) +
2348                              fragsize);
2349     tfb.frag_uuid = htonl (pm->frag_uuidgen++);
2350     tfb.msg_uuid = pm->msg_uuid;
2351     tfb.frag_off = htons (ff->frag_off + xoff);
2352     tfb.msg_size = htons (pm->bytes_msg);
2353     memcpy (msg,
2354             &tfb,
2355             sizeof (tfb));
2356     memcpy (&msg[sizeof (tfb)],
2357             &orig[ff->frag_off],
2358             fragsize);
2359     GNUNET_CONTAINER_MDLL_insert (frag,
2360                                   ff->head_frag,
2361                                   ff->tail_frag,
2362                                   frag);
2363     ff->frag_off += fragsize;
2364     ff = frag;
2365   }
2366
2367   /* Move head to the tail and return it */
2368   GNUNET_CONTAINER_MDLL_remove (frag,
2369                                 ff->frag_parent->head_frag,
2370                                 ff->frag_parent->tail_frag,
2371                                 ff);
2372   GNUNET_CONTAINER_MDLL_insert_tail (frag,
2373                                      ff->frag_parent->head_frag,
2374                                      ff->frag_parent->tail_frag,
2375                                      ff);
2376   return ff;
2377 }
2378
2379
2380 /**
2381  * Reliability-box the given @a pm. On error (can there be any), NULL
2382  * may be returned, otherwise the "replacement" for @a pm (which
2383  * should then be added to the respective neighbour's queue instead of
2384  * @a pm).  If the @a pm is already fragmented or reliability boxed,
2385  * or itself an ACK, this function simply returns @a pm.
2386  *
2387  * @param pm pending message to box for transmission over unreliabile queue
2388  * @return new message to transmit
2389  */
2390 static struct PendingMessage *
2391 reliability_box_message (struct PendingMessage *pm)
2392 {
2393   if (PMT_CORE != pm->pmt) 
2394   {
2395     /* already fragmented or reliability boxed, or control message: do nothing */
2396     return pm;
2397   }
2398   
2399   if (0) // FIXME
2400   {
2401     /* failed hard */
2402     // FIMXE: bitch
2403     client_send_response (pm,
2404                           GNUNET_NO,
2405                           0);
2406     return NULL;
2407   }
2408
2409   /* FIXME: return boxed PM here! */
2410   return NULL;
2411 }
2412
2413
2414 /**
2415  * We believe we are ready to transmit a message on a queue. Double-checks
2416  * with the queue's "tracker_out" and then gives the message to the 
2417  * communicator for transmission (updating the tracker, and re-scheduling
2418  * itself if applicable).  
2419  *
2420  * @param cls the `struct GNUNET_ATS_Session` to process transmissions for
2421  */ 
2422 static void
2423 transmit_on_queue (void *cls)
2424 {
2425   struct GNUNET_ATS_Session *queue = cls;
2426   struct Neighbour *n = queue->neighbour;
2427   struct QueueEntry *qe;
2428   struct PendingMessage *pm;
2429   struct PendingMessage *s;
2430   uint32_t overhead;
2431   struct GNUNET_TRANSPORT_SendMessageTo *smt;
2432   struct GNUNET_MQ_Envelope *env;
2433
2434   queue->transmit_task = NULL;
2435   if (NULL == (pm = n->pending_msg_head))
2436   {
2437     /* no message pending, nothing to do here! */
2438     return; 
2439   }
2440   schedule_transmit_on_queue (queue);
2441   if (NULL != queue->transmit_task)
2442     return; /* do it later */
2443   overhead = 0;
2444   if (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc)
2445     overhead += sizeof (struct TransportReliabilityBox);
2446   s = pm;
2447   if ( ( (0 != queue->mtu) &&
2448          (pm->bytes_msg + overhead > queue->mtu) ) ||
2449        (pm->bytes_msg > UINT16_MAX - sizeof (struct GNUNET_TRANSPORT_SendMessageTo)) ||
2450        (NULL != pm->head_frag /* fragments already exist, should
2451                                  respect that even if MTU is 0 for
2452                                  this queue */) )
2453     s = fragment_message (s,
2454                           (0 == queue->mtu)
2455                           ? UINT16_MAX - sizeof (struct GNUNET_TRANSPORT_SendMessageTo)
2456                           : queue->mtu);
2457   if (NULL == s)
2458   {
2459     /* Fragmentation failed, try next message... */
2460     schedule_transmit_on_queue (queue);
2461     return;
2462   }
2463   if (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc)
2464     s = reliability_box_message (s);
2465   if (NULL == s)
2466   {
2467     /* Reliability boxing failed, try next message... */
2468     schedule_transmit_on_queue (queue);
2469     return;
2470   }
2471
2472   /* Pass 's' for transission to the communicator */
2473   qe = GNUNET_new (struct QueueEntry);
2474   qe->mid = queue->mid_gen++;
2475   qe->session = queue;
2476   // qe->pm = s; // FIXME: not so easy, reference management on 'free(s)'!
2477   GNUNET_CONTAINER_DLL_insert (queue->queue_head,
2478                                queue->queue_tail,
2479                                qe);
2480   env = GNUNET_MQ_msg_extra (smt,
2481                              s->bytes_msg,
2482                              GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG);
2483   smt->qid = queue->qid;
2484   smt->mid = qe->mid;
2485   smt->receiver = n->pid;
2486   memcpy (&smt[1],
2487           &s[1],
2488           s->bytes_msg);
2489   GNUNET_assert (CT_COMMUNICATOR == queue->tc->type);
2490   queue->queue_length++;
2491   queue->tc->details.communicator.total_queue_length++;
2492   GNUNET_MQ_send (queue->tc->mq,
2493                   env);
2494   
2495   // FIXME: do something similar to the logic below
2496   // in defragmentation / reliability ACK handling!
2497
2498   /* Check if this transmission somehow conclusively finished handing 'pm'
2499      even without any explicit ACKs */
2500   if ( (PMT_CORE == s->pmt) &&
2501        (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc) )
2502   {
2503     /* Full message sent, and over reliabile channel */
2504     client_send_response (pm,
2505                           GNUNET_YES,
2506                           pm->bytes_msg);
2507   }
2508   else if ( (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc) &&
2509             (PMT_FRAGMENT_BOX == s->pmt) )
2510   {
2511     struct PendingMessage *pos;
2512     
2513     /* Fragment sent over reliabile channel */
2514     free_fragment_tree (s);
2515     pos = s->frag_parent;
2516     GNUNET_CONTAINER_MDLL_remove (frag,
2517                                   pos->head_frag,
2518                                   pos->tail_frag,
2519                                   s);
2520     GNUNET_free (s);
2521     /* check if subtree is done */
2522     while ( (NULL == pos->head_frag) &&
2523             (pos->frag_off == pos->bytes_msg) &&
2524             (pos != pm) )
2525     {
2526       s = pos;
2527       pos = s->frag_parent;
2528       GNUNET_CONTAINER_MDLL_remove (frag,
2529                                     pos->head_frag,
2530                                     pos->tail_frag,
2531                                     s);
2532       GNUNET_free (s);      
2533     }
2534     
2535     /* Was this the last applicable fragmment? */
2536     if ( (NULL == pm->head_frag) &&
2537          (pm->frag_off == pm->bytes_msg) )
2538       client_send_response (pm,
2539                             GNUNET_YES,
2540                             pm->bytes_msg /* FIXME: calculate and add overheads! */);
2541   }
2542   else if (PMT_CORE != pm->pmt)
2543   {
2544     /* This was an acknowledgement of some type, always free */
2545
2546     struct Neighbour *neighbour = pm->target;
2547     GNUNET_CONTAINER_MDLL_remove (neighbour,
2548                                   neighbour->pending_msg_head,
2549                                   neighbour->pending_msg_tail,
2550                                   pm);
2551     GNUNET_free (pm);
2552   }
2553   else
2554   {
2555     /* message not finished, waiting for acknowledgement */
2556     // FIXME: update time by which we might retransmit 's' based on
2557     // queue characteristics (i.e. RTT)
2558     
2559     // FIXME: move 'pm' back in the transmission queue (simplistic: to
2560     // the end, better: with position depending on type, timeout,
2561     // etc.)
2562   }
2563   
2564   /* finally, re-schedule self */
2565   schedule_transmit_on_queue (queue);
2566 }
2567
2568
2569 /**
2570  * Bandwidth tracker informs us that the delay until we
2571  * can transmit again changed.
2572  *
2573  * @param cls a `struct GNUNET_ATS_Session` for which the delay changed
2574  */
2575 static void
2576 tracker_update_out_cb (void *cls)
2577 {
2578   struct GNUNET_ATS_Session *queue = cls;
2579   struct Neighbour *n = queue->neighbour;
2580
2581   if (NULL == n->pending_msg_head)
2582   {
2583     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2584                 "Bandwidth allocation updated for empty transmission queue `%s'\n",
2585                 queue->address);
2586     return; /* no message pending, nothing to do here! */
2587   }
2588   GNUNET_SCHEDULER_cancel (queue->transmit_task);
2589   queue->transmit_task = NULL;
2590   schedule_transmit_on_queue (queue);
2591 }
2592
2593
2594 /**
2595  * Bandwidth tracker informs us that excessive outbound bandwidth was
2596  * allocated which is not being used.
2597  *
2598  * @param cls a `struct GNUNET_ATS_Session` for which the excess was noted
2599  */
2600 static void
2601 tracker_excess_out_cb (void *cls)
2602 {
2603   /* FIXME: trigger excess bandwidth report to core? Right now,
2604      this is done internally within transport_api2_core already,
2605      but we probably want to change the logic and trigger it 
2606      from here via a message instead! */
2607   /* TODO: maybe inform ATS at this point? */
2608   GNUNET_STATISTICS_update (GST_stats,
2609                             "# Excess outbound bandwidth reported",
2610                             1,
2611                             GNUNET_NO);    
2612 }
2613
2614
2615
2616 /**
2617  * Bandwidth tracker informs us that excessive inbound bandwidth was allocated
2618  * which is not being used.
2619  *
2620  * @param cls a `struct GNUNET_ATS_Session` for which the excess was noted
2621  */
2622 static void
2623 tracker_excess_in_cb (void *cls)
2624 {
2625   /* TODO: maybe inform ATS at this point? */
2626   GNUNET_STATISTICS_update (GST_stats,
2627                             "# Excess inbound bandwidth reported",
2628                             1,
2629                             GNUNET_NO);    
2630 }
2631
2632
2633 /**
2634  * New queue became available.  Process the request.
2635  *
2636  * @param cls the client
2637  * @param aqm the send message that was sent
2638  */
2639 static void
2640 handle_add_queue_message (void *cls,
2641                           const struct GNUNET_TRANSPORT_AddQueueMessage *aqm)
2642 {
2643   struct TransportClient *tc = cls;
2644   struct GNUNET_ATS_Session *queue;
2645   struct Neighbour *neighbour;
2646   const char *addr;
2647   uint16_t addr_len;
2648
2649   if (ntohl (aqm->mtu) <= sizeof (struct TransportFragmentBox))
2650   {
2651     /* MTU so small as to be useless for transmissions,
2652        required for #fragment_message()! */
2653     GNUNET_break_op (0);
2654     GNUNET_SERVICE_client_drop (tc->client);
2655     return;
2656   }
2657   neighbour = lookup_neighbour (&aqm->receiver);
2658   if (NULL == neighbour)
2659   {
2660     neighbour = GNUNET_new (struct Neighbour);
2661     neighbour->earliest_timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
2662     neighbour->pid = aqm->receiver;
2663     GNUNET_assert (GNUNET_OK ==
2664                    GNUNET_CONTAINER_multipeermap_put (neighbours,
2665                                                       &neighbour->pid,
2666                                                       neighbour,
2667                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
2668     cores_send_connect_info (&neighbour->pid,
2669                              GNUNET_BANDWIDTH_ZERO);
2670   }
2671   addr_len = ntohs (aqm->header.size) - sizeof (*aqm);
2672   addr = (const char *) &aqm[1];
2673
2674   queue = GNUNET_malloc (sizeof (struct GNUNET_ATS_Session) + addr_len);
2675   queue->tc = tc;
2676   queue->address = (const char *) &queue[1];
2677   queue->rtt = GNUNET_TIME_UNIT_FOREVER_REL;
2678   queue->qid = aqm->qid;
2679   queue->mtu = ntohl (aqm->mtu);
2680   queue->nt = (enum GNUNET_NetworkType) ntohl (aqm->nt);
2681   queue->cs = (enum GNUNET_TRANSPORT_ConnectionStatus) ntohl (aqm->cs);
2682   queue->neighbour = neighbour;
2683   GNUNET_BANDWIDTH_tracker_init2 (&queue->tracker_in,
2684                                   &tracker_update_in_cb,
2685                                   queue,
2686                                   GNUNET_BANDWIDTH_ZERO,
2687                                   GNUNET_CONSTANTS_MAX_BANDWIDTH_CARRY_S,
2688                                   &tracker_excess_in_cb,
2689                                   queue);
2690   GNUNET_BANDWIDTH_tracker_init2 (&queue->tracker_out,
2691                                   &tracker_update_out_cb,
2692                                   queue,
2693                                   GNUNET_BANDWIDTH_ZERO,
2694                                   GNUNET_CONSTANTS_MAX_BANDWIDTH_CARRY_S,
2695                                   &tracker_excess_out_cb,
2696                                   queue);
2697   memcpy (&queue[1],
2698           addr,
2699           addr_len);
2700   /* notify ATS about new queue */
2701   {
2702     struct GNUNET_ATS_Properties prop = {
2703       .delay = GNUNET_TIME_UNIT_FOREVER_REL,
2704       .mtu = queue->mtu,
2705       .nt = queue->nt,
2706       .cc = tc->details.communicator.cc
2707     };
2708     
2709     queue->sr = GNUNET_ATS_session_add (ats,
2710                                         &neighbour->pid,
2711                                         queue->address,
2712                                         queue,
2713                                         &prop);
2714     if  (NULL == queue->sr)
2715     {
2716       /* This can only happen if the 'address' was way too long for ATS
2717          (approaching 64k in strlen()!). In this case, the communicator
2718          must be buggy and we drop it. */
2719       GNUNET_break (0);
2720       GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_in);
2721       GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_out);
2722       GNUNET_free (queue);
2723       if (NULL == neighbour->session_head)
2724       {
2725         cores_send_disconnect_info (&neighbour->pid);
2726         free_neighbour (neighbour);
2727       }
2728       GNUNET_SERVICE_client_drop (tc->client);
2729       return;
2730     }
2731   }
2732   /* notify monitors about new queue */
2733   {
2734     struct MonitorEvent me = {
2735       .rtt = queue->rtt,
2736       .cs = queue->cs
2737     };
2738
2739     notify_monitors (&neighbour->pid,
2740                      queue->address,
2741                      queue->nt,
2742                      &me);
2743   }
2744   GNUNET_CONTAINER_MDLL_insert (neighbour,
2745                                 neighbour->session_head,
2746                                 neighbour->session_tail,
2747                                 queue);
2748   GNUNET_CONTAINER_MDLL_insert (client,
2749                                 tc->details.communicator.session_head,
2750                                 tc->details.communicator.session_tail,
2751                                 queue);
2752   GNUNET_SERVICE_client_continue (tc->client);
2753 }
2754
2755
2756 /**
2757  * Queue to a peer went down.  Process the request.
2758  *
2759  * @param cls the client
2760  * @param dqm the send message that was sent
2761  */
2762 static void
2763 handle_del_queue_message (void *cls,
2764                           const struct GNUNET_TRANSPORT_DelQueueMessage *dqm)
2765 {
2766   struct TransportClient *tc = cls;
2767
2768   if (CT_COMMUNICATOR != tc->type)
2769   {
2770     GNUNET_break (0);
2771     GNUNET_SERVICE_client_drop (tc->client);
2772     return;
2773   }
2774   for (struct GNUNET_ATS_Session *session = tc->details.communicator.session_head;
2775        NULL != session;
2776        session = session->next_client)
2777   {
2778     struct Neighbour *neighbour = session->neighbour;
2779
2780     if ( (dqm->qid != session->qid) ||
2781          (0 != memcmp (&dqm->receiver,
2782                        &neighbour->pid,
2783                        sizeof (struct GNUNET_PeerIdentity))) )
2784       continue;
2785     free_session (session);
2786     GNUNET_SERVICE_client_continue (tc->client);
2787     return;
2788   }
2789   GNUNET_break (0);
2790   GNUNET_SERVICE_client_drop (tc->client);
2791 }
2792
2793
2794 /**
2795  * Message was transmitted.  Process the request.
2796  *
2797  * @param cls the client
2798  * @param sma the send message that was sent
2799  */
2800 static void
2801 handle_send_message_ack (void *cls,
2802                          const struct GNUNET_TRANSPORT_SendMessageToAck *sma)
2803 {
2804   struct TransportClient *tc = cls;
2805   struct QueueEntry *queue;
2806   
2807   if (CT_COMMUNICATOR != tc->type)
2808   {
2809     GNUNET_break (0);
2810     GNUNET_SERVICE_client_drop (tc->client);
2811     return;
2812   }
2813
2814   /* find our queue entry matching the ACK */
2815   queue = NULL;
2816   for (struct GNUNET_ATS_Session *session = tc->details.communicator.session_head;
2817        NULL != session;
2818        session = session->next_client)
2819   {
2820     if (0 != memcmp (&session->neighbour->pid,
2821                      &sma->receiver,
2822                      sizeof (struct GNUNET_PeerIdentity)))
2823       continue;
2824     for (struct QueueEntry *qe = session->queue_head;
2825          NULL != qe;
2826          qe = qe->next)
2827     {
2828       if (qe->mid != sma->mid)
2829         continue;
2830       queue = qe;
2831       break;
2832     }
2833     break;      
2834   }
2835   if (NULL == queue)
2836   {
2837     /* this should never happen */
2838     GNUNET_break (0); 
2839     GNUNET_SERVICE_client_drop (tc->client);
2840     return;
2841   }
2842   GNUNET_CONTAINER_DLL_remove (queue->session->queue_head,
2843                                queue->session->queue_tail,
2844                                queue);
2845   queue->session->queue_length--;
2846   tc->details.communicator.total_queue_length--;
2847   GNUNET_SERVICE_client_continue (tc->client);
2848
2849   /* if applicable, resume transmissions that waited on ACK */
2850   if (COMMUNICATOR_TOTAL_QUEUE_LIMIT - 1 == tc->details.communicator.total_queue_length)
2851   {
2852     /* Communicator dropped below threshold, resume all queues */
2853     GNUNET_STATISTICS_update (GST_stats,
2854                               "# Transmission throttled due to communicator queue limit",
2855                               -1,
2856                               GNUNET_NO);
2857     for (struct GNUNET_ATS_Session *session = tc->details.communicator.session_head;
2858          NULL != session;
2859          session = session->next_client)
2860       schedule_transmit_on_queue (session);
2861   }
2862   else if (SESSION_QUEUE_LIMIT - 1 == queue->session->queue_length)
2863   {
2864     /* queue dropped below threshold; only resume this one queue */
2865     GNUNET_STATISTICS_update (GST_stats,
2866                               "# Transmission throttled due to session queue limit",
2867                               -1,
2868                               GNUNET_NO);    
2869     schedule_transmit_on_queue (queue->session);
2870   }
2871  
2872   /* TODO: we also should react on the status! */
2873   // FIXME: this probably requires queue->pm = s assignment!
2874   // FIXME: react to communicator status about transmission request. We got:
2875   sma->status; // OK success, SYSERR failure
2876
2877   GNUNET_free (queue);
2878 }
2879
2880
2881 /**
2882  * Iterator telling new MONITOR client about all existing
2883  * queues to peers.
2884  *
2885  * @param cls the new `struct TransportClient`
2886  * @param pid a connected peer
2887  * @param value the `struct Neighbour` with more information
2888  * @return #GNUNET_OK (continue to iterate)
2889  */
2890 static int
2891 notify_client_queues (void *cls,
2892                       const struct GNUNET_PeerIdentity *pid,
2893                       void *value)
2894 {
2895   struct TransportClient *tc = cls;
2896   struct Neighbour *neighbour = value;
2897
2898   GNUNET_assert (CT_MONITOR == tc->type);
2899   for (struct GNUNET_ATS_Session *q = neighbour->session_head;
2900        NULL != q;
2901        q = q->next_neighbour)
2902   {
2903     struct MonitorEvent me = {
2904       .rtt = q->rtt,
2905       .cs = q->cs,
2906       .num_msg_pending = q->num_msg_pending,
2907       .num_bytes_pending = q->num_bytes_pending
2908     };
2909
2910     notify_monitor (tc,
2911                     pid,
2912                     q->address,
2913                     q->nt,
2914                     &me);
2915   }
2916   return GNUNET_OK;
2917 }
2918
2919
2920 /**
2921  * Initialize a monitor client.
2922  *
2923  * @param cls the client
2924  * @param start the start message that was sent
2925  */
2926 static void
2927 handle_monitor_start (void *cls,
2928                       const struct GNUNET_TRANSPORT_MonitorStart *start)
2929 {
2930   struct TransportClient *tc = cls;
2931
2932   if (CT_NONE != tc->type)
2933   {
2934     GNUNET_break (0);
2935     GNUNET_SERVICE_client_drop (tc->client);
2936     return;
2937   }
2938   tc->type = CT_MONITOR;
2939   tc->details.monitor.peer = start->peer;
2940   tc->details.monitor.one_shot = ntohl (start->one_shot);
2941   GNUNET_CONTAINER_multipeermap_iterate (neighbours,
2942                                          &notify_client_queues,
2943                                          tc);
2944   GNUNET_SERVICE_client_mark_monitor (tc->client);
2945   GNUNET_SERVICE_client_continue (tc->client);
2946 }
2947
2948
2949 /**
2950  * Signature of a function called by ATS with the current bandwidth
2951  * allocation to be used as determined by ATS.
2952  *
2953  * @param cls closure, NULL
2954  * @param session session this is about
2955  * @param bandwidth_out assigned outbound bandwidth for the connection,
2956  *        0 to signal disconnect
2957  * @param bandwidth_in assigned inbound bandwidth for the connection,
2958  *        0 to signal disconnect
2959  */
2960 static void
2961 ats_allocation_cb (void *cls,
2962                    struct GNUNET_ATS_Session *session,
2963                    struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out,
2964                    struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in)
2965 {
2966   (void) cls;
2967   GNUNET_BANDWIDTH_tracker_update_quota (&session->tracker_out,
2968                                          bandwidth_out);
2969   GNUNET_BANDWIDTH_tracker_update_quota (&session->tracker_in,
2970                                          bandwidth_in);
2971 }
2972
2973
2974 /**
2975  * Find transport client providing communication service
2976  * for the protocol @a prefix.
2977  *
2978  * @param prefix communicator name
2979  * @return NULL if no such transport client is available
2980  */
2981 static struct TransportClient *
2982 lookup_communicator (const char *prefix)
2983 {
2984   for (struct TransportClient *tc = clients_head;
2985        NULL != tc;
2986        tc = tc->next)
2987   {
2988     if (CT_COMMUNICATOR != tc->type)
2989       continue;
2990     if (0 == strcmp (prefix,
2991                      tc->details.communicator.address_prefix))
2992       return tc;
2993   }
2994   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2995               "ATS suggested use of communicator for `%s', but we do not have such a communicator!\n",
2996               prefix);
2997   return NULL;
2998 }
2999
3000
3001 /**
3002  * Signature of a function called by ATS suggesting transport to
3003  * try connecting with a particular address.
3004  *
3005  * @param cls closure, NULL
3006  * @param pid target peer
3007  * @param address the address to try
3008  */
3009 static void
3010 ats_suggestion_cb (void *cls,
3011                    const struct GNUNET_PeerIdentity *pid,
3012                    const char *address)
3013 {
3014   static uint32_t idgen;
3015   struct TransportClient *tc;
3016   char *prefix;
3017   struct GNUNET_TRANSPORT_CreateQueue *cqm;
3018   struct GNUNET_MQ_Envelope *env;
3019   size_t alen;
3020
3021   (void) cls;
3022   prefix = GNUNET_HELLO_address_to_prefix (address);
3023   if (NULL == prefix)
3024   {
3025     GNUNET_break (0); /* ATS gave invalid address!? */
3026     return;
3027   }
3028   tc = lookup_communicator (prefix);
3029   if (NULL == tc)
3030   {
3031     GNUNET_STATISTICS_update (GST_stats,
3032                               "# ATS suggestions ignored due to missing communicator",
3033                               1,
3034                               GNUNET_NO);    
3035     return;
3036   }
3037   /* forward suggestion for queue creation to communicator */
3038   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3039               "Request #%u for `%s' communicator to create queue to `%s'\n",
3040               (unsigned int) idgen,
3041               prefix,
3042               address);
3043   alen = strlen (address) + 1;
3044   env = GNUNET_MQ_msg_extra (cqm,
3045                              alen,
3046                              GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE);
3047   cqm->request_id = htonl (idgen++);
3048   cqm->receiver = *pid;
3049   memcpy (&cqm[1],
3050           address,
3051           alen);
3052   GNUNET_MQ_send (tc->mq,
3053                   env);
3054 }
3055
3056
3057 /**
3058  * Communicator tells us that our request to create a queue "worked", that
3059  * is setting up the queue is now in process.
3060  *
3061  * @param cls the `struct TransportClient`
3062  * @param cqr confirmation message
3063  */ 
3064 static void
3065 handle_queue_create_ok (void *cls,
3066                         const struct GNUNET_TRANSPORT_CreateQueueResponse *cqr)
3067 {
3068   struct TransportClient *tc = cls;
3069
3070   if (CT_COMMUNICATOR != tc->type)
3071   {
3072     GNUNET_break (0);
3073     GNUNET_SERVICE_client_drop (tc->client);
3074     return;
3075   }
3076   GNUNET_STATISTICS_update (GST_stats,
3077                             "# ATS suggestions succeeded at communicator",
3078                             1,
3079                             GNUNET_NO);
3080   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3081               "Request #%u for communicator to create queue succeeded\n",
3082               (unsigned int) ntohs (cqr->request_id));
3083   GNUNET_SERVICE_client_continue (tc->client);    
3084 }
3085
3086
3087 /**
3088  * Communicator tells us that our request to create a queue failed. This usually
3089  * indicates that the provided address is simply invalid or that the communicator's
3090  * resources are exhausted.
3091  *
3092  * @param cls the `struct TransportClient`
3093  * @param cqr failure message
3094  */ 
3095 static void
3096 handle_queue_create_fail (void *cls,
3097                           const struct GNUNET_TRANSPORT_CreateQueueResponse *cqr)
3098 {
3099   struct TransportClient *tc = cls;
3100
3101   if (CT_COMMUNICATOR != tc->type)
3102   {
3103     GNUNET_break (0);
3104     GNUNET_SERVICE_client_drop (tc->client);
3105     return;
3106   }
3107   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3108               "Request #%u for communicator to create queue failed\n",
3109               (unsigned int) ntohs (cqr->request_id));
3110   GNUNET_STATISTICS_update (GST_stats,
3111                             "# ATS suggestions failed in queue creation at communicator",
3112                             1,
3113                             GNUNET_NO);
3114   GNUNET_SERVICE_client_continue (tc->client);    
3115 }
3116
3117
3118 /**
3119  * Free neighbour entry.
3120  *
3121  * @param cls NULL
3122  * @param pid unused
3123  * @param value a `struct Neighbour`
3124  * @return #GNUNET_OK (always)
3125  */
3126 static int
3127 free_neighbour_cb (void *cls,
3128                    const struct GNUNET_PeerIdentity *pid,
3129                    void *value)
3130 {
3131   struct Neighbour *neighbour = value;
3132
3133   (void) cls;
3134   (void) pid;
3135   GNUNET_break (0); // should this ever happen?
3136   free_neighbour (neighbour);
3137
3138   return GNUNET_OK;
3139 }
3140
3141
3142 /**
3143  * Free ephemeral entry.
3144  *
3145  * @param cls NULL
3146  * @param pid unused
3147  * @param value a `struct Neighbour`
3148  * @return #GNUNET_OK (always)
3149  */
3150 static int
3151 free_ephemeral_cb (void *cls,
3152                    const struct GNUNET_PeerIdentity *pid,
3153                    void *value)
3154 {
3155   struct EphemeralCacheEntry *ece = value;
3156
3157   (void) cls;
3158   (void) pid;
3159   free_ephemeral (ece);
3160   return GNUNET_OK;
3161 }
3162
3163
3164 /**
3165  * Function called when the service shuts down.  Unloads our plugins
3166  * and cancels pending validations.
3167  *
3168  * @param cls closure, unused
3169  */
3170 static void
3171 do_shutdown (void *cls)
3172 {
3173   (void) cls;
3174
3175   GNUNET_CONTAINER_multipeermap_iterate (neighbours,
3176                                          &free_neighbour_cb,
3177                                          NULL);
3178   if (NULL != ats)
3179   {
3180     GNUNET_ATS_transport_done (ats);
3181     ats = NULL;
3182   }
3183   if (NULL != peerstore)
3184   {
3185     GNUNET_PEERSTORE_disconnect (peerstore,
3186                                  GNUNET_NO);
3187     peerstore = NULL;
3188   }
3189   if (NULL != GST_stats)
3190   {
3191     GNUNET_STATISTICS_destroy (GST_stats,
3192                                GNUNET_NO);
3193     GST_stats = NULL;
3194   }
3195   if (NULL != GST_my_private_key)
3196   {
3197     GNUNET_free (GST_my_private_key);
3198     GST_my_private_key = NULL;
3199   }
3200   GNUNET_CONTAINER_multipeermap_destroy (neighbours);
3201   neighbours = NULL;
3202   GNUNET_CONTAINER_multipeermap_iterate (ephemeral_map,
3203                                          &free_ephemeral_cb,
3204                                          NULL);
3205   GNUNET_CONTAINER_multipeermap_destroy (ephemeral_map);
3206   ephemeral_map = NULL;
3207   GNUNET_CONTAINER_heap_destroy (ephemeral_heap);
3208   ephemeral_heap = NULL;
3209 }
3210
3211
3212 /**
3213  * Initiate transport service.
3214  *
3215  * @param cls closure
3216  * @param c configuration to use
3217  * @param service the initialized service
3218  */
3219 static void
3220 run (void *cls,
3221      const struct GNUNET_CONFIGURATION_Handle *c,
3222      struct GNUNET_SERVICE_Handle *service)
3223 {
3224   (void) cls;
3225   /* setup globals */
3226   GST_cfg = c;
3227   neighbours = GNUNET_CONTAINER_multipeermap_create (1024,
3228                                                      GNUNET_YES);
3229   ephemeral_map = GNUNET_CONTAINER_multipeermap_create (32,
3230                                                         GNUNET_YES);
3231   ephemeral_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
3232   GST_my_private_key = GNUNET_CRYPTO_eddsa_key_create_from_configuration (GST_cfg);
3233   if (NULL == GST_my_private_key)
3234   {
3235     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3236                 _("Transport service is lacking key configuration settings. Exiting.\n"));
3237     GNUNET_SCHEDULER_shutdown ();
3238     return;
3239   }
3240   GNUNET_CRYPTO_eddsa_key_get_public (GST_my_private_key,
3241                                       &GST_my_identity.public_key);
3242   GNUNET_log(GNUNET_ERROR_TYPE_INFO,
3243              "My identity is `%s'\n",
3244              GNUNET_i2s_full (&GST_my_identity));
3245   GST_stats = GNUNET_STATISTICS_create ("transport",
3246                                         GST_cfg);
3247   GNUNET_SCHEDULER_add_shutdown (&do_shutdown,
3248                                  NULL);
3249   peerstore = GNUNET_PEERSTORE_connect (GST_cfg);
3250   if (NULL == peerstore)
3251   {
3252     GNUNET_break (0);
3253     GNUNET_SCHEDULER_shutdown ();
3254     return;
3255   }
3256   ats = GNUNET_ATS_transport_init (GST_cfg,
3257                                    &ats_allocation_cb,
3258                                    NULL,
3259                                    &ats_suggestion_cb,
3260                                    NULL);
3261   if (NULL == ats)
3262   {
3263     GNUNET_break (0);
3264     GNUNET_SCHEDULER_shutdown ();
3265     return;
3266   }
3267 }
3268
3269
3270 /**
3271  * Define "main" method using service macro.
3272  */
3273 GNUNET_SERVICE_MAIN
3274 ("transport",
3275  GNUNET_SERVICE_OPTION_SOFT_SHUTDOWN,
3276  &run,
3277  &client_connect_cb,
3278  &client_disconnect_cb,
3279  NULL,
3280  /* communication with core */
3281  GNUNET_MQ_hd_fixed_size (client_start,
3282                           GNUNET_MESSAGE_TYPE_TRANSPORT_START,
3283                           struct StartMessage,
3284                           NULL),
3285  GNUNET_MQ_hd_var_size (client_send,
3286                         GNUNET_MESSAGE_TYPE_TRANSPORT_SEND,
3287                         struct OutboundMessage,
3288                         NULL),
3289  /* communication with communicators */
3290  GNUNET_MQ_hd_var_size (communicator_available,
3291                         GNUNET_MESSAGE_TYPE_TRANSPORT_NEW_COMMUNICATOR,
3292                         struct GNUNET_TRANSPORT_CommunicatorAvailableMessage,
3293                         NULL),
3294  GNUNET_MQ_hd_var_size (add_address,
3295                         GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS,
3296                         struct GNUNET_TRANSPORT_AddAddressMessage,
3297                         NULL),
3298  GNUNET_MQ_hd_fixed_size (del_address,
3299                           GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_ADDRESS,
3300                           struct GNUNET_TRANSPORT_DelAddressMessage,
3301                           NULL),
3302  GNUNET_MQ_hd_var_size (incoming_msg,
3303                         GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG,
3304                         struct GNUNET_TRANSPORT_IncomingMessage,
3305                         NULL),
3306  GNUNET_MQ_hd_fixed_size (queue_create_ok,
3307                           GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_OK,
3308                           struct GNUNET_TRANSPORT_CreateQueueResponse,
3309                           NULL),
3310  GNUNET_MQ_hd_fixed_size (queue_create_fail,
3311                           GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_FAIL,
3312                           struct GNUNET_TRANSPORT_CreateQueueResponse,
3313                           NULL),
3314  GNUNET_MQ_hd_var_size (add_queue_message,
3315                         GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP,
3316                         struct GNUNET_TRANSPORT_AddQueueMessage,
3317                         NULL),
3318  GNUNET_MQ_hd_fixed_size (del_queue_message,
3319                           GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN,
3320                           struct GNUNET_TRANSPORT_DelQueueMessage,
3321                           NULL),
3322  GNUNET_MQ_hd_fixed_size (send_message_ack,
3323                           GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK,
3324                           struct GNUNET_TRANSPORT_SendMessageToAck,
3325                           NULL),
3326  /* communication with monitors */
3327  GNUNET_MQ_hd_fixed_size (monitor_start,
3328                           GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_START,
3329                           struct GNUNET_TRANSPORT_MonitorStart,
3330                           NULL),
3331  GNUNET_MQ_handler_end ());
3332
3333
3334 /* end of file gnunet-service-transport.c */