more work on tng
[oweals/gnunet.git] / src / transport / gnunet-service-tng.c
1 /*
2  This file is part of GNUnet.
3  Copyright (C) 2010-2016, 2018, 2019 GNUnet e.V.
4
5  GNUnet is free software: you can redistribute it and/or modify it
6  under the terms of the GNU Affero General Public License as published
7  by the Free Software Foundation, either version 3 of the License,
8  or (at your option) any later version.
9
10  GNUnet is distributed in the hope that it will be useful, but
11  WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  Affero General Public License for more details.
14
15  You should have received a copy of the GNU Affero General Public License
16  along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19  */
20 /**
21  * @file transport/gnunet-service-tng.c
22  * @brief main for gnunet-service-tng
23  * @author Christian Grothoff
24  *
25  * TODO:
26  * - figure out how to transmit (selective) ACKs in case of uni-directional
27  *   communicators (with/without core? DV-only?) When do we use ACKs?
28  *   => communicators use selective ACKs for flow control
29  *   => transport uses message-level ACKs for RTT, fragment confirmation
30  *   => integrate DV into transport, use neither core nor communicators
31  *      but rather give communicators transport-encapsulated messages
32  *      (which could be core-data, background-channel traffic, or
33  *       transport-to-transport traffic)
34  *
35  * Implement:
36  * - manage defragmentation, retransmission, track RTT, loss, etc.
37  *
38  * Easy:
39  * - use ATS bandwidth allocation callback and schedule transmissions!
40  *
41  * Plan:
42  * - inform ATS about RTT, goodput/loss, overheads, etc. (GNUNET_ATS_session_update())
43  *
44  * Later:
45  * - change transport-core API to provide proper flow control in both
46  *   directions, allow multiple messages per peer simultaneously (tag
47  *   confirmations with unique message ID), and replace quota-out with
48  *   proper flow control;
49  *
50  * Design realizations / discussion:
51  * - communicators do flow control by calling MQ "notify sent"
52  *   when 'ready'. They determine flow implicitly (i.e. TCP blocking)
53  *   or explicitly via background channel FC ACKs.  As long as the
54  *   channel is not full, they may 'notify sent' even if the other
55  *   peer has not yet confirmed receipt. The other peer confirming
56  *   is _only_ for FC, not for more reliable transmission; reliable
57  *   transmission (i.e. of fragments) is left to _transport_.
58  * - ACKs sent back in uni-directional communicators are done via
59  *   the background channel API; here transport _may_ initially
60  *   broadcast (with bounded # hops) if no path is known;
61  * - transport should _integrate_ DV-routing and build a view of
62  *   the network; then background channel traffic can be
63  *   routed via DV as well as explicit "DV" traffic.
64  * - background channel is also used for ACKs and NAT traversal support
65  * - transport service is responsible for AEAD'ing the background
66  *   channel, timestamps and monotonic time are used against replay
67  *   of old messages -> peerstore needs to be supplied with
68  *   "latest timestamps seen" data
69  * - if transport implements DV, we likely need a 3rd peermap
70  *   in addition to ephemerals and (direct) neighbours
71  *   => in this data structure, we should track ATS metrics (distance, RTT, etc.)
72  *   as well as latest timestamps seen, goodput, fragments for transmission, etc.
73  *   ==> check if stuff needs to be moved out of "Neighbour"
74  * - transport should encapsualte core-level messages and do its
75  *   own ACKing for RTT/goodput/loss measurements _and_ fragment
76  *   for retransmission
77  */
78 #include "platform.h"
79 #include "gnunet_util_lib.h"
80 #include "gnunet_statistics_service.h"
81 #include "gnunet_transport_monitor_service.h"
82 #include "gnunet_peerstore_service.h"
83 #include "gnunet_hello_lib.h"
84 #include "gnunet_ats_transport_service.h"
85 #include "transport.h"
86
87
88 /**
89  * What is the size we assume for a read operation in the
90  * absence of an MTU for the purpose of flow control?
91  */
92 #define IN_PACKET_SIZE_WITHOUT_MTU 128
93
94 /**
95  * If a queue delays the next message by more than this number
96  * of seconds we log a warning. Note: this is for testing,
97  * the value chosen here might be too aggressively low!
98  */
99 #define DELAY_WARN_THRESHOLD GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
100
101 /**
102  * How many messages can we have pending for a given communicator
103  * process before we start to throttle that communicator?
104  * 
105  * Used if a communicator might be CPU-bound and cannot handle the traffic.
106  */
107 #define COMMUNICATOR_TOTAL_QUEUE_LIMIT 512
108
109 /**
110  * How many messages can we have pending for a given session (queue to
111  * a particular peer via a communicator) process before we start to
112  * throttle that queue?
113  * 
114  * Used if ATS assigns more bandwidth to a particular transmission
115  * method than that transmission method can right now handle. (Yes,
116  * ATS should eventually notice utilization below allocation and
117  * adjust, but we don't want to queue up tons of messages in the
118  * meantime). Must be significantly below
119  * #COMMUNICATOR_TOTAL_QUEUE_LIMIT.
120  */
121 #define SESSION_QUEUE_LIMIT 32
122
123
124 GNUNET_NETWORK_STRUCT_BEGIN
125
126 /**
127  * Outer layer of an encapsulated backchannel message.
128  */
129 struct TransportBackchannelEncapsulationMessage
130 {
131   /**
132    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION.
133    */
134   struct GNUNET_MessageHeader header;
135
136   /**
137    * Distance the backchannel message has traveled, to be updated at
138    * each hop.  Used to bound the number of hops in case a backchannel
139    * message is broadcast and thus travels without routing
140    * information (during initial backchannel discovery).
141    */
142   uint32_t distance;
143
144   /**
145    * Target's peer identity (as backchannels may be transmitted
146    * indirectly, or even be broadcast).
147    */
148   struct GNUNET_PeerIdentity target;
149
150   /**
151    * Ephemeral key setup by the sender for @e target, used
152    * to encrypt the payload.
153    */
154   struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;
155
156   /**
157    * HMAC over the ciphertext of the encrypted, variable-size
158    * body that follows.  Verified via DH of @e target and
159    * @e ephemeral_key
160    */
161   struct GNUNET_HashCode hmac;
162
163   /* Followed by encrypted, variable-size payload */
164 };
165
166
167 /**
168  * Body by which a peqer confirms that it is using an ephemeral
169  * key.
170  */
171 struct EphemeralConfirmation
172 {
173
174   /**
175    * Purpose is #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL
176    */
177   struct GNUNET_CRYPTO_EccSignaturePurpose purpose;
178
179   /**
180    * How long is this signature over the ephemeral key
181    * valid?
182    */
183   struct GNUNET_TIME_AbsoluteNBO ephemeral_validity;
184
185   /**
186    * Ephemeral key setup by the sender for @e target, used
187    * to encrypt the payload.
188    */
189   struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;
190
191 };
192
193
194 /**
195  * Message by which a peqer confirms that it is using an ephemeral
196  * key.
197  */
198 struct EphemeralConfirmationMessage
199 {
200
201   /**
202    * Message header, type is #GNUNET_MESSAGE_TYPE_TRANSPORT_EPHEMERAL_CONFIRMATION
203    */
204   struct GNUNET_MessageHeader header;
205
206   /**
207    * Must be zero.
208    */  
209   uint32_t reserved;
210   
211   /**
212    * How long is this signature over the ephemeral key
213    * valid?
214    */
215   struct GNUNET_TIME_AbsoluteNBO ephemeral_validity;
216
217   /**
218    * Ephemeral key setup by the sender for @e target, used
219    * to encrypt the payload.
220    */
221   struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;
222 };
223
224
225 /**
226  * Plaintext of the variable-size payload that is encrypted
227  * within a `struct TransportBackchannelEncapsulationMessage`
228  */
229 struct TransportBackchannelRequestPayload
230 {
231
232   /**
233    * Sender's peer identity.
234    */
235   struct GNUNET_PeerIdentity sender;
236
237   /**
238    * Signature of the sender over an
239    * #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL.
240    */
241   struct GNUNET_CRYPTO_EddsaSignature sender_sig;
242
243   /**
244    * How long is this signature over the ephemeral key
245    * valid?
246    */
247   struct GNUNET_TIME_AbsoluteNBO ephemeral_validity;
248
249   /**
250    * Current monotonic time of the sending transport service.  Used to
251    * detect replayed messages.  Note that the receiver should remember
252    * a list of the recently seen timestamps and only reject messages
253    * if the timestamp is in the list, or the list is "full" and the
254    * timestamp is smaller than the lowest in the list.  This list of
255    * timestamps per peer should be persisted to guard against replays
256    * after restarts.
257    */
258   struct GNUNET_TIME_AbsoluteNBO monotonic_time;
259
260   /* Followed by a `struct GNUNET_MessageHeader` with a message
261      for a communicator */
262
263   /* Followed by a 0-termianted string specifying the name of
264      the communicator which is to receive the message */
265
266 };
267
268
269 /**
270  * Outer layer of an encapsulated unfragmented application message sent
271  * over an unreliable channel.
272  */
273 struct TransportReliabilityBox
274 {
275   /**
276    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX
277    */
278   struct GNUNET_MessageHeader header;
279
280   /**
281    * Number of messages still to be sent before a commulative
282    * ACK is requested.  Zero if an ACK is requested immediately.
283    * In NBO.  Note that the receiver may send the ACK faster
284    * if it believes that is reasonable.
285    */
286   uint32_t ack_countdown GNUNET_PACKED;
287
288   /**
289    * Unique ID of the message used for signalling receipt of
290    * messages sent over possibly unreliable channels.  Should
291    * be a random.
292    */
293   struct GNUNET_ShortHashCode msg_uuid;
294 };
295
296
297 /**
298  * Confirmation that the receiver got a
299  * #GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX. Note that the
300  * confirmation may be transmitted over a completely different queue,
301  * so ACKs are identified by a combination of PID of sender and
302  * message UUID, without the queue playing any role!
303  */
304 struct TransportReliabilityAckMessage
305 {
306   /**
307    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK
308    */
309   struct GNUNET_MessageHeader header;
310
311   /**
312    * Reserved. Zero.
313    */
314   uint32_t reserved GNUNET_PACKED;
315
316   /**
317    * How long was the ACK delayed relative to the average time of
318    * receipt of the messages being acknowledged?  Used to calculate
319    * the average RTT by taking the receipt time of the ack minus the
320    * average transmission time of the sender minus this value.
321    */
322   struct GNUNET_TIME_RelativeNBO avg_ack_delay;
323
324   /* followed by any number of `struct GNUNET_ShortHashCode`
325      messages providing ACKs */
326 };
327
328
329 /**
330  * Outer layer of an encapsulated fragmented application message.
331  */
332 struct TransportFragmentBox
333 {
334   /**
335    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT
336    */
337   struct GNUNET_MessageHeader header;
338
339   /**
340    * Unique ID of this fragment (and fragment transmission!). Will
341    * change even if a fragement is retransmitted to make each
342    * transmission attempt unique! Should be incremented by one for
343    * each fragment transmission. If a client receives a duplicate
344    * fragment (same @e frag_off), it must send
345    * #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK immediately.
346    */
347   uint32_t frag_uuid GNUNET_PACKED;
348
349   /**
350    * Original message ID for of the message that all the1
351    * fragments belong to.  Must be the same for all fragments.
352    */ 
353   struct GNUNET_ShortHashCode msg_uuid;
354
355   /**
356    * Offset of this fragment in the overall message.
357    */ 
358   uint16_t frag_off GNUNET_PACKED;
359
360   /**
361    * Total size of the message that is being fragmented.
362    */ 
363   uint16_t msg_size GNUNET_PACKED;
364
365 };
366
367
368 /**
369  * Outer layer of an fragmented application message sent over a queue
370  * with finite MTU.  When a #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT is
371  * received, the receiver has two RTTs or 64 further fragments with
372  * the same basic message time to send an acknowledgement, possibly
373  * acknowledging up to 65 fragments in one ACK.  ACKs must also be
374  * sent immediately once all fragments were sent.
375  */
376 struct TransportFragmentAckMessage
377 {
378   /**
379    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK
380    */
381   struct GNUNET_MessageHeader header;
382
383   /**
384    * Unique ID of the lowest fragment UUID being acknowledged.
385    */
386   uint32_t frag_uuid GNUNET_PACKED;
387
388   /**
389    * Bitfield of up to 64 additional fragments following the
390    * @e msg_uuid being acknowledged by this message.
391    */ 
392   uint64_t extra_acks GNUNET_PACKED;
393
394   /**
395    * Original message ID for of the message that all the
396    * fragments belong to.
397    */ 
398   struct GNUNET_ShortHashCode msg_uuid;
399
400   /**
401    * How long was the ACK delayed relative to the average time of
402    * receipt of the fragments being acknowledged?  Used to calculate
403    * the average RTT by taking the receipt time of the ack minus the
404    * average transmission time of the sender minus this value.
405    */
406   struct GNUNET_TIME_RelativeNBO avg_ack_delay;
407 };
408
409
410 /**
411  * Internal message used by transport for distance vector learning.
412  * If @e num_hops does not exceed the threshold, peers should append
413  * themselves to the peer list and flood the message (possibly only
414  * to a subset of their neighbours to limit discoverability of the
415  * network topology).  To the extend that the @e bidirectional bits
416  * are set, peers may learn the inverse paths even if they did not
417  * initiate. 
418  *
419  * Unless received on a bidirectional queue and @e num_hops just
420  * zero, peers that can forward to the initator should always try to
421  * forward to the initiator.
422  */
423 struct TransportDVLearn
424 {
425   /**
426    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN
427    */
428   struct GNUNET_MessageHeader header;
429
430   /**
431    * Number of hops this messages has travelled, in NBO. Zero if
432    * sent by initiator.
433    */
434   uint16_t num_hops GNUNET_PACKED;
435
436   /**
437    * Bitmask of the last 16 hops indicating whether they are confirmed
438    * available (without DV) in both directions or not, in NBO.  Used
439    * to possibly instantly learn a path in both directions.  Each peer
440    * should shift this value by one to the left, and then set the
441    * lowest bit IF the current sender can be reached from it (without
442    * DV routing).  
443    */ 
444   uint16_t bidirectional GNUNET_PACKED;
445
446   /**
447    * Peers receiving this message and delaying forwarding to other
448    * peers for any reason should increment this value such as to 
449    * enable the origin to determine the actual network-only delay
450    * in addition to the real-time delay (assuming the message loops
451    * back to the origin).
452    */
453   struct GNUNET_TIME_Relative cummulative_non_network_delay;
454
455   /**
456    * Identity of the peer that started this learning activity.
457    */
458   struct GNUNET_PeerIdentity initiator;
459   
460   /* Followed by @e num_hops `struct GNUNET_PeerIdentity` values,
461      excluding the initiator of the DV trace; the last entry is the
462      current sender; the current peer must not be included. */
463   
464 };
465
466
467 /**
468  * Outer layer of an encapsulated message send over multiple hops.
469  * The path given only includes the identities of the subsequent
470  * peers, i.e. it will be empty if we are the receiver. Each
471  * forwarding peer should scan the list from the end, and if it can,
472  * forward to the respective peer. The list should then be shortened
473  * by all the entries up to and including that peer.  Each hop should
474  * also increment @e total_hops to allow the receiver to get a precise
475  * estimate on the number of hops the message travelled.  Senders must
476  * provide a learned path that thus should work, but intermediaries
477  * know of a shortcut, they are allowed to send the message via that
478  * shortcut.
479  *
480  * If a peer finds itself still on the list, it must drop the message.
481  */
482 struct TransportDVBox
483 {
484   /**
485    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX
486    */
487   struct GNUNET_MessageHeader header;
488
489   /**
490    * Number of total hops this messages travelled. In NBO.
491    * @e origin sets this to zero, to be incremented at
492    * each hop.
493    */
494   uint16_t total_hops GNUNET_PACKED;
495
496   /**
497    * Number of hops this messages includes. In NBO.
498    */
499   uint16_t num_hops GNUNET_PACKED;
500   
501   /**
502    * Identity of the peer that originated the message.
503    */
504   struct GNUNET_PeerIdentity origin;
505
506   /* Followed by @e num_hops `struct GNUNET_PeerIdentity` values;
507      excluding the @e origin and the current peer, the last must be
508      the ultimate target; if @e num_hops is zero, the receiver of this
509      message is the ultimate target. */
510
511   /* Followed by the actual message, which itself may be
512      another box, but not a DV_LEARN or DV_BOX message! */
513 };
514
515
516 GNUNET_NETWORK_STRUCT_END
517
518
519
520 /**
521  * What type of client is the `struct TransportClient` about?
522  */
523 enum ClientType
524 {
525   /**
526    * We do not know yet (client is fresh).
527    */
528   CT_NONE = 0,
529
530   /**
531    * Is the CORE service, we need to forward traffic to it.
532    */
533   CT_CORE = 1,
534
535   /**
536    * It is a monitor, forward monitor data.
537    */
538   CT_MONITOR = 2,
539
540   /**
541    * It is a communicator, use for communication.
542    */
543   CT_COMMUNICATOR = 3
544 };
545
546
547 /**
548  * Entry in our cache of ephemeral keys we currently use.
549  */
550 struct EphemeralCacheEntry
551 {
552
553   /**
554    * Target's peer identity (we don't re-use ephemerals
555    * to limit linkability of messages).
556    */
557   struct GNUNET_PeerIdentity target;
558
559   /**
560    * Signature affirming @e ephemeral_key of type
561    * #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL
562    */
563   struct GNUNET_CRYPTO_EddsaSignature sender_sig;
564
565   /**
566    * How long is @e sender_sig valid
567    */
568   struct GNUNET_TIME_Absolute ephemeral_validity;
569
570   /**
571    * Our ephemeral key.
572    */
573   struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;
574
575   /**
576    * Node in the ephemeral cache for this entry.
577    * Used for expiration.
578    */
579   struct GNUNET_CONTAINER_HeapNode *hn;
580 };
581
582
583 /**
584  * Client connected to the transport service.
585  */
586 struct TransportClient;
587
588
589 /**
590  * A neighbour that at least one communicator is connected to.
591  */
592 struct Neighbour;
593
594
595 /**
596  * Entry identifying transmission in one of our `struct
597  * GNUNET_ATS_Sessions` which still awaits an ACK.  This is used to
598  * ensure we do not overwhelm a communicator and limit the number of
599  * messages outstanding per communicator (say in case communicator is
600  * CPU bound) and per queue (in case ATS bandwidth allocation exceeds
601  * what the communicator can actually provide towards a particular
602  * peer/target).
603  */
604 struct QueueEntry
605 {
606
607   /**
608    * Kept as a DLL.
609    */ 
610   struct QueueEntry *next;
611
612   /**
613    * Kept as a DLL.
614    */ 
615   struct QueueEntry *prev;
616
617   /**
618    * ATS session this entry is queued with.
619    */
620   struct GNUNET_ATS_Session *session;
621   
622   /**
623    * Message ID used for this message with the queue used for transmission.
624    */
625   uint64_t mid;
626 };
627
628
629 /**
630  * An ATS session is a message queue provided by a communicator
631  * via which we can reach a particular neighbour.
632  */
633 struct GNUNET_ATS_Session
634 {
635   /**
636    * Kept in a MDLL.
637    */
638   struct GNUNET_ATS_Session *next_neighbour;
639
640   /**
641    * Kept in a MDLL.
642    */
643   struct GNUNET_ATS_Session *prev_neighbour;
644
645   /**
646    * Kept in a MDLL.
647    */
648   struct GNUNET_ATS_Session *prev_client;
649
650   /**
651    * Kept in a MDLL.
652    */
653   struct GNUNET_ATS_Session *next_client;
654
655   /**
656    * Head of DLL of unacked transmission requests.
657    */ 
658   struct QueueEntry *queue_head;
659
660   /**
661    * End of DLL of unacked transmission requests.
662    */ 
663   struct QueueEntry *queue_tail;
664
665   /**
666    * Which neighbour is this ATS session for?
667    */
668   struct Neighbour *neighbour;
669
670   /**
671    * Which communicator offers this ATS session?
672    */
673   struct TransportClient *tc;
674
675   /**
676    * Address served by the ATS session.
677    */
678   const char *address;
679
680   /**
681    * Handle by which we inform ATS about this queue.
682    */
683   struct GNUNET_ATS_SessionRecord *sr;
684
685   /**
686    * Task scheduled for the time when this queue can (likely) transmit the
687    * next message. Still needs to check with the @e tracker_out to be sure.
688    */ 
689   struct GNUNET_SCHEDULER_Task *transmit_task;
690   
691   /**
692    * Our current RTT estimate for this ATS session.
693    */
694   struct GNUNET_TIME_Relative rtt;
695
696   /**
697    * Message ID generator for transmissions on this queue.
698    */ 
699   uint64_t mid_gen;
700   
701   /**
702    * Unique identifier of this ATS session with the communicator.
703    */
704   uint32_t qid;
705
706   /**
707    * Maximum transmission unit supported by this ATS session.
708    */
709   uint32_t mtu;
710
711   /**
712    * Distance to the target of this ATS session.
713    */
714   uint32_t distance;
715
716   /**
717    * Messages pending.
718    */
719   uint32_t num_msg_pending;
720
721   /**
722    * Bytes pending.
723    */
724   uint32_t num_bytes_pending;
725
726   /**
727    * Length of the DLL starting at @e queue_head.
728    */
729   unsigned int queue_length;
730   
731   /**
732    * Network type offered by this ATS session.
733    */
734   enum GNUNET_NetworkType nt;
735
736   /**
737    * Connection status for this ATS session.
738    */
739   enum GNUNET_TRANSPORT_ConnectionStatus cs;
740
741   /**
742    * How much outbound bandwidth do we have available for this session?
743    */
744   struct GNUNET_BANDWIDTH_Tracker tracker_out;
745
746   /**
747    * How much inbound bandwidth do we have available for this session?
748    */
749   struct GNUNET_BANDWIDTH_Tracker tracker_in;
750 };
751
752
753 /**
754  * A neighbour that at least one communicator is connected to.
755  */
756 struct Neighbour
757 {
758
759   /**
760    * Which peer is this about?
761    */
762   struct GNUNET_PeerIdentity pid;
763
764   /**
765    * Head of list of messages pending for this neighbour.
766    */
767   struct PendingMessage *pending_msg_head;
768
769   /**
770    * Tail of list of messages pending for this neighbour.
771    */
772   struct PendingMessage *pending_msg_tail;
773
774   /**
775    * Head of DLL of ATS sessions to this peer.
776    */
777   struct GNUNET_ATS_Session *session_head;
778
779   /**
780    * Tail of DLL of ATS sessions to this peer.
781    */
782   struct GNUNET_ATS_Session *session_tail;
783
784   /**
785    * Task run to cleanup pending messages that have exceeded their timeout.
786    */  
787   struct GNUNET_SCHEDULER_Task *timeout_task;
788
789   /**
790    * Quota at which CORE is allowed to transmit to this peer
791    * according to ATS.
792    *
793    * FIXME: not yet used, tricky to get right given multiple queues!
794    *        (=> Idea: let ATS set a quota per queue and we add them up here?)
795    * FIXME: how do we set this value initially when we tell CORE?
796    *    Options: start at a minimum value or at literally zero (before ATS?)
797    *         (=> Current thought: clean would be zero!)
798    */
799   struct GNUNET_BANDWIDTH_Value32NBO quota_out;
800
801   /**
802    * What is the earliest timeout of any message in @e pending_msg_tail?
803    */ 
804   struct GNUNET_TIME_Absolute earliest_timeout;
805   
806 };
807
808
809 /**
810  * Types of different pending messages.
811  */ 
812 enum PendingMessageType
813 {
814
815   /**
816    * Ordinary message received from the CORE service.
817    */
818   PMT_CORE = 0,
819
820   /**
821    * Fragment box.
822    */
823   PMT_FRAGMENT_BOX = 1,
824
825   /**
826    * Reliability box.
827    */
828   PMT_RELIABILITY_BOX = 2,
829
830   /**
831    * Any type of acknowledgement.
832    */
833   PMT_ACKNOWLEDGEMENT = 3
834
835   
836 };
837
838
839 /**
840  * Transmission request that is awaiting delivery.  The original
841  * transmission requests from CORE may be too big for some queues.
842  * In this case, a *tree* of fragments is created.  At each
843  * level of the tree, fragments are kept in a DLL ordered by which
844  * fragment should be sent next (at the head).  The tree is searched
845  * top-down, with the original message at the root.
846  *
847  * To select a node for transmission, first it is checked if the
848  * current node's message fits with the MTU.  If it does not, we
849  * either calculate the next fragment (based on @e frag_off) from the
850  * current node, or, if all fragments have already been created,
851  * descend to the @e head_frag.  Even though the node was already
852  * fragmented, the fragment may be too big if the fragment was 
853  * generated for a queue with a larger MTU. In this case, the node
854  * may be fragmented again, thus creating a tree.
855  *
856  * When acknowledgements for fragments are received, the tree
857  * must be pruned, removing those parts that were already 
858  * acknowledged.  When fragments are sent over a reliable 
859  * channel, they can be immediately removed.
860  *
861  * If a message is ever fragmented, then the original "full" message
862  * is never again transmitted (even if it fits below the MTU), and
863  * only (remaining) fragments are sent.
864  */
865 struct PendingMessage
866 {
867   /**
868    * Kept in a MDLL of messages for this @a target.
869    */
870   struct PendingMessage *next_neighbour;
871
872   /**
873    * Kept in a MDLL of messages for this @a target.
874    */
875   struct PendingMessage *prev_neighbour;
876
877   /**
878    * Kept in a MDLL of messages from this @a client (if @e pmt is #PMT_CORE)
879    */
880   struct PendingMessage *next_client;
881   
882   /**
883    * Kept in a MDLL of messages from this @a client  (if @e pmt is #PMT_CORE)
884    */
885   struct PendingMessage *prev_client;
886
887   /**
888    * Kept in a MDLL of messages from this @a cpm (if @e pmt is #PMT_FRAGMENT_BOx)
889    */
890   struct PendingMessage *next_frag;
891   
892   /**
893    * Kept in a MDLL of messages from this @a cpm  (if @e pmt is #PMT_FRAGMENT_BOX)
894    */
895   struct PendingMessage *prev_frag;
896
897   /**
898    * This message, reliability boxed. Only possibly available if @e pmt is #PMT_CORE.
899    */ 
900   struct PendingMessage *bpm;
901   
902   /**
903    * Target of the request.
904    */
905   struct Neighbour *target;
906       
907   /**
908    * Client that issued the transmission request, if @e pmt is #PMT_CORE.
909    */
910   struct TransportClient *client;
911   
912   /**
913    * Head of a MDLL of fragments created for this core message.
914    */
915   struct PendingMessage *head_frag;
916   
917   /**
918    * Tail of a MDLL of fragments created for this core message.
919    */
920   struct PendingMessage *tail_frag;
921
922   /**
923    * Our parent in the fragmentation tree.
924    */
925   struct PendingMessage *frag_parent;
926     
927   /**
928    * At what time should we give up on the transmission (and no longer retry)?
929    */
930   struct GNUNET_TIME_Absolute timeout;
931
932   /**
933    * What is the earliest time for us to retry transmission of this message?
934    */
935   struct GNUNET_TIME_Absolute next_attempt;
936
937   /**
938    * UUID to use for this message (used for reassembly of fragments, only
939    * initialized if @e msg_uuid_set is #GNUNET_YES).
940    */
941   struct GNUNET_ShortHashCode msg_uuid;
942   
943   /**
944    * Counter incremented per generated fragment.
945    */ 
946   uint32_t frag_uuidgen;
947   
948   /**
949    * Type of the pending message.
950    */
951   enum PendingMessageType pmt;
952
953   /**
954    * Size of the original message.
955    */
956   uint16_t bytes_msg;
957
958   /**
959    * Offset at which we should generate the next fragment.
960    */ 
961   uint16_t frag_off;
962
963   /**
964    * #GNUNET_YES once @e msg_uuid was initialized
965    */
966   int16_t msg_uuid_set;
967   
968   /* Followed by @e bytes_msg to transmit */
969 };
970
971
972 /**
973  * One of the addresses of this peer.
974  */
975 struct AddressListEntry
976 {
977
978   /**
979    * Kept in a DLL.
980    */
981   struct AddressListEntry *next;
982
983   /**
984    * Kept in a DLL.
985    */
986   struct AddressListEntry *prev;
987
988   /**
989    * Which communicator provides this address?
990    */
991   struct TransportClient *tc;
992
993   /**
994    * The actual address.
995    */
996   const char *address;
997
998   /**
999    * Current context for storing this address in the peerstore.
1000    */
1001   struct GNUNET_PEERSTORE_StoreContext *sc;
1002
1003   /**
1004    * Task to periodically do @e st operation.
1005    */
1006   struct GNUNET_SCHEDULER_Task *st;
1007
1008   /**
1009    * What is a typical lifetime the communicator expects this
1010    * address to have? (Always from now.)
1011    */
1012   struct GNUNET_TIME_Relative expiration;
1013
1014   /**
1015    * Address identifier used by the communicator.
1016    */
1017   uint32_t aid;
1018
1019   /**
1020    * Network type offered by this address.
1021    */
1022   enum GNUNET_NetworkType nt;
1023
1024 };
1025
1026
1027 /**
1028  * Client connected to the transport service.
1029  */
1030 struct TransportClient
1031 {
1032
1033   /**
1034    * Kept in a DLL.
1035    */
1036   struct TransportClient *next;
1037
1038   /**
1039    * Kept in a DLL.
1040    */
1041   struct TransportClient *prev;
1042
1043   /**
1044    * Handle to the client.
1045    */
1046   struct GNUNET_SERVICE_Client *client;
1047
1048   /**
1049    * Message queue to the client.
1050    */
1051   struct GNUNET_MQ_Handle *mq;
1052
1053   /**
1054    * What type of client is this?
1055    */
1056   enum ClientType type;
1057
1058   union
1059   {
1060
1061     /**
1062      * Information for @e type #CT_CORE.
1063      */
1064     struct {
1065
1066       /**
1067        * Head of list of messages pending for this client, sorted by 
1068        * transmission time ("next_attempt" + possibly internal prioritization).
1069        */
1070       struct PendingMessage *pending_msg_head;
1071
1072       /**
1073        * Tail of list of messages pending for this client.
1074        */
1075       struct PendingMessage *pending_msg_tail;
1076
1077     } core;
1078
1079     /**
1080      * Information for @e type #CT_MONITOR.
1081      */
1082     struct {
1083
1084       /**
1085        * Peer identity to monitor the addresses of.
1086        * Zero to monitor all neighbours.  Valid if
1087        * @e type is #CT_MONITOR.
1088        */
1089       struct GNUNET_PeerIdentity peer;
1090
1091       /**
1092        * Is this a one-shot monitor?
1093        */
1094       int one_shot;
1095
1096     } monitor;
1097
1098
1099     /**
1100      * Information for @e type #CT_COMMUNICATOR.
1101      */
1102     struct {
1103       /**
1104        * If @e type is #CT_COMMUNICATOR, this communicator
1105        * supports communicating using these addresses.
1106        */
1107       char *address_prefix;
1108
1109       /**
1110        * Head of DLL of queues offered by this communicator.
1111        */
1112       struct GNUNET_ATS_Session *session_head;
1113
1114       /**
1115        * Tail of DLL of queues offered by this communicator.
1116        */
1117       struct GNUNET_ATS_Session *session_tail;
1118
1119       /**
1120        * Head of list of the addresses of this peer offered by this communicator.
1121        */
1122       struct AddressListEntry *addr_head;
1123
1124       /**
1125        * Tail of list of the addresses of this peer offered by this communicator.
1126        */
1127       struct AddressListEntry *addr_tail;
1128
1129       /**
1130        * Number of queue entries in all queues to this communicator. Used
1131        * throttle sending to a communicator if we see that the communicator
1132        * is globally unable to keep up.
1133        */
1134       unsigned int total_queue_length;
1135       
1136       /**
1137        * Characteristics of this communicator.
1138        */
1139       enum GNUNET_TRANSPORT_CommunicatorCharacteristics cc;
1140
1141     } communicator;
1142
1143   } details;
1144
1145 };
1146
1147
1148 /**
1149  * Head of linked list of all clients to this service.
1150  */
1151 static struct TransportClient *clients_head;
1152
1153 /**
1154  * Tail of linked list of all clients to this service.
1155  */
1156 static struct TransportClient *clients_tail;
1157
1158 /**
1159  * Statistics handle.
1160  */
1161 static struct GNUNET_STATISTICS_Handle *GST_stats;
1162
1163 /**
1164  * Configuration handle.
1165  */
1166 static const struct GNUNET_CONFIGURATION_Handle *GST_cfg;
1167
1168 /**
1169  * Our public key.
1170  */
1171 static struct GNUNET_PeerIdentity GST_my_identity;
1172
1173 /**
1174  * Our private key.
1175  */
1176 static struct GNUNET_CRYPTO_EddsaPrivateKey *GST_my_private_key;
1177
1178 /**
1179  * Map from PIDs to `struct Neighbour` entries.  A peer is
1180  * a neighbour if we have an MQ to it from some communicator.
1181  */
1182 static struct GNUNET_CONTAINER_MultiPeerMap *neighbours;
1183
1184 /**
1185  * Database for peer's HELLOs.
1186  */
1187 static struct GNUNET_PEERSTORE_Handle *peerstore;
1188
1189 /**
1190  * Heap sorting `struct EphemeralCacheEntry` by their
1191  * key/signature validity.
1192  */
1193 static struct GNUNET_CONTAINER_Heap *ephemeral_heap;
1194
1195 /**
1196  * Hash map for looking up `struct EphemeralCacheEntry`s
1197  * by peer identity. (We may have ephemerals in our
1198  * cache for which we do not have a neighbour entry,
1199  * and similar many neighbours may not need ephemerals,
1200  * so we use a second map.)
1201  */
1202 static struct GNUNET_CONTAINER_MultiPeerMap *ephemeral_map;
1203
1204 /**
1205  * Our connection to ATS for allocation and bootstrapping.
1206  */
1207 static struct GNUNET_ATS_TransportHandle *ats;
1208
1209
1210 /**
1211  * Free cached ephemeral key.
1212  *
1213  * @param ece cached signature to free
1214  */
1215 static void
1216 free_ephemeral (struct EphemeralCacheEntry *ece)
1217 {
1218   GNUNET_CONTAINER_multipeermap_remove (ephemeral_map,
1219                                         &ece->target,
1220                                         ece);
1221   GNUNET_CONTAINER_heap_remove_node (ece->hn);
1222   GNUNET_free (ece);
1223 }
1224
1225
1226 /**
1227  * Lookup neighbour record for peer @a pid.
1228  *
1229  * @param pid neighbour to look for
1230  * @return NULL if we do not have this peer as a neighbour
1231  */
1232 static struct Neighbour *
1233 lookup_neighbour (const struct GNUNET_PeerIdentity *pid)
1234 {
1235   return GNUNET_CONTAINER_multipeermap_get (neighbours,
1236                                             pid);
1237 }
1238
1239
1240 /**
1241  * Details about what to notify monitors about.
1242  */
1243 struct MonitorEvent
1244 {
1245   /**
1246    * @deprecated To be discussed if we keep these...
1247    */
1248   struct GNUNET_TIME_Absolute last_validation;
1249   struct GNUNET_TIME_Absolute valid_until;
1250   struct GNUNET_TIME_Absolute next_validation;
1251
1252   /**
1253    * Current round-trip time estimate.
1254    */
1255   struct GNUNET_TIME_Relative rtt;
1256
1257   /**
1258    * Connection status.
1259    */
1260   enum GNUNET_TRANSPORT_ConnectionStatus cs;
1261
1262   /**
1263    * Messages pending.
1264    */
1265   uint32_t num_msg_pending;
1266
1267   /**
1268    * Bytes pending.
1269    */
1270   uint32_t num_bytes_pending;
1271
1272
1273 };
1274
1275
1276 /**
1277  * Notify monitor @a tc about an event.  That @a tc
1278  * cares about the event has already been checked.
1279  *
1280  * Send @a tc information in @a me about a @a peer's status with
1281  * respect to some @a address to all monitors that care.
1282  *
1283  * @param tc monitor to inform
1284  * @param peer peer the information is about
1285  * @param address address the information is about
1286  * @param nt network type associated with @a address
1287  * @param me detailed information to transmit
1288  */
1289 static void
1290 notify_monitor (struct TransportClient *tc,
1291                 const struct GNUNET_PeerIdentity *peer,
1292                 const char *address,
1293                 enum GNUNET_NetworkType nt,
1294                 const struct MonitorEvent *me)
1295 {
1296   struct GNUNET_MQ_Envelope *env;
1297   struct GNUNET_TRANSPORT_MonitorData *md;
1298   size_t addr_len = strlen (address) + 1;
1299
1300   env = GNUNET_MQ_msg_extra (md,
1301                              addr_len,
1302                              GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_DATA);
1303   md->nt = htonl ((uint32_t) nt);
1304   md->peer = *peer;
1305   md->last_validation = GNUNET_TIME_absolute_hton (me->last_validation);
1306   md->valid_until = GNUNET_TIME_absolute_hton (me->valid_until);
1307   md->next_validation = GNUNET_TIME_absolute_hton (me->next_validation);
1308   md->rtt = GNUNET_TIME_relative_hton (me->rtt);
1309   md->cs = htonl ((uint32_t) me->cs);
1310   md->num_msg_pending = htonl (me->num_msg_pending);
1311   md->num_bytes_pending = htonl (me->num_bytes_pending);
1312   memcpy (&md[1],
1313           address,
1314           addr_len);
1315   GNUNET_MQ_send (tc->mq,
1316                   env);
1317 }
1318
1319
1320 /**
1321  * Send information in @a me about a @a peer's status with respect
1322  * to some @a address to all monitors that care.
1323  *
1324  * @param peer peer the information is about
1325  * @param address address the information is about
1326  * @param nt network type associated with @a address
1327  * @param me detailed information to transmit
1328  */
1329 static void
1330 notify_monitors (const struct GNUNET_PeerIdentity *peer,
1331                  const char *address,
1332                  enum GNUNET_NetworkType nt,
1333                  const struct MonitorEvent *me)
1334 {
1335   static struct GNUNET_PeerIdentity zero;
1336
1337   for (struct TransportClient *tc = clients_head;
1338        NULL != tc;
1339        tc = tc->next)
1340   {
1341     if (CT_MONITOR != tc->type)
1342       continue;
1343     if (tc->details.monitor.one_shot)
1344       continue;
1345     if ( (0 != memcmp (&tc->details.monitor.peer,
1346                        &zero,
1347                        sizeof (zero))) &&
1348          (0 != memcmp (&tc->details.monitor.peer,
1349                        peer,
1350                        sizeof (*peer))) )
1351       continue;
1352     notify_monitor (tc,
1353                     peer,
1354                     address,
1355                     nt,
1356                     me);
1357   }
1358 }
1359
1360
1361 /**
1362  * Called whenever a client connects.  Allocates our
1363  * data structures associated with that client.
1364  *
1365  * @param cls closure, NULL
1366  * @param client identification of the client
1367  * @param mq message queue for the client
1368  * @return our `struct TransportClient`
1369  */
1370 static void *
1371 client_connect_cb (void *cls,
1372                    struct GNUNET_SERVICE_Client *client,
1373                    struct GNUNET_MQ_Handle *mq)
1374 {
1375   struct TransportClient *tc;
1376
1377   tc = GNUNET_new (struct TransportClient);
1378   tc->client = client;
1379   tc->mq = mq;
1380   GNUNET_CONTAINER_DLL_insert (clients_head,
1381                                clients_tail,
1382                                tc);
1383   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1384               "Client %p connected\n",
1385               tc);
1386   return tc;
1387 }
1388
1389
1390 /**
1391  * Release memory used by @a neighbour.
1392  *
1393  * @param neighbour neighbour entry to free
1394  */
1395 static void
1396 free_neighbour (struct Neighbour *neighbour)
1397 {
1398   GNUNET_assert (NULL == neighbour->session_head);
1399   GNUNET_assert (GNUNET_YES ==
1400                  GNUNET_CONTAINER_multipeermap_remove (neighbours,
1401                                                        &neighbour->pid,
1402                                                        neighbour));
1403   if (NULL != neighbour->timeout_task)
1404     GNUNET_SCHEDULER_cancel (neighbour->timeout_task);
1405   GNUNET_free (neighbour);
1406 }
1407
1408
1409 /**
1410  * Send message to CORE clients that we lost a connection.
1411  *
1412  * @param tc client to inform (must be CORE client)
1413  * @param pid peer the connection is for
1414  * @param quota_out current quota for the peer
1415  */
1416 static void
1417 core_send_connect_info (struct TransportClient *tc,
1418                         const struct GNUNET_PeerIdentity *pid,
1419                         struct GNUNET_BANDWIDTH_Value32NBO quota_out)
1420 {
1421   struct GNUNET_MQ_Envelope *env;
1422   struct ConnectInfoMessage *cim;
1423
1424   GNUNET_assert (CT_CORE == tc->type);
1425   env = GNUNET_MQ_msg (cim,
1426                        GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
1427   cim->quota_out = quota_out;
1428   cim->id = *pid;
1429   GNUNET_MQ_send (tc->mq,
1430                   env);
1431 }
1432
1433
1434 /**
1435  * Send message to CORE clients that we gained a connection
1436  *
1437  * @param pid peer the queue was for
1438  * @param quota_out current quota for the peer
1439  */
1440 static void
1441 cores_send_connect_info (const struct GNUNET_PeerIdentity *pid,
1442                          struct GNUNET_BANDWIDTH_Value32NBO quota_out)
1443 {
1444   for (struct TransportClient *tc = clients_head;
1445        NULL != tc;
1446        tc = tc->next)
1447   {
1448     if (CT_CORE != tc->type)
1449       continue;
1450     core_send_connect_info (tc,
1451                             pid,
1452                             quota_out);
1453   }
1454 }
1455
1456
1457 /**
1458  * Send message to CORE clients that we lost a connection.
1459  *
1460  * @param pid peer the connection was for
1461  */
1462 static void
1463 cores_send_disconnect_info (const struct GNUNET_PeerIdentity *pid)
1464 {
1465   for (struct TransportClient *tc = clients_head;
1466        NULL != tc;
1467        tc = tc->next)
1468   {
1469     struct GNUNET_MQ_Envelope *env;
1470     struct DisconnectInfoMessage *dim;
1471
1472     if (CT_CORE != tc->type)
1473       continue;
1474     env = GNUNET_MQ_msg (dim,
1475                          GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT);
1476     dim->peer = *pid;
1477     GNUNET_MQ_send (tc->mq,
1478                     env);
1479   }
1480 }
1481
1482
1483 /**
1484  * We believe we are ready to transmit a message on a queue. Double-checks
1485  * with the queue's "tracker_out" and then gives the message to the 
1486  * communicator for transmission (updating the tracker, and re-scheduling
1487  * itself if applicable).  
1488  *
1489  * @param cls the `struct GNUNET_ATS_Session` to process transmissions for
1490  */ 
1491 static void
1492 transmit_on_queue (void *cls);
1493
1494
1495 /**
1496  * Schedule next run of #transmit_on_queue().  Does NOTHING if 
1497  * we should run immediately or if the message queue is empty.
1498  * Test for no task being added AND queue not being empty to
1499  * transmit immediately afterwards!  This function must only
1500  * be called if the message queue is non-empty!
1501  *
1502  * @param queue the queue to do scheduling for
1503  */ 
1504 static void
1505 schedule_transmit_on_queue (struct GNUNET_ATS_Session *queue)
1506 {
1507   struct Neighbour *n = queue->neighbour;
1508   struct PendingMessage *pm = n->pending_msg_head;
1509   struct GNUNET_TIME_Relative out_delay;
1510   unsigned int wsize;
1511
1512   GNUNET_assert (NULL != pm);
1513   if (queue->tc->details.communicator.total_queue_length >= COMMUNICATOR_TOTAL_QUEUE_LIMIT)
1514   {
1515     GNUNET_STATISTICS_update (GST_stats,
1516                               "# Transmission throttled due to communicator queue limit",
1517                               1,
1518                               GNUNET_NO);    
1519     return;
1520   }
1521   if (queue->queue_length >= SESSION_QUEUE_LIMIT)
1522   {
1523     GNUNET_STATISTICS_update (GST_stats,
1524                               "# Transmission throttled due to session queue limit",
1525                               1,
1526                               GNUNET_NO);    
1527     return;
1528   }
1529       
1530   wsize = (0 == queue->mtu)
1531     ? pm->bytes_msg /* FIXME: add overheads? */
1532     : queue->mtu;
1533   out_delay = GNUNET_BANDWIDTH_tracker_get_delay (&queue->tracker_out,
1534                                                   wsize);
1535   out_delay = GNUNET_TIME_relative_max (GNUNET_TIME_absolute_get_remaining (pm->next_attempt),
1536                                         out_delay);
1537   if (0 == out_delay.rel_value_us)
1538     return; /* we should run immediately! */
1539   /* queue has changed since we were scheduled, reschedule again */
1540   queue->transmit_task = GNUNET_SCHEDULER_add_delayed (out_delay,
1541                                                        &transmit_on_queue,
1542                                                        queue);
1543   if (out_delay.rel_value_us > DELAY_WARN_THRESHOLD.rel_value_us)
1544     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1545                 "Next transmission on queue `%s' in %s (high delay)\n",
1546                 queue->address,
1547                 GNUNET_STRINGS_relative_time_to_string (out_delay,
1548                                                         GNUNET_YES));
1549   else
1550     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1551                 "Next transmission on queue `%s' in %s\n",
1552                 queue->address,
1553                 GNUNET_STRINGS_relative_time_to_string (out_delay,
1554                                                         GNUNET_YES));
1555 }
1556
1557
1558 /**
1559  * Free @a session.
1560  *
1561  * @param session the session to free
1562  */
1563 static void
1564 free_session (struct GNUNET_ATS_Session *session)
1565 {
1566   struct Neighbour *neighbour = session->neighbour;
1567   struct TransportClient *tc = session->tc;
1568   struct MonitorEvent me = {
1569     .cs = GNUNET_TRANSPORT_CS_DOWN,
1570     .rtt = GNUNET_TIME_UNIT_FOREVER_REL
1571   };
1572   struct QueueEntry *qe;
1573   int maxxed;
1574
1575   if (NULL != session->transmit_task)
1576   {
1577     GNUNET_SCHEDULER_cancel (session->transmit_task);
1578     session->transmit_task = NULL;
1579   }
1580   GNUNET_CONTAINER_MDLL_remove (neighbour,
1581                                 neighbour->session_head,
1582                                 neighbour->session_tail,
1583                                 session);
1584   GNUNET_CONTAINER_MDLL_remove (client,
1585                                 tc->details.communicator.session_head,
1586                                 tc->details.communicator.session_tail,
1587                                 session);
1588   maxxed = (COMMUNICATOR_TOTAL_QUEUE_LIMIT >= tc->details.communicator.total_queue_length);
1589   while (NULL != (qe = session->queue_head))
1590   {
1591     GNUNET_CONTAINER_DLL_remove (session->queue_head,
1592                                  session->queue_tail,
1593                                  qe);
1594     session->queue_length--;
1595     tc->details.communicator.total_queue_length--;
1596     GNUNET_free (qe);
1597   }
1598   GNUNET_assert (0 == session->queue_length);
1599   if ( (maxxed) &&
1600        (COMMUNICATOR_TOTAL_QUEUE_LIMIT < tc->details.communicator.total_queue_length) )
1601   {
1602     /* Communicator dropped below threshold, resume all queues */
1603     GNUNET_STATISTICS_update (GST_stats,
1604                               "# Transmission throttled due to communicator queue limit",
1605                               -1,
1606                               GNUNET_NO);
1607     for (struct GNUNET_ATS_Session *s = tc->details.communicator.session_head;
1608          NULL != s;
1609          s = s->next_client)
1610       schedule_transmit_on_queue (s);
1611   }
1612   notify_monitors (&neighbour->pid,
1613                    session->address,
1614                    session->nt,
1615                    &me);
1616   GNUNET_ATS_session_del (session->sr);
1617   GNUNET_BANDWIDTH_tracker_notification_stop (&session->tracker_in);
1618   GNUNET_BANDWIDTH_tracker_notification_stop (&session->tracker_out);
1619   GNUNET_free (session);
1620   if (NULL == neighbour->session_head)
1621   {
1622     cores_send_disconnect_info (&neighbour->pid);
1623     free_neighbour (neighbour);
1624   }
1625 }
1626
1627
1628 /**
1629  * Free @a ale
1630  *
1631  * @param ale address list entry to free
1632  */
1633 static void
1634 free_address_list_entry (struct AddressListEntry *ale)
1635 {
1636   struct TransportClient *tc = ale->tc;
1637
1638   GNUNET_CONTAINER_DLL_remove (tc->details.communicator.addr_head,
1639                                tc->details.communicator.addr_tail,
1640                                ale);
1641   if (NULL != ale->sc)
1642   {
1643     GNUNET_PEERSTORE_store_cancel (ale->sc);
1644     ale->sc = NULL;
1645   }
1646   if (NULL != ale->st)
1647   {
1648     GNUNET_SCHEDULER_cancel (ale->st);
1649     ale->st = NULL;
1650   }
1651   GNUNET_free (ale);
1652 }
1653
1654
1655 /**
1656  * Called whenever a client is disconnected.  Frees our
1657  * resources associated with that client.
1658  *
1659  * @param cls closure, NULL
1660  * @param client identification of the client
1661  * @param app_ctx our `struct TransportClient`
1662  */
1663 static void
1664 client_disconnect_cb (void *cls,
1665                       struct GNUNET_SERVICE_Client *client,
1666                       void *app_ctx)
1667 {
1668   struct TransportClient *tc = app_ctx;
1669
1670   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1671               "Client %p disconnected, cleaning up.\n",
1672               tc);
1673   GNUNET_CONTAINER_DLL_remove (clients_head,
1674                                clients_tail,
1675                                tc);
1676   switch (tc->type)
1677   {
1678   case CT_NONE:
1679     break;
1680   case CT_CORE:
1681     {
1682       struct PendingMessage *pm;
1683
1684       while (NULL != (pm = tc->details.core.pending_msg_head))
1685       {
1686         GNUNET_CONTAINER_MDLL_remove (client,
1687                                       tc->details.core.pending_msg_head,
1688                                       tc->details.core.pending_msg_tail,
1689                                       pm);
1690         pm->client = NULL;
1691       }
1692     }
1693     break;
1694   case CT_MONITOR:
1695     break;
1696   case CT_COMMUNICATOR:
1697     {
1698       struct GNUNET_ATS_Session *q;
1699       struct AddressListEntry *ale;
1700
1701       while (NULL != (q = tc->details.communicator.session_head))
1702         free_session (q);
1703       while (NULL != (ale = tc->details.communicator.addr_head))
1704         free_address_list_entry (ale);
1705       GNUNET_free (tc->details.communicator.address_prefix);
1706     }
1707     break;
1708   }
1709   GNUNET_free (tc);
1710 }
1711
1712
1713 /**
1714  * Iterator telling new CORE client about all existing
1715  * connections to peers.
1716  *
1717  * @param cls the new `struct TransportClient`
1718  * @param pid a connected peer
1719  * @param value the `struct Neighbour` with more information
1720  * @return #GNUNET_OK (continue to iterate)
1721  */
1722 static int
1723 notify_client_connect_info (void *cls,
1724                             const struct GNUNET_PeerIdentity *pid,
1725                             void *value)
1726 {
1727   struct TransportClient *tc = cls;
1728   struct Neighbour *neighbour = value;
1729
1730   core_send_connect_info (tc,
1731                           pid,
1732                           neighbour->quota_out);
1733   return GNUNET_OK;
1734 }
1735
1736
1737 /**
1738  * Initialize a "CORE" client.  We got a start message from this
1739  * client, so add it to the list of clients for broadcasting of
1740  * inbound messages.
1741  *
1742  * @param cls the client
1743  * @param start the start message that was sent
1744  */
1745 static void
1746 handle_client_start (void *cls,
1747                      const struct StartMessage *start)
1748 {
1749   struct TransportClient *tc = cls;
1750   uint32_t options;
1751
1752   options = ntohl (start->options);
1753   if ( (0 != (1 & options)) &&
1754        (0 !=
1755         memcmp (&start->self,
1756                 &GST_my_identity,
1757                 sizeof (struct GNUNET_PeerIdentity)) ) )
1758   {
1759     /* client thinks this is a different peer, reject */
1760     GNUNET_break (0);
1761     GNUNET_SERVICE_client_drop (tc->client);
1762     return;
1763   }
1764   if (CT_NONE != tc->type)
1765   {
1766     GNUNET_break (0);
1767     GNUNET_SERVICE_client_drop (tc->client);
1768     return;
1769   }
1770   tc->type = CT_CORE;
1771   GNUNET_CONTAINER_multipeermap_iterate (neighbours,
1772                                          &notify_client_connect_info,
1773                                          tc);
1774   GNUNET_SERVICE_client_continue (tc->client);
1775 }
1776
1777
1778 /**
1779  * Client asked for transmission to a peer.  Process the request.
1780  *
1781  * @param cls the client
1782  * @param obm the send message that was sent
1783  */
1784 static int
1785 check_client_send (void *cls,
1786                    const struct OutboundMessage *obm)
1787 {
1788   struct TransportClient *tc = cls;
1789   uint16_t size;
1790   const struct GNUNET_MessageHeader *obmm;
1791
1792   if (CT_CORE != tc->type)
1793   {
1794     GNUNET_break (0);
1795     return GNUNET_SYSERR;
1796   }
1797   size = ntohs (obm->header.size) - sizeof (struct OutboundMessage);
1798   if (size < sizeof (struct GNUNET_MessageHeader))
1799   {
1800     GNUNET_break (0);
1801     return GNUNET_SYSERR;
1802   }
1803   obmm = (const struct GNUNET_MessageHeader *) &obm[1];
1804   if (size != ntohs (obmm->size))
1805   {
1806     GNUNET_break (0);
1807     return GNUNET_SYSERR;
1808   }
1809   return GNUNET_OK;
1810 }
1811
1812
1813 /**
1814  * Free fragment tree below @e root, excluding @e root itself.
1815  *
1816  * @param root root of the tree to free
1817  */  
1818 static void
1819 free_fragment_tree (struct PendingMessage *root)
1820 {
1821   struct PendingMessage *frag;
1822
1823   while (NULL != (frag = root->head_frag))
1824   {
1825     free_fragment_tree (frag);
1826     GNUNET_CONTAINER_MDLL_remove (frag,
1827                                   root->head_frag,
1828                                   root->tail_frag,
1829                                   frag);
1830     GNUNET_free (frag);
1831   }
1832 }
1833
1834
1835 /**
1836  * Release memory associated with @a pm and remove @a pm from associated
1837  * data structures.  @a pm must be a top-level pending message and not
1838  * a fragment in the tree.  The entire tree is freed (if applicable).
1839  *
1840  * @param pm the pending message to free
1841  */
1842 static void
1843 free_pending_message (struct PendingMessage *pm)
1844 {
1845   struct TransportClient *tc = pm->client;
1846   struct Neighbour *target = pm->target;
1847
1848   if (NULL != tc)
1849   {
1850     GNUNET_CONTAINER_MDLL_remove (client,
1851                                   tc->details.core.pending_msg_head,
1852                                   tc->details.core.pending_msg_tail,
1853                                   pm);
1854   }
1855   GNUNET_CONTAINER_MDLL_remove (neighbour,
1856                                 target->pending_msg_head,
1857                                 target->pending_msg_tail,
1858                                 pm);
1859   free_fragment_tree (pm);
1860   GNUNET_free_non_null (pm->bpm);
1861   GNUNET_free (pm);
1862 }
1863
1864
1865 /**
1866  * Send a response to the @a pm that we have processed a
1867  * "send" request with status @a success. We
1868  * transmitted @a bytes_physical on the actual wire.
1869  * Sends a confirmation to the "core" client responsible
1870  * for the original request and free's @a pm.
1871  *
1872  * @param pm handle to the original pending message
1873  * @param success status code, #GNUNET_OK on success, #GNUNET_SYSERR
1874  *          for transmission failure
1875  * @param bytes_physical amount of bandwidth consumed
1876  */
1877 static void
1878 client_send_response (struct PendingMessage *pm,
1879                       int success,
1880                       uint32_t bytes_physical)
1881 {
1882   struct TransportClient *tc = pm->client;
1883   struct Neighbour *target = pm->target;
1884   struct GNUNET_MQ_Envelope *env;
1885   struct SendOkMessage *som;
1886
1887   if (NULL != tc)
1888   {
1889     env = GNUNET_MQ_msg (som,
1890                          GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
1891     som->success = htonl ((uint32_t) success);
1892     som->bytes_msg = htons (pm->bytes_msg);
1893     som->bytes_physical = htonl (bytes_physical);
1894     som->peer = target->pid;
1895     GNUNET_MQ_send (tc->mq,
1896                     env);
1897   }
1898   free_pending_message (pm);
1899 }
1900
1901
1902 /**
1903  * Checks the message queue for a neighbour for messages that have timed
1904  * out and purges them.
1905  *
1906  * @param cls a `struct Neighbour`
1907  */
1908 static void
1909 check_queue_timeouts (void *cls)
1910 {
1911   struct Neighbour *n = cls;
1912   struct PendingMessage *pm;
1913   struct GNUNET_TIME_Absolute now;
1914   struct GNUNET_TIME_Absolute earliest_timeout;
1915
1916   n->timeout_task = NULL;
1917   earliest_timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
1918   now = GNUNET_TIME_absolute_get ();
1919   for (struct PendingMessage *pos = n->pending_msg_head;
1920        NULL != pos;
1921        pos = pm)
1922   {
1923     pm = pos->next_neighbour;
1924     if (pos->timeout.abs_value_us <= now.abs_value_us)
1925     {
1926       GNUNET_STATISTICS_update (GST_stats,
1927                                 "# messages dropped (timeout before confirmation)",
1928                                 1,
1929                                 GNUNET_NO);
1930       client_send_response (pm,
1931                             GNUNET_NO,
1932                             0);      
1933       continue;
1934     }
1935     earliest_timeout = GNUNET_TIME_absolute_min (earliest_timeout,
1936                                                  pos->timeout);
1937   }
1938   n->earliest_timeout = earliest_timeout;
1939   if (NULL != n->pending_msg_head)
1940     n->timeout_task = GNUNET_SCHEDULER_add_at (earliest_timeout,
1941                                                &check_queue_timeouts,
1942                                                n);
1943 }
1944
1945
1946 /**
1947  * Client asked for transmission to a peer.  Process the request.
1948  *
1949  * @param cls the client
1950  * @param obm the send message that was sent
1951  */
1952 static void
1953 handle_client_send (void *cls,
1954                     const struct OutboundMessage *obm)
1955 {
1956   struct TransportClient *tc = cls;
1957   struct PendingMessage *pm;
1958   const struct GNUNET_MessageHeader *obmm;
1959   struct Neighbour *target;
1960   uint32_t bytes_msg;
1961
1962   GNUNET_assert (CT_CORE == tc->type);
1963   obmm = (const struct GNUNET_MessageHeader *) &obm[1];
1964   bytes_msg = ntohs (obmm->size);
1965   target = lookup_neighbour (&obm->peer);
1966   if (NULL == target)
1967   {
1968     /* Failure: don't have this peer as a neighbour (anymore).
1969        Might have gone down asynchronously, so this is NOT
1970        a protocol violation by CORE. Still count the event,
1971        as this should be rare. */
1972     struct GNUNET_MQ_Envelope *env;
1973     struct SendOkMessage *som;
1974
1975     env = GNUNET_MQ_msg (som,
1976                          GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
1977     som->success = htonl (GNUNET_SYSERR);
1978     som->bytes_msg = htonl (bytes_msg);
1979     som->bytes_physical = htonl (0);
1980     som->peer = obm->peer;
1981     GNUNET_MQ_send (tc->mq,
1982                     env);
1983     GNUNET_SERVICE_client_continue (tc->client);
1984     GNUNET_STATISTICS_update (GST_stats,
1985                               "# messages dropped (neighbour unknown)",
1986                               1,
1987                               GNUNET_NO);
1988     return;
1989   }
1990   pm = GNUNET_malloc (sizeof (struct PendingMessage) + bytes_msg);
1991   pm->client = tc;
1992   pm->target = target;
1993   pm->bytes_msg = bytes_msg;
1994   pm->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (obm->timeout));
1995   memcpy (&pm[1],
1996           &obm[1],
1997           bytes_msg);
1998   GNUNET_CONTAINER_MDLL_insert (neighbour,
1999                                 target->pending_msg_head,
2000                                 target->pending_msg_tail,
2001                                 pm);
2002   GNUNET_CONTAINER_MDLL_insert (client,
2003                                 tc->details.core.pending_msg_head,
2004                                 tc->details.core.pending_msg_tail,
2005                                 pm);
2006   if (target->earliest_timeout.abs_value_us > pm->timeout.abs_value_us)
2007   {
2008     target->earliest_timeout.abs_value_us = pm->timeout.abs_value_us;
2009     if (NULL != target->timeout_task)
2010       GNUNET_SCHEDULER_cancel (target->timeout_task);
2011     target->timeout_task 
2012       = GNUNET_SCHEDULER_add_at (target->earliest_timeout,
2013                                  &check_queue_timeouts,
2014                                  target);
2015   }
2016 }
2017
2018
2019 /**
2020  * Communicator started.  Test message is well-formed.
2021  *
2022  * @param cls the client
2023  * @param cam the send message that was sent
2024  */
2025 static int
2026 check_communicator_available (void *cls,
2027                               const struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam)
2028 {
2029   struct TransportClient *tc = cls;
2030   uint16_t size;
2031
2032   if (CT_NONE != tc->type)
2033   {
2034     GNUNET_break (0);
2035     return GNUNET_SYSERR;
2036   }
2037   tc->type = CT_COMMUNICATOR;
2038   size = ntohs (cam->header.size) - sizeof (*cam);
2039   if (0 == size)
2040     return GNUNET_OK; /* receive-only communicator */
2041   GNUNET_MQ_check_zero_termination (cam);
2042   return GNUNET_OK;
2043 }
2044
2045
2046 /**
2047  * Communicator started.  Process the request.
2048  *
2049  * @param cls the client
2050  * @param cam the send message that was sent
2051  */
2052 static void
2053 handle_communicator_available (void *cls,
2054                                const struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam)
2055 {
2056   struct TransportClient *tc = cls;
2057   uint16_t size;
2058
2059   size = ntohs (cam->header.size) - sizeof (*cam);
2060   if (0 == size)
2061     return; /* receive-only communicator */
2062   tc->details.communicator.address_prefix
2063     = GNUNET_strdup ((const char *) &cam[1]);
2064   tc->details.communicator.cc
2065     = (enum GNUNET_TRANSPORT_CommunicatorCharacteristics) ntohl (cam->cc);
2066   GNUNET_SERVICE_client_continue (tc->client);
2067 }
2068
2069
2070 /**
2071  * Communicator requests backchannel transmission.  Check the request.
2072  *
2073  * @param cls the client
2074  * @param cb the send message that was sent
2075  * @return #GNUNET_OK if message is well-formed
2076  */
2077 static int
2078 check_communicator_backchannel (void *cls,
2079                                 const struct GNUNET_TRANSPORT_CommunicatorBackchannel *cb)
2080 {
2081   // FIXME: check encapsulated message
2082   // FIXME: check 0-termination of communcator at target
2083   return GNUNET_OK;
2084 }
2085
2086   
2087 /**
2088  * Communicator requests backchannel transmission.  Process the request.
2089  *
2090  * @param cls the client
2091  * @param cb the send message that was sent
2092  */
2093 static void
2094 handle_communicator_backchannel (void *cls,
2095                                  const struct GNUNET_TRANSPORT_CommunicatorBackchannel *cb)
2096 {
2097   struct TransportClient *tc = cls;
2098
2099   // FIXME: determine path (possibly DV)! to target peer
2100   // FIXME: encapsulate message, encrypt message!
2101   // FIXME: possibly fragment message
2102   // FIXME: possibly DV-route message!
2103   GNUNET_SERVICE_client_continue (tc->client);
2104 }
2105
2106
2107 /**
2108  * Address of our peer added.  Test message is well-formed.
2109  *
2110  * @param cls the client
2111  * @param aam the send message that was sent
2112  * @return #GNUNET_OK if message is well-formed
2113  */
2114 static int
2115 check_add_address (void *cls,
2116                    const struct GNUNET_TRANSPORT_AddAddressMessage *aam)
2117 {
2118   struct TransportClient *tc = cls;
2119
2120   if (CT_COMMUNICATOR != tc->type)
2121   {
2122     GNUNET_break (0);
2123     return GNUNET_SYSERR;
2124   }
2125   GNUNET_MQ_check_zero_termination (aam);
2126   return GNUNET_OK;
2127 }
2128
2129
2130 /**
2131  * Ask peerstore to store our address.
2132  *
2133  * @param cls an `struct AddressListEntry *`
2134  */
2135 static void
2136 store_pi (void *cls);
2137
2138
2139 /**
2140  * Function called when peerstore is done storing our address.
2141  */
2142 static void
2143 peerstore_store_cb (void *cls,
2144                     int success)
2145 {
2146   struct AddressListEntry *ale = cls;
2147
2148   ale->sc = NULL;
2149   if (GNUNET_YES != success)
2150     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2151                 "Failed to store our own address `%s' in peerstore!\n",
2152                 ale->address);
2153   /* refresh period is 1/4 of expiration time, that should be plenty
2154      without being excessive. */
2155   ale->st = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_divide (ale->expiration,
2156                                                                        4ULL),
2157                                           &store_pi,
2158                                           ale);
2159 }
2160
2161
2162 /**
2163  * Ask peerstore to store our address.
2164  *
2165  * @param cls an `struct AddressListEntry *`
2166  */
2167 static void
2168 store_pi (void *cls)
2169 {
2170   struct AddressListEntry *ale = cls;
2171   void *addr;
2172   size_t addr_len;
2173   struct GNUNET_TIME_Absolute expiration;
2174
2175   ale->st = NULL;
2176   expiration = GNUNET_TIME_relative_to_absolute (ale->expiration);
2177   GNUNET_HELLO_sign_address (ale->address,
2178                              ale->nt,
2179                              expiration,
2180                              GST_my_private_key,
2181                              &addr,
2182                              &addr_len);
2183   ale->sc = GNUNET_PEERSTORE_store (peerstore,
2184                                     "transport",
2185                                     &GST_my_identity,
2186                                     GNUNET_HELLO_PEERSTORE_KEY,
2187                                     addr,
2188                                     addr_len,
2189                                     expiration,
2190                                     GNUNET_PEERSTORE_STOREOPTION_MULTIPLE,
2191                                     &peerstore_store_cb,
2192                                     ale);
2193   GNUNET_free (addr);
2194   if (NULL == ale->sc)
2195   {
2196     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2197                 "Failed to store our address `%s' with peerstore\n",
2198                 ale->address);
2199     ale->st = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
2200                                             &store_pi,
2201                                             ale);
2202   }
2203 }
2204
2205
2206 /**
2207  * Address of our peer added.  Process the request.
2208  *
2209  * @param cls the client
2210  * @param aam the send message that was sent
2211  */
2212 static void
2213 handle_add_address (void *cls,
2214                     const struct GNUNET_TRANSPORT_AddAddressMessage *aam)
2215 {
2216   struct TransportClient *tc = cls;
2217   struct AddressListEntry *ale;
2218   size_t slen;
2219
2220   slen = ntohs (aam->header.size) - sizeof (*aam);
2221   ale = GNUNET_malloc (sizeof (struct AddressListEntry) + slen);
2222   ale->tc = tc;
2223   ale->address = (const char *) &ale[1];
2224   ale->expiration = GNUNET_TIME_relative_ntoh (aam->expiration);
2225   ale->aid = aam->aid;
2226   ale->nt = (enum GNUNET_NetworkType) ntohl (aam->nt);
2227   memcpy (&ale[1],
2228           &aam[1],
2229           slen);
2230   GNUNET_CONTAINER_DLL_insert (tc->details.communicator.addr_head,
2231                                tc->details.communicator.addr_tail,
2232                                ale);
2233   ale->st = GNUNET_SCHEDULER_add_now (&store_pi,
2234                                       ale);
2235   GNUNET_SERVICE_client_continue (tc->client);
2236 }
2237
2238
2239 /**
2240  * Address of our peer deleted.  Process the request.
2241  *
2242  * @param cls the client
2243  * @param dam the send message that was sent
2244  */
2245 static void
2246 handle_del_address (void *cls,
2247                     const struct GNUNET_TRANSPORT_DelAddressMessage *dam)
2248 {
2249   struct TransportClient *tc = cls;
2250
2251   if (CT_COMMUNICATOR != tc->type)
2252   {
2253     GNUNET_break (0);
2254     GNUNET_SERVICE_client_drop (tc->client);
2255     return;
2256   }
2257   for (struct AddressListEntry *ale = tc->details.communicator.addr_head;
2258        NULL != ale;
2259        ale = ale->next)
2260   {
2261     if (dam->aid != ale->aid)
2262       continue;
2263     GNUNET_assert (ale->tc == tc);
2264     free_address_list_entry (ale);
2265     GNUNET_SERVICE_client_continue (tc->client);
2266   }
2267   GNUNET_break (0);
2268   GNUNET_SERVICE_client_drop (tc->client);
2269 }
2270
2271
2272 /**
2273  * Context from #handle_incoming_msg().  Closure for many
2274  * message handlers below.
2275  */
2276 struct CommunicatorMessageContext
2277 {
2278   /**
2279    * Which communicator provided us with the message.
2280    */
2281   struct TransportClient *tc;
2282
2283   /**
2284    * Additional information for flow control and about the sender.
2285    */
2286   struct GNUNET_TRANSPORT_IncomingMessage im;
2287 };
2288
2289
2290 /**
2291  * Send ACK to communicator (if requested) and free @a cmc.
2292  *
2293  * @param cmc context for which we are done handling the message
2294  */
2295 static void
2296 finish_cmc_handling (struct CommunicatorMessageContext *cmc)
2297 {
2298   if (0 != ntohl (cmc->im.fc_on))
2299   {
2300     /* send ACK when done to communicator for flow control! */
2301     struct GNUNET_MQ_Envelope *env;
2302     struct GNUNET_TRANSPORT_IncomingMessageAck *ack;
2303
2304     env = GNUNET_MQ_msg (ack,
2305                          GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK);
2306     ack->reserved = htonl (0);
2307     ack->fc_id = cmc->im.fc_id;
2308     ack->sender = cmc->im.sender;
2309     GNUNET_MQ_send (cmc->tc->mq,
2310                     env);
2311   }
2312   GNUNET_SERVICE_client_continue (cmc->tc->client);
2313   GNUNET_free (cmc);
2314 }
2315
2316
2317 /**
2318  * Communicator gave us an unencapsulated message to pass as-is to
2319  * CORE.  Process the request.
2320  *
2321  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
2322  * @param mh the message that was received
2323  */
2324 static void
2325 handle_raw_message (void *cls,
2326                     const struct GNUNET_MessageHeader *mh)
2327 {
2328   struct CommunicatorMessageContext *cmc = cls;
2329   uint16_t size = ntohs (mh->size);
2330
2331   if ( (size > UINT16_MAX - sizeof (struct InboundMessage)) ||
2332        (size < sizeof (struct GNUNET_MessageHeader)) )
2333   {
2334     struct GNUNET_SERVICE_Client *client = cmc->tc->client;
2335
2336     GNUNET_break (0);
2337     finish_cmc_handling (cmc);
2338     GNUNET_SERVICE_client_drop (client);
2339     return;
2340   }
2341   /* Forward to all CORE clients */
2342   for (struct TransportClient *tc = clients_head;
2343        NULL != tc;
2344        tc = tc->next)
2345   {
2346     struct GNUNET_MQ_Envelope *env;
2347     struct InboundMessage *im;
2348
2349     if (CT_CORE != tc->type)
2350       continue;
2351     env = GNUNET_MQ_msg_extra (im,
2352                                size,
2353                                GNUNET_MESSAGE_TYPE_TRANSPORT_RECV);
2354     im->peer = cmc->im.sender;
2355     memcpy (&im[1],
2356             mh,
2357             size);
2358     GNUNET_MQ_send (tc->mq,
2359                     env);
2360   }
2361   /* FIXME: consider doing this _only_ once the message
2362      was drained from the CORE MQs to extend flow control to CORE! 
2363      (basically, increment counter in cmc, decrement on MQ send continuation! */
2364   finish_cmc_handling (cmc);
2365 }
2366
2367
2368 /**
2369  * Communicator gave us a fragment box.  Check the message.
2370  *
2371  * @param cls a `struct CommunicatorMessageContext`
2372  * @param fb the send message that was sent
2373  * @return #GNUNET_YES if message is well-formed
2374  */
2375 static int
2376 check_fragment_box (void *cls,
2377                     const struct TransportFragmentBox *fb)
2378 {
2379   uint16_t size = ntohs (fb->header.size);
2380   uint16_t bsize = size - sizeof (*fb);
2381
2382   if (0 == bsize)
2383   {
2384     GNUNET_break_op (0);
2385     return GNUNET_SYSERR;
2386   }
2387   if (bsize + ntohs (fb->frag_off) > ntohs (fb->msg_size)) 
2388   {
2389     GNUNET_break_op (0);
2390     return GNUNET_SYSERR;
2391   }
2392   if (ntohs (fb->frag_off) >= ntohs (fb->msg_size)) 
2393   {
2394     GNUNET_break_op (0);
2395     return GNUNET_SYSERR;
2396   }
2397   return GNUNET_YES;
2398 }
2399
2400
2401 /**
2402  * Communicator gave us a fragment.  Process the request.
2403  *
2404  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
2405  * @param fb the message that was received
2406  */
2407 static void
2408 handle_fragment_box (void *cls,
2409                      const struct TransportFragmentBox *fb)
2410 {
2411   struct CommunicatorMessageContext *cmc = cls;
2412   
2413   // FIXME: do work!
2414   finish_cmc_handling (cmc);
2415 }
2416
2417
2418 /**
2419  * Communicator gave us a fragment acknowledgement.  Process the request.
2420  *
2421  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
2422  * @param fa the message that was received
2423  */
2424 static void
2425 handle_fragment_ack (void *cls,
2426                      const struct TransportFragmentAckMessage *fa)
2427 {
2428   struct CommunicatorMessageContext *cmc = cls;
2429   
2430   // FIXME: do work!
2431   finish_cmc_handling (cmc);
2432 }
2433
2434
2435 /**
2436  * Communicator gave us a reliability box.  Check the message.
2437  *
2438  * @param cls a `struct CommunicatorMessageContext`
2439  * @param rb the send message that was sent
2440  * @return #GNUNET_YES if message is well-formed
2441  */
2442 static int
2443 check_reliability_box (void *cls,
2444                        const struct TransportReliabilityBox *rb)
2445 {
2446   GNUNET_MQ_check_boxed_message (rb);
2447   return GNUNET_YES;
2448 }
2449
2450
2451 /**
2452  * Communicator gave us a reliability box.  Process the request.
2453  *
2454  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
2455  * @param rb the message that was received
2456  */
2457 static void
2458 handle_reliability_box (void *cls,
2459                         const struct TransportReliabilityBox *rb)
2460 {
2461   struct CommunicatorMessageContext *cmc = cls;
2462   const struct GNUNET_MessageHeader *inbox = (const struct GNUNET_MessageHeader *) &rb[1];
2463   
2464   // FIXME: send back reliability ACK (possibly conditional)
2465   /* forward encapsulated message to CORE */
2466   handle_raw_message (cmc,
2467                       inbox);
2468 }
2469
2470
2471 /**
2472  * Communicator gave us a reliability ack.  Process the request.
2473  *
2474  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
2475  * @param ra the message that was received
2476  */
2477 static void
2478 handle_reliability_ack (void *cls,
2479                         const struct TransportReliabilityAckMessage *ra)
2480 {
2481   struct CommunicatorMessageContext *cmc = cls;
2482   
2483   // FIXME: do work: find message that was acknowledged, and
2484   // remove from transmission queue; update RTT.
2485   finish_cmc_handling (cmc);
2486 }
2487
2488
2489 /**
2490  * Communicator gave us a backchannel encapsulation.  Check the message.
2491  *
2492  * @param cls a `struct CommunicatorMessageContext`
2493  * @param be the send message that was sent
2494  * @return #GNUNET_YES if message is well-formed
2495  */
2496 static int
2497 check_backchannel_encapsulation (void *cls,
2498                                  const struct TransportBackchannelEncapsulationMessage *be)
2499 {
2500   uint16_t size = ntohs (be->header.size);
2501
2502   if (size - sizeof (*be) < sizeof (struct GNUNET_MessageHeader))
2503   {
2504     GNUNET_break_op (0);
2505     return GNUNET_SYSERR;
2506   }
2507   return GNUNET_YES;
2508 }
2509
2510
2511 /**
2512  * Communicator gave us a backchannel encapsulation.  Process the request.
2513  *
2514  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
2515  * @param be the message that was received
2516  */
2517 static void
2518 handle_backchannel_encapsulation (void *cls,
2519                                   const struct TransportBackchannelEncapsulationMessage *be)
2520 {
2521   struct CommunicatorMessageContext *cmc = cls;
2522
2523   // FIMXE: test if it is for me, if not, try to forward to target (DV routes!)
2524   // FIXME: compute shared secret
2525   // FIXME: check HMAC
2526   // FIXME: decrypt payload
2527   // FIXME: forward to specified communicator!
2528   
2529   finish_cmc_handling (cmc);
2530 }
2531
2532
2533 /**
2534  * Communicator gave us an ephemeral confirmation.  Process the request.
2535  *
2536  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
2537  * @param ec the message that was received
2538  */
2539 static void
2540 handle_ephemeral_confirmation (void *cls,
2541                                const struct EphemeralConfirmationMessage *ec)
2542 {
2543   struct CommunicatorMessageContext *cmc = cls;
2544
2545   // FIXME: notify communicator (?) about ephemeral confirmation!?
2546   // FIXME: or does this have something to do with the ephemeral_map?
2547   //        where did I plan to use this message again!?
2548   // FIXME: communicator API has a very general notification API,
2549   //        nothing specific for ephemeral keys;
2550   //        why do we have a ephemeral key-specific message here?
2551   // => first revise where we get such messages from communicator
2552   //    before processing further here!
2553   finish_cmc_handling (cmc);
2554 }
2555
2556
2557 /**
2558  * Communicator gave us a DV learn message.  Check the message.
2559  *
2560  * @param cls a `struct CommunicatorMessageContext`
2561  * @param dvl the send message that was sent
2562  * @return #GNUNET_YES if message is well-formed
2563  */
2564 static int
2565 check_dv_learn (void *cls,
2566                 const struct TransportDVLearn *dvl)
2567 {
2568   uint16_t size = ntohs (dvl->header.size);
2569   uint16_t num_hops = ntohs (dvl->num_hops);
2570   const struct GNUNET_PeerIdentity *hops = (const struct GNUNET_PeerIdentity *) &dvl[1];
2571
2572   if (size != sizeof (*dvl) + num_hops * sizeof (struct GNUNET_PeerIdentity))
2573   {
2574     GNUNET_break_op (0);
2575     return GNUNET_SYSERR;
2576   }
2577   for (unsigned int i=0;i<num_hops;i++)
2578   {
2579     if (0 == memcmp (&dvl->initiator,
2580                      &hops[i],
2581                      sizeof (struct GNUNET_PeerIdentity)))
2582     {
2583       GNUNET_break_op (0);
2584       return GNUNET_SYSERR;
2585     }
2586     if (0 == memcmp (&GST_my_identity,
2587                      &hops[i],
2588                      sizeof (struct GNUNET_PeerIdentity)))
2589     {
2590       GNUNET_break_op (0);
2591       return GNUNET_SYSERR;
2592     }
2593   }
2594   return GNUNET_YES;
2595 }
2596
2597
2598 /**
2599  * Communicator gave us a DV learn message.  Process the request.
2600  *
2601  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
2602  * @param dvl the message that was received
2603  */
2604 static void
2605 handle_dv_learn (void *cls,
2606                  const struct TransportDVLearn *dvl)
2607 {
2608   struct CommunicatorMessageContext *cmc = cls;
2609   
2610   // FIXME: learn path from DV message (if bi-directional flags are set)
2611   // FIXME: expand DV message, forward on (unless path is getting too long)
2612   finish_cmc_handling (cmc);
2613 }
2614
2615
2616 /**
2617  * Communicator gave us a DV box.  Check the message.
2618  *
2619  * @param cls a `struct CommunicatorMessageContext`
2620  * @param dvb the send message that was sent
2621  * @return #GNUNET_YES if message is well-formed
2622  */
2623 static int
2624 check_dv_box (void *cls,
2625               const struct TransportDVBox *dvb)
2626 {
2627   uint16_t size = ntohs (dvb->header.size);
2628   uint16_t num_hops = ntohs (dvb->num_hops);
2629   const struct GNUNET_PeerIdentity *hops = (const struct GNUNET_PeerIdentity *) &dvb[1];
2630   const struct GNUNET_MessageHeader *inbox = (const struct GNUNET_MessageHeader *) &hops[num_hops];
2631   uint16_t isize;
2632   uint16_t itype;
2633
2634   if (size < sizeof (*dvb) + num_hops * sizeof (struct GNUNET_PeerIdentity) + sizeof (struct GNUNET_MessageHeader))
2635   {
2636     GNUNET_break_op (0);
2637     return GNUNET_SYSERR;
2638   }
2639   isize = ntohs (inbox->size);
2640   if (size != sizeof (*dvb) + num_hops * sizeof (struct GNUNET_PeerIdentity) + isize) 
2641   {
2642     GNUNET_break_op (0);
2643     return GNUNET_SYSERR;
2644   }
2645   itype = ntohs (inbox->type);
2646   if ( (GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX == itype) ||
2647        (GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN == itype) )
2648   {
2649     GNUNET_break_op (0);
2650     return GNUNET_SYSERR;
2651   }
2652   return GNUNET_YES;
2653 }
2654
2655
2656 /**
2657  * Communicator gave us a DV box.  Process the request.
2658  *
2659  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
2660  * @param dvb the message that was received
2661  */
2662 static void
2663 handle_dv_box (void *cls,
2664                const struct TransportDVBox *dvb)
2665 {
2666   struct CommunicatorMessageContext *cmc = cls;
2667   
2668   // FIXME: are we the target? Then unbox and handle message.
2669   // FIXME: if we are not the target, shorten path and forward along.
2670   finish_cmc_handling (cmc);
2671 }
2672
2673
2674 /**
2675  * Client notified us about transmission from a peer.  Process the request.
2676  *
2677  * @param cls a `struct TransportClient` which sent us the message
2678  * @param obm the send message that was sent
2679  * @return #GNUNET_YES if message is well-formed
2680  */
2681 static int
2682 check_incoming_msg (void *cls,
2683                     const struct GNUNET_TRANSPORT_IncomingMessage *im)
2684 {
2685   struct TransportClient *tc = cls;
2686
2687   if (CT_COMMUNICATOR != tc->type)
2688   {
2689     GNUNET_break (0);
2690     return GNUNET_SYSERR;
2691   }
2692   GNUNET_MQ_check_boxed_message (im);
2693   return GNUNET_OK;
2694 }
2695
2696
2697 /**
2698  * Incoming meessage.  Process the request.
2699  *
2700  * @param im the send message that was received
2701  */
2702 static void
2703 handle_incoming_msg (void *cls,
2704                      const struct GNUNET_TRANSPORT_IncomingMessage *im)
2705 {
2706   struct TransportClient *tc = cls;
2707   struct CommunicatorMessageContext *cmc = GNUNET_new (struct CommunicatorMessageContext);
2708   struct GNUNET_MQ_MessageHandler handlers[] = {
2709     GNUNET_MQ_hd_var_size (fragment_box,
2710                            GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT,
2711                            struct TransportFragmentBox,
2712                            &cmc),
2713     GNUNET_MQ_hd_fixed_size (fragment_ack,
2714                              GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK,
2715                              struct TransportFragmentAckMessage,
2716                              &cmc),
2717     GNUNET_MQ_hd_var_size (reliability_box,
2718                            GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX,
2719                            struct TransportReliabilityBox,
2720                            &cmc),
2721     GNUNET_MQ_hd_fixed_size (reliability_ack,
2722                              GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK,
2723                              struct TransportReliabilityAckMessage,
2724                              &cmc),
2725     GNUNET_MQ_hd_var_size (backchannel_encapsulation,
2726                            GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION,
2727                            struct TransportBackchannelEncapsulationMessage,
2728                            &cmc),
2729     GNUNET_MQ_hd_fixed_size (ephemeral_confirmation,
2730                              GNUNET_MESSAGE_TYPE_TRANSPORT_EPHEMERAL_CONFIRMATION,
2731                              struct EphemeralConfirmationMessage,
2732                              &cmc),
2733     GNUNET_MQ_hd_var_size (dv_learn,
2734                            GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN,
2735                            struct TransportDVLearn,
2736                            &cmc),
2737     GNUNET_MQ_hd_var_size (dv_box,
2738                            GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX,
2739                            struct TransportDVBox,
2740                            &cmc),
2741     GNUNET_MQ_handler_end()
2742   };
2743   int ret;
2744
2745   cmc->tc = tc;
2746   cmc->im = *im;
2747   ret = GNUNET_MQ_handle_message (handlers,
2748                                   (const struct GNUNET_MessageHeader *) &im[1]);
2749   if (GNUNET_SYSERR == ret)
2750   {
2751     GNUNET_break (0);
2752     GNUNET_SERVICE_client_drop (tc->client);
2753     GNUNET_free (cmc);
2754     return;
2755   }
2756   if (GNUNET_NO == ret)
2757   {
2758     /* unencapsulated 'raw' message */
2759     handle_raw_message (&cmc,
2760                         (const struct GNUNET_MessageHeader *) &im[1]);
2761   }
2762 }
2763
2764
2765 /**
2766  * New queue became available.  Check message.
2767  *
2768  * @param cls the client
2769  * @param aqm the send message that was sent
2770  */
2771 static int
2772 check_add_queue_message (void *cls,
2773                          const struct GNUNET_TRANSPORT_AddQueueMessage *aqm)
2774 {
2775   struct TransportClient *tc = cls;
2776
2777   if (CT_COMMUNICATOR != tc->type)
2778   {
2779     GNUNET_break (0);
2780     return GNUNET_SYSERR;
2781   }
2782   GNUNET_MQ_check_zero_termination (aqm);
2783   return GNUNET_OK;
2784 }
2785
2786
2787 /**
2788  * Bandwidth tracker informs us that the delay until we should receive
2789  * more has changed.
2790  *
2791  * @param cls a `struct GNUNET_ATS_Session` for which the delay changed
2792  */
2793 static void
2794 tracker_update_in_cb (void *cls)
2795 {
2796   struct GNUNET_ATS_Session *queue = cls;
2797   struct GNUNET_TIME_Relative in_delay;
2798   unsigned int rsize;
2799   
2800   rsize = (0 == queue->mtu) ? IN_PACKET_SIZE_WITHOUT_MTU : queue->mtu;
2801   in_delay = GNUNET_BANDWIDTH_tracker_get_delay (&queue->tracker_in,
2802                                                  rsize);
2803   // FIXME: how exactly do we do inbound flow control?
2804 }
2805
2806
2807 /**
2808  * If necessary, generates the UUID for a @a pm
2809  *
2810  * @param pm pending message to generate UUID for.
2811  */
2812 static void
2813 set_pending_message_uuid (struct PendingMessage *pm)
2814 {
2815   if (pm->msg_uuid_set)
2816     return;
2817   GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
2818                               &pm->msg_uuid,
2819                               sizeof (pm->msg_uuid));
2820   pm->msg_uuid_set = GNUNET_YES;
2821 }
2822
2823
2824 /**
2825  * Fragment the given @a pm to the given @a mtu.  Adds 
2826  * additional fragments to the neighbour as well. If the
2827  * @a mtu is too small, generates and error for the @a pm
2828  * and returns NULL.
2829  *
2830  * @param pm pending message to fragment for transmission
2831  * @param mtu MTU to apply
2832  * @return new message to transmit
2833  */
2834 static struct PendingMessage *
2835 fragment_message (struct PendingMessage *pm,
2836                   uint16_t mtu)
2837 {
2838   struct PendingMessage *ff;
2839
2840   set_pending_message_uuid (pm);
2841   
2842   /* This invariant is established in #handle_add_queue_message() */
2843   GNUNET_assert (mtu > sizeof (struct TransportFragmentBox));
2844
2845   /* select fragment for transmission, descending the tree if it has
2846      been expanded until we are at a leaf or at a fragment that is small enough */
2847   ff = pm;
2848   while ( ( (ff->bytes_msg > mtu) ||
2849             (pm == ff) ) &&
2850           (ff->frag_off == ff->bytes_msg) &&
2851           (NULL != ff->head_frag) )
2852   {
2853     ff = ff->head_frag; /* descent into fragmented fragments */
2854   }
2855
2856   if ( ( (ff->bytes_msg > mtu) ||
2857          (pm == ff) ) &&
2858        (pm->frag_off < pm->bytes_msg) )
2859   {
2860     /* Did not yet calculate all fragments, calculate next fragment */
2861     struct PendingMessage *frag;
2862     struct TransportFragmentBox tfb;
2863     const char *orig;
2864     char *msg;
2865     uint16_t fragmax;
2866     uint16_t fragsize;
2867     uint16_t msize;
2868     uint16_t xoff = 0;
2869
2870     orig = (const char *) &ff[1];
2871     msize = ff->bytes_msg;
2872     if (pm != ff)
2873     {
2874       const struct TransportFragmentBox *tfbo;
2875
2876       tfbo = (const struct TransportFragmentBox *) orig;
2877       orig += sizeof (struct TransportFragmentBox);
2878       msize -= sizeof (struct TransportFragmentBox);
2879       xoff = ntohs (tfbo->frag_off);
2880     }
2881     fragmax = mtu - sizeof (struct TransportFragmentBox);
2882     fragsize = GNUNET_MIN (msize - ff->frag_off,
2883                            fragmax);
2884     frag = GNUNET_malloc (sizeof (struct PendingMessage) +
2885                           sizeof (struct TransportFragmentBox) +
2886                           fragsize);
2887     frag->target = pm->target;
2888     frag->frag_parent = ff;
2889     frag->timeout = pm->timeout;
2890     frag->bytes_msg = sizeof (struct TransportFragmentBox) + fragsize;
2891     frag->pmt = PMT_FRAGMENT_BOX;
2892     msg = (char *) &frag[1];
2893     tfb.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT);
2894     tfb.header.size = htons (sizeof (struct TransportFragmentBox) +
2895                              fragsize);
2896     tfb.frag_uuid = htonl (pm->frag_uuidgen++);
2897     tfb.msg_uuid = pm->msg_uuid;
2898     tfb.frag_off = htons (ff->frag_off + xoff);
2899     tfb.msg_size = htons (pm->bytes_msg);
2900     memcpy (msg,
2901             &tfb,
2902             sizeof (tfb));
2903     memcpy (&msg[sizeof (tfb)],
2904             &orig[ff->frag_off],
2905             fragsize);
2906     GNUNET_CONTAINER_MDLL_insert (frag,
2907                                   ff->head_frag,
2908                                   ff->tail_frag,
2909                                   frag);
2910     ff->frag_off += fragsize;
2911     ff = frag;
2912   }
2913
2914   /* Move head to the tail and return it */
2915   GNUNET_CONTAINER_MDLL_remove (frag,
2916                                 ff->frag_parent->head_frag,
2917                                 ff->frag_parent->tail_frag,
2918                                 ff);
2919   GNUNET_CONTAINER_MDLL_insert_tail (frag,
2920                                      ff->frag_parent->head_frag,
2921                                      ff->frag_parent->tail_frag,
2922                                      ff);
2923   return ff;
2924 }
2925
2926
2927 /**
2928  * Reliability-box the given @a pm. On error (can there be any), NULL
2929  * may be returned, otherwise the "replacement" for @a pm (which
2930  * should then be added to the respective neighbour's queue instead of
2931  * @a pm).  If the @a pm is already fragmented or reliability boxed,
2932  * or itself an ACK, this function simply returns @a pm.
2933  *
2934  * @param pm pending message to box for transmission over unreliabile queue
2935  * @return new message to transmit
2936  */
2937 static struct PendingMessage *
2938 reliability_box_message (struct PendingMessage *pm)
2939 {
2940   struct TransportReliabilityBox rbox;
2941   struct PendingMessage *bpm;
2942   char *msg;
2943
2944   if (PMT_CORE != pm->pmt)
2945     return pm;  /* already fragmented or reliability boxed, or control message: do nothing */
2946   if (NULL != pm->bpm)
2947     return pm->bpm; /* already computed earlier: do nothing */
2948   GNUNET_assert (NULL == pm->head_frag);
2949   if (pm->bytes_msg + sizeof (rbox) > UINT16_MAX) 
2950   {
2951     /* failed hard */
2952     GNUNET_break (0);
2953     client_send_response (pm,
2954                           GNUNET_NO,
2955                           0);
2956     return NULL;
2957   }
2958   bpm = GNUNET_malloc (sizeof (struct PendingMessage) +
2959                        sizeof (rbox) + 
2960                        pm->bytes_msg);
2961   bpm->target = pm->target;
2962   bpm->frag_parent = pm;
2963   GNUNET_CONTAINER_MDLL_insert (frag,
2964                                 pm->head_frag,
2965                                 pm->tail_frag,
2966                                 bpm);
2967   bpm->timeout = pm->timeout;
2968   bpm->pmt = PMT_RELIABILITY_BOX;
2969   bpm->bytes_msg = pm->bytes_msg + sizeof (rbox);
2970   set_pending_message_uuid (bpm);
2971   rbox.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX);
2972   rbox.header.size = htons (sizeof (rbox) + pm->bytes_msg);
2973   rbox.ack_countdown = htonl (0); // FIXME: implement ACK countdown support
2974   rbox.msg_uuid = pm->msg_uuid;
2975   msg = (char *) &bpm[1];
2976   memcpy (msg,
2977           &rbox,
2978           sizeof (rbox));
2979   memcpy (&msg[sizeof (rbox)],
2980           &pm[1],
2981           pm->bytes_msg);
2982   pm->bpm = bpm;
2983   return bpm;
2984 }
2985
2986
2987 /**
2988  * We believe we are ready to transmit a message on a queue. Double-checks
2989  * with the queue's "tracker_out" and then gives the message to the 
2990  * communicator for transmission (updating the tracker, and re-scheduling
2991  * itself if applicable).  
2992  *
2993  * @param cls the `struct GNUNET_ATS_Session` to process transmissions for
2994  */ 
2995 static void
2996 transmit_on_queue (void *cls)
2997 {
2998   struct GNUNET_ATS_Session *queue = cls;
2999   struct Neighbour *n = queue->neighbour;
3000   struct QueueEntry *qe;
3001   struct PendingMessage *pm;
3002   struct PendingMessage *s;
3003   uint32_t overhead;
3004   struct GNUNET_TRANSPORT_SendMessageTo *smt;
3005   struct GNUNET_MQ_Envelope *env;
3006
3007   queue->transmit_task = NULL;
3008   if (NULL == (pm = n->pending_msg_head))
3009   {
3010     /* no message pending, nothing to do here! */
3011     return; 
3012   }
3013   schedule_transmit_on_queue (queue);
3014   if (NULL != queue->transmit_task)
3015     return; /* do it later */
3016   overhead = 0;
3017   if (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc)
3018     overhead += sizeof (struct TransportReliabilityBox);
3019   s = pm;
3020   if ( ( (0 != queue->mtu) &&
3021          (pm->bytes_msg + overhead > queue->mtu) ) ||
3022        (pm->bytes_msg > UINT16_MAX - sizeof (struct GNUNET_TRANSPORT_SendMessageTo)) ||
3023        (NULL != pm->head_frag /* fragments already exist, should
3024                                  respect that even if MTU is 0 for
3025                                  this queue */) )
3026     s = fragment_message (s,
3027                           (0 == queue->mtu)
3028                           ? UINT16_MAX - sizeof (struct GNUNET_TRANSPORT_SendMessageTo)
3029                           : queue->mtu);
3030   if (NULL == s)
3031   {
3032     /* Fragmentation failed, try next message... */
3033     schedule_transmit_on_queue (queue);
3034     return;
3035   }
3036   if (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc)
3037     s = reliability_box_message (s);
3038   if (NULL == s)
3039   {
3040     /* Reliability boxing failed, try next message... */
3041     schedule_transmit_on_queue (queue);
3042     return;
3043   }
3044
3045   /* Pass 's' for transission to the communicator */
3046   qe = GNUNET_new (struct QueueEntry);
3047   qe->mid = queue->mid_gen++;
3048   qe->session = queue;
3049   // qe->pm = s; // FIXME: not so easy, reference management on 'free(s)'!
3050   GNUNET_CONTAINER_DLL_insert (queue->queue_head,
3051                                queue->queue_tail,
3052                                qe);
3053   env = GNUNET_MQ_msg_extra (smt,
3054                              s->bytes_msg,
3055                              GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG);
3056   smt->qid = queue->qid;
3057   smt->mid = qe->mid;
3058   smt->receiver = n->pid;
3059   memcpy (&smt[1],
3060           &s[1],
3061           s->bytes_msg);
3062   GNUNET_assert (CT_COMMUNICATOR == queue->tc->type);
3063   queue->queue_length++;
3064   queue->tc->details.communicator.total_queue_length++;
3065   GNUNET_MQ_send (queue->tc->mq,
3066                   env);
3067   
3068   // FIXME: do something similar to the logic below
3069   // in defragmentation / reliability ACK handling!
3070
3071   /* Check if this transmission somehow conclusively finished handing 'pm'
3072      even without any explicit ACKs */
3073   if ( (PMT_CORE == s->pmt) &&
3074        (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc) )
3075   {
3076     /* Full message sent, and over reliabile channel */
3077     client_send_response (pm,
3078                           GNUNET_YES,
3079                           pm->bytes_msg);
3080   }
3081   else if ( (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc) &&
3082             (PMT_FRAGMENT_BOX == s->pmt) )
3083   {
3084     struct PendingMessage *pos;
3085     
3086     /* Fragment sent over reliabile channel */
3087     free_fragment_tree (s);
3088     pos = s->frag_parent;
3089     GNUNET_CONTAINER_MDLL_remove (frag,
3090                                   pos->head_frag,
3091                                   pos->tail_frag,
3092                                   s);
3093     GNUNET_free (s);
3094     /* check if subtree is done */
3095     while ( (NULL == pos->head_frag) &&
3096             (pos->frag_off == pos->bytes_msg) &&
3097             (pos != pm) )
3098     {
3099       s = pos;
3100       pos = s->frag_parent;
3101       GNUNET_CONTAINER_MDLL_remove (frag,
3102                                     pos->head_frag,
3103                                     pos->tail_frag,
3104                                     s);
3105       GNUNET_free (s);      
3106     }
3107     
3108     /* Was this the last applicable fragmment? */
3109     if ( (NULL == pm->head_frag) &&
3110          (pm->frag_off == pm->bytes_msg) )
3111       client_send_response (pm,
3112                             GNUNET_YES,
3113                             pm->bytes_msg /* FIXME: calculate and add overheads! */);
3114   }
3115   else if (PMT_CORE != pm->pmt)
3116   {
3117     /* This was an acknowledgement of some type, always free */
3118     free_pending_message (pm);
3119   }
3120   else
3121   {
3122     /* message not finished, waiting for acknowledgement */
3123     struct Neighbour *neighbour = pm->target;
3124     /* Update time by which we might retransmit 's' based on queue
3125        characteristics (i.e. RTT); it takes one RTT for the message to
3126        arrive and the ACK to come back in the best case; but the other
3127        side is allowed to delay ACKs by 2 RTTs, so we use 4 RTT before
3128        retransmitting.  Note that in the future this heuristic should
3129        likely be improved further (measure RTT stability, consider
3130        message urgency and size when delaying ACKs, etc.) */
3131     s->next_attempt = GNUNET_TIME_relative_to_absolute
3132       (GNUNET_TIME_relative_multiply (queue->rtt,
3133                                       4));
3134     if (s == pm)
3135     {
3136       struct PendingMessage *pos;
3137
3138       /* re-insert sort in neighbour list */
3139       GNUNET_CONTAINER_MDLL_remove (neighbour,
3140                                     neighbour->pending_msg_head,
3141                                     neighbour->pending_msg_tail,
3142                                     pm);
3143       pos = neighbour->pending_msg_tail;
3144       while ( (NULL != pos) &&
3145               (pm->next_attempt.abs_value_us > pos->next_attempt.abs_value_us) )
3146         pos = pos->prev_neighbour;
3147       GNUNET_CONTAINER_MDLL_insert_after (neighbour,
3148                                           neighbour->pending_msg_head,
3149                                           neighbour->pending_msg_tail,
3150                                           pos,
3151                                           pm);
3152     }
3153     else
3154     {
3155       /* re-insert sort in fragment list */
3156       struct PendingMessage *fp = s->frag_parent;
3157       struct PendingMessage *pos;
3158
3159       GNUNET_CONTAINER_MDLL_remove (frag,
3160                                     fp->head_frag,
3161                                     fp->tail_frag,
3162                                     s);
3163       pos = fp->tail_frag;
3164       while ( (NULL != pos) &&
3165               (s->next_attempt.abs_value_us > pos->next_attempt.abs_value_us) )
3166         pos = pos->prev_frag;
3167       GNUNET_CONTAINER_MDLL_insert_after (frag,
3168                                           fp->head_frag,
3169                                           fp->tail_frag,
3170                                           pos,
3171                                           s);
3172     }
3173   }
3174   
3175   /* finally, re-schedule queue transmission task itself */
3176   schedule_transmit_on_queue (queue);
3177 }
3178
3179
3180 /**
3181  * Bandwidth tracker informs us that the delay until we
3182  * can transmit again changed.
3183  *
3184  * @param cls a `struct GNUNET_ATS_Session` for which the delay changed
3185  */
3186 static void
3187 tracker_update_out_cb (void *cls)
3188 {
3189   struct GNUNET_ATS_Session *queue = cls;
3190   struct Neighbour *n = queue->neighbour;
3191
3192   if (NULL == n->pending_msg_head)
3193   {
3194     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3195                 "Bandwidth allocation updated for empty transmission queue `%s'\n",
3196                 queue->address);
3197     return; /* no message pending, nothing to do here! */
3198   }
3199   GNUNET_SCHEDULER_cancel (queue->transmit_task);
3200   queue->transmit_task = NULL;
3201   schedule_transmit_on_queue (queue);
3202 }
3203
3204
3205 /**
3206  * Bandwidth tracker informs us that excessive outbound bandwidth was
3207  * allocated which is not being used.
3208  *
3209  * @param cls a `struct GNUNET_ATS_Session` for which the excess was noted
3210  */
3211 static void
3212 tracker_excess_out_cb (void *cls)
3213 {
3214   /* FIXME: trigger excess bandwidth report to core? Right now,
3215      this is done internally within transport_api2_core already,
3216      but we probably want to change the logic and trigger it 
3217      from here via a message instead! */
3218   /* TODO: maybe inform ATS at this point? */
3219   GNUNET_STATISTICS_update (GST_stats,
3220                             "# Excess outbound bandwidth reported",
3221                             1,
3222                             GNUNET_NO);    
3223 }
3224
3225
3226
3227 /**
3228  * Bandwidth tracker informs us that excessive inbound bandwidth was allocated
3229  * which is not being used.
3230  *
3231  * @param cls a `struct GNUNET_ATS_Session` for which the excess was noted
3232  */
3233 static void
3234 tracker_excess_in_cb (void *cls)
3235 {
3236   /* TODO: maybe inform ATS at this point? */
3237   GNUNET_STATISTICS_update (GST_stats,
3238                             "# Excess inbound bandwidth reported",
3239                             1,
3240                             GNUNET_NO);    
3241 }
3242
3243
3244 /**
3245  * New queue became available.  Process the request.
3246  *
3247  * @param cls the client
3248  * @param aqm the send message that was sent
3249  */
3250 static void
3251 handle_add_queue_message (void *cls,
3252                           const struct GNUNET_TRANSPORT_AddQueueMessage *aqm)
3253 {
3254   struct TransportClient *tc = cls;
3255   struct GNUNET_ATS_Session *queue;
3256   struct Neighbour *neighbour;
3257   const char *addr;
3258   uint16_t addr_len;
3259
3260   if (ntohl (aqm->mtu) <= sizeof (struct TransportFragmentBox))
3261   {
3262     /* MTU so small as to be useless for transmissions,
3263        required for #fragment_message()! */
3264     GNUNET_break_op (0);
3265     GNUNET_SERVICE_client_drop (tc->client);
3266     return;
3267   }
3268   neighbour = lookup_neighbour (&aqm->receiver);
3269   if (NULL == neighbour)
3270   {
3271     neighbour = GNUNET_new (struct Neighbour);
3272     neighbour->earliest_timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
3273     neighbour->pid = aqm->receiver;
3274     GNUNET_assert (GNUNET_OK ==
3275                    GNUNET_CONTAINER_multipeermap_put (neighbours,
3276                                                       &neighbour->pid,
3277                                                       neighbour,
3278                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
3279     cores_send_connect_info (&neighbour->pid,
3280                              GNUNET_BANDWIDTH_ZERO);
3281   }
3282   addr_len = ntohs (aqm->header.size) - sizeof (*aqm);
3283   addr = (const char *) &aqm[1];
3284
3285   queue = GNUNET_malloc (sizeof (struct GNUNET_ATS_Session) + addr_len);
3286   queue->tc = tc;
3287   queue->address = (const char *) &queue[1];
3288   queue->rtt = GNUNET_TIME_UNIT_FOREVER_REL;
3289   queue->qid = aqm->qid;
3290   queue->mtu = ntohl (aqm->mtu);
3291   queue->nt = (enum GNUNET_NetworkType) ntohl (aqm->nt);
3292   queue->cs = (enum GNUNET_TRANSPORT_ConnectionStatus) ntohl (aqm->cs);
3293   queue->neighbour = neighbour;
3294   GNUNET_BANDWIDTH_tracker_init2 (&queue->tracker_in,
3295                                   &tracker_update_in_cb,
3296                                   queue,
3297                                   GNUNET_BANDWIDTH_ZERO,
3298                                   GNUNET_CONSTANTS_MAX_BANDWIDTH_CARRY_S,
3299                                   &tracker_excess_in_cb,
3300                                   queue);
3301   GNUNET_BANDWIDTH_tracker_init2 (&queue->tracker_out,
3302                                   &tracker_update_out_cb,
3303                                   queue,
3304                                   GNUNET_BANDWIDTH_ZERO,
3305                                   GNUNET_CONSTANTS_MAX_BANDWIDTH_CARRY_S,
3306                                   &tracker_excess_out_cb,
3307                                   queue);
3308   memcpy (&queue[1],
3309           addr,
3310           addr_len);
3311   /* notify ATS about new queue */
3312   {
3313     struct GNUNET_ATS_Properties prop = {
3314       .delay = GNUNET_TIME_UNIT_FOREVER_REL,
3315       .mtu = queue->mtu,
3316       .nt = queue->nt,
3317       .cc = tc->details.communicator.cc
3318     };
3319     
3320     queue->sr = GNUNET_ATS_session_add (ats,
3321                                         &neighbour->pid,
3322                                         queue->address,
3323                                         queue,
3324                                         &prop);
3325     if  (NULL == queue->sr)
3326     {
3327       /* This can only happen if the 'address' was way too long for ATS
3328          (approaching 64k in strlen()!). In this case, the communicator
3329          must be buggy and we drop it. */
3330       GNUNET_break (0);
3331       GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_in);
3332       GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_out);
3333       GNUNET_free (queue);
3334       if (NULL == neighbour->session_head)
3335       {
3336         cores_send_disconnect_info (&neighbour->pid);
3337         free_neighbour (neighbour);
3338       }
3339       GNUNET_SERVICE_client_drop (tc->client);
3340       return;
3341     }
3342   }
3343   /* notify monitors about new queue */
3344   {
3345     struct MonitorEvent me = {
3346       .rtt = queue->rtt,
3347       .cs = queue->cs
3348     };
3349
3350     notify_monitors (&neighbour->pid,
3351                      queue->address,
3352                      queue->nt,
3353                      &me);
3354   }
3355   GNUNET_CONTAINER_MDLL_insert (neighbour,
3356                                 neighbour->session_head,
3357                                 neighbour->session_tail,
3358                                 queue);
3359   GNUNET_CONTAINER_MDLL_insert (client,
3360                                 tc->details.communicator.session_head,
3361                                 tc->details.communicator.session_tail,
3362                                 queue);
3363   GNUNET_SERVICE_client_continue (tc->client);
3364 }
3365
3366
3367 /**
3368  * Queue to a peer went down.  Process the request.
3369  *
3370  * @param cls the client
3371  * @param dqm the send message that was sent
3372  */
3373 static void
3374 handle_del_queue_message (void *cls,
3375                           const struct GNUNET_TRANSPORT_DelQueueMessage *dqm)
3376 {
3377   struct TransportClient *tc = cls;
3378
3379   if (CT_COMMUNICATOR != tc->type)
3380   {
3381     GNUNET_break (0);
3382     GNUNET_SERVICE_client_drop (tc->client);
3383     return;
3384   }
3385   for (struct GNUNET_ATS_Session *session = tc->details.communicator.session_head;
3386        NULL != session;
3387        session = session->next_client)
3388   {
3389     struct Neighbour *neighbour = session->neighbour;
3390
3391     if ( (dqm->qid != session->qid) ||
3392          (0 != memcmp (&dqm->receiver,
3393                        &neighbour->pid,
3394                        sizeof (struct GNUNET_PeerIdentity))) )
3395       continue;
3396     free_session (session);
3397     GNUNET_SERVICE_client_continue (tc->client);
3398     return;
3399   }
3400   GNUNET_break (0);
3401   GNUNET_SERVICE_client_drop (tc->client);
3402 }
3403
3404
3405 /**
3406  * Message was transmitted.  Process the request.
3407  *
3408  * @param cls the client
3409  * @param sma the send message that was sent
3410  */
3411 static void
3412 handle_send_message_ack (void *cls,
3413                          const struct GNUNET_TRANSPORT_SendMessageToAck *sma)
3414 {
3415   struct TransportClient *tc = cls;
3416   struct QueueEntry *queue;
3417   
3418   if (CT_COMMUNICATOR != tc->type)
3419   {
3420     GNUNET_break (0);
3421     GNUNET_SERVICE_client_drop (tc->client);
3422     return;
3423   }
3424
3425   /* find our queue entry matching the ACK */
3426   queue = NULL;
3427   for (struct GNUNET_ATS_Session *session = tc->details.communicator.session_head;
3428        NULL != session;
3429        session = session->next_client)
3430   {
3431     if (0 != memcmp (&session->neighbour->pid,
3432                      &sma->receiver,
3433                      sizeof (struct GNUNET_PeerIdentity)))
3434       continue;
3435     for (struct QueueEntry *qe = session->queue_head;
3436          NULL != qe;
3437          qe = qe->next)
3438     {
3439       if (qe->mid != sma->mid)
3440         continue;
3441       queue = qe;
3442       break;
3443     }
3444     break;      
3445   }
3446   if (NULL == queue)
3447   {
3448     /* this should never happen */
3449     GNUNET_break (0); 
3450     GNUNET_SERVICE_client_drop (tc->client);
3451     return;
3452   }
3453   GNUNET_CONTAINER_DLL_remove (queue->session->queue_head,
3454                                queue->session->queue_tail,
3455                                queue);
3456   queue->session->queue_length--;
3457   tc->details.communicator.total_queue_length--;
3458   GNUNET_SERVICE_client_continue (tc->client);
3459
3460   /* if applicable, resume transmissions that waited on ACK */
3461   if (COMMUNICATOR_TOTAL_QUEUE_LIMIT - 1 == tc->details.communicator.total_queue_length)
3462   {
3463     /* Communicator dropped below threshold, resume all queues */
3464     GNUNET_STATISTICS_update (GST_stats,
3465                               "# Transmission throttled due to communicator queue limit",
3466                               -1,
3467                               GNUNET_NO);
3468     for (struct GNUNET_ATS_Session *session = tc->details.communicator.session_head;
3469          NULL != session;
3470          session = session->next_client)
3471       schedule_transmit_on_queue (session);
3472   }
3473   else if (SESSION_QUEUE_LIMIT - 1 == queue->session->queue_length)
3474   {
3475     /* queue dropped below threshold; only resume this one queue */
3476     GNUNET_STATISTICS_update (GST_stats,
3477                               "# Transmission throttled due to session queue limit",
3478                               -1,
3479                               GNUNET_NO);    
3480     schedule_transmit_on_queue (queue->session);
3481   }
3482  
3483   /* TODO: we also should react on the status! */
3484   // FIXME: this probably requires queue->pm = s assignment!
3485   // FIXME: react to communicator status about transmission request. We got:
3486   sma->status; // OK success, SYSERR failure
3487
3488   GNUNET_free (queue);
3489 }
3490
3491
3492 /**
3493  * Iterator telling new MONITOR client about all existing
3494  * queues to peers.
3495  *
3496  * @param cls the new `struct TransportClient`
3497  * @param pid a connected peer
3498  * @param value the `struct Neighbour` with more information
3499  * @return #GNUNET_OK (continue to iterate)
3500  */
3501 static int
3502 notify_client_queues (void *cls,
3503                       const struct GNUNET_PeerIdentity *pid,
3504                       void *value)
3505 {
3506   struct TransportClient *tc = cls;
3507   struct Neighbour *neighbour = value;
3508
3509   GNUNET_assert (CT_MONITOR == tc->type);
3510   for (struct GNUNET_ATS_Session *q = neighbour->session_head;
3511        NULL != q;
3512        q = q->next_neighbour)
3513   {
3514     struct MonitorEvent me = {
3515       .rtt = q->rtt,
3516       .cs = q->cs,
3517       .num_msg_pending = q->num_msg_pending,
3518       .num_bytes_pending = q->num_bytes_pending
3519     };
3520
3521     notify_monitor (tc,
3522                     pid,
3523                     q->address,
3524                     q->nt,
3525                     &me);
3526   }
3527   return GNUNET_OK;
3528 }
3529
3530
3531 /**
3532  * Initialize a monitor client.
3533  *
3534  * @param cls the client
3535  * @param start the start message that was sent
3536  */
3537 static void
3538 handle_monitor_start (void *cls,
3539                       const struct GNUNET_TRANSPORT_MonitorStart *start)
3540 {
3541   struct TransportClient *tc = cls;
3542
3543   if (CT_NONE != tc->type)
3544   {
3545     GNUNET_break (0);
3546     GNUNET_SERVICE_client_drop (tc->client);
3547     return;
3548   }
3549   tc->type = CT_MONITOR;
3550   tc->details.monitor.peer = start->peer;
3551   tc->details.monitor.one_shot = ntohl (start->one_shot);
3552   GNUNET_CONTAINER_multipeermap_iterate (neighbours,
3553                                          &notify_client_queues,
3554                                          tc);
3555   GNUNET_SERVICE_client_mark_monitor (tc->client);
3556   GNUNET_SERVICE_client_continue (tc->client);
3557 }
3558
3559
3560 /**
3561  * Signature of a function called by ATS with the current bandwidth
3562  * allocation to be used as determined by ATS.
3563  *
3564  * @param cls closure, NULL
3565  * @param session session this is about
3566  * @param bandwidth_out assigned outbound bandwidth for the connection,
3567  *        0 to signal disconnect
3568  * @param bandwidth_in assigned inbound bandwidth for the connection,
3569  *        0 to signal disconnect
3570  */
3571 static void
3572 ats_allocation_cb (void *cls,
3573                    struct GNUNET_ATS_Session *session,
3574                    struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out,
3575                    struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in)
3576 {
3577   (void) cls;
3578   GNUNET_BANDWIDTH_tracker_update_quota (&session->tracker_out,
3579                                          bandwidth_out);
3580   GNUNET_BANDWIDTH_tracker_update_quota (&session->tracker_in,
3581                                          bandwidth_in);
3582 }
3583
3584
3585 /**
3586  * Find transport client providing communication service
3587  * for the protocol @a prefix.
3588  *
3589  * @param prefix communicator name
3590  * @return NULL if no such transport client is available
3591  */
3592 static struct TransportClient *
3593 lookup_communicator (const char *prefix)
3594 {
3595   for (struct TransportClient *tc = clients_head;
3596        NULL != tc;
3597        tc = tc->next)
3598   {
3599     if (CT_COMMUNICATOR != tc->type)
3600       continue;
3601     if (0 == strcmp (prefix,
3602                      tc->details.communicator.address_prefix))
3603       return tc;
3604   }
3605   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
3606               "ATS suggested use of communicator for `%s', but we do not have such a communicator!\n",
3607               prefix);
3608   return NULL;
3609 }
3610
3611
3612 /**
3613  * Signature of a function called by ATS suggesting transport to
3614  * try connecting with a particular address.
3615  *
3616  * @param cls closure, NULL
3617  * @param pid target peer
3618  * @param address the address to try
3619  */
3620 static void
3621 ats_suggestion_cb (void *cls,
3622                    const struct GNUNET_PeerIdentity *pid,
3623                    const char *address)
3624 {
3625   static uint32_t idgen;
3626   struct TransportClient *tc;
3627   char *prefix;
3628   struct GNUNET_TRANSPORT_CreateQueue *cqm;
3629   struct GNUNET_MQ_Envelope *env;
3630   size_t alen;
3631
3632   (void) cls;
3633   prefix = GNUNET_HELLO_address_to_prefix (address);
3634   if (NULL == prefix)
3635   {
3636     GNUNET_break (0); /* ATS gave invalid address!? */
3637     return;
3638   }
3639   tc = lookup_communicator (prefix);
3640   if (NULL == tc)
3641   {
3642     GNUNET_STATISTICS_update (GST_stats,
3643                               "# ATS suggestions ignored due to missing communicator",
3644                               1,
3645                               GNUNET_NO);    
3646     return;
3647   }
3648   /* forward suggestion for queue creation to communicator */
3649   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3650               "Request #%u for `%s' communicator to create queue to `%s'\n",
3651               (unsigned int) idgen,
3652               prefix,
3653               address);
3654   alen = strlen (address) + 1;
3655   env = GNUNET_MQ_msg_extra (cqm,
3656                              alen,
3657                              GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE);
3658   cqm->request_id = htonl (idgen++);
3659   cqm->receiver = *pid;
3660   memcpy (&cqm[1],
3661           address,
3662           alen);
3663   GNUNET_MQ_send (tc->mq,
3664                   env);
3665 }
3666
3667
3668 /**
3669  * Communicator tells us that our request to create a queue "worked", that
3670  * is setting up the queue is now in process.
3671  *
3672  * @param cls the `struct TransportClient`
3673  * @param cqr confirmation message
3674  */ 
3675 static void
3676 handle_queue_create_ok (void *cls,
3677                         const struct GNUNET_TRANSPORT_CreateQueueResponse *cqr)
3678 {
3679   struct TransportClient *tc = cls;
3680
3681   if (CT_COMMUNICATOR != tc->type)
3682   {
3683     GNUNET_break (0);
3684     GNUNET_SERVICE_client_drop (tc->client);
3685     return;
3686   }
3687   GNUNET_STATISTICS_update (GST_stats,
3688                             "# ATS suggestions succeeded at communicator",
3689                             1,
3690                             GNUNET_NO);
3691   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3692               "Request #%u for communicator to create queue succeeded\n",
3693               (unsigned int) ntohs (cqr->request_id));
3694   GNUNET_SERVICE_client_continue (tc->client);    
3695 }
3696
3697
3698 /**
3699  * Communicator tells us that our request to create a queue failed. This usually
3700  * indicates that the provided address is simply invalid or that the communicator's
3701  * resources are exhausted.
3702  *
3703  * @param cls the `struct TransportClient`
3704  * @param cqr failure message
3705  */ 
3706 static void
3707 handle_queue_create_fail (void *cls,
3708                           const struct GNUNET_TRANSPORT_CreateQueueResponse *cqr)
3709 {
3710   struct TransportClient *tc = cls;
3711
3712   if (CT_COMMUNICATOR != tc->type)
3713   {
3714     GNUNET_break (0);
3715     GNUNET_SERVICE_client_drop (tc->client);
3716     return;
3717   }
3718   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3719               "Request #%u for communicator to create queue failed\n",
3720               (unsigned int) ntohs (cqr->request_id));
3721   GNUNET_STATISTICS_update (GST_stats,
3722                             "# ATS suggestions failed in queue creation at communicator",
3723                             1,
3724                             GNUNET_NO);
3725   GNUNET_SERVICE_client_continue (tc->client);    
3726 }
3727
3728
3729 /**
3730  * Free neighbour entry.
3731  *
3732  * @param cls NULL
3733  * @param pid unused
3734  * @param value a `struct Neighbour`
3735  * @return #GNUNET_OK (always)
3736  */
3737 static int
3738 free_neighbour_cb (void *cls,
3739                    const struct GNUNET_PeerIdentity *pid,
3740                    void *value)
3741 {
3742   struct Neighbour *neighbour = value;
3743
3744   (void) cls;
3745   (void) pid;
3746   GNUNET_break (0); // should this ever happen?
3747   free_neighbour (neighbour);
3748
3749   return GNUNET_OK;
3750 }
3751
3752
3753 /**
3754  * Free ephemeral entry.
3755  *
3756  * @param cls NULL
3757  * @param pid unused
3758  * @param value a `struct Neighbour`
3759  * @return #GNUNET_OK (always)
3760  */
3761 static int
3762 free_ephemeral_cb (void *cls,
3763                    const struct GNUNET_PeerIdentity *pid,
3764                    void *value)
3765 {
3766   struct EphemeralCacheEntry *ece = value;
3767
3768   (void) cls;
3769   (void) pid;
3770   free_ephemeral (ece);
3771   return GNUNET_OK;
3772 }
3773
3774
3775 /**
3776  * Function called when the service shuts down.  Unloads our plugins
3777  * and cancels pending validations.
3778  *
3779  * @param cls closure, unused
3780  */
3781 static void
3782 do_shutdown (void *cls)
3783 {
3784   (void) cls;
3785
3786   GNUNET_CONTAINER_multipeermap_iterate (neighbours,
3787                                          &free_neighbour_cb,
3788                                          NULL);
3789   if (NULL != ats)
3790   {
3791     GNUNET_ATS_transport_done (ats);
3792     ats = NULL;
3793   }
3794   if (NULL != peerstore)
3795   {
3796     GNUNET_PEERSTORE_disconnect (peerstore,
3797                                  GNUNET_NO);
3798     peerstore = NULL;
3799   }
3800   if (NULL != GST_stats)
3801   {
3802     GNUNET_STATISTICS_destroy (GST_stats,
3803                                GNUNET_NO);
3804     GST_stats = NULL;
3805   }
3806   if (NULL != GST_my_private_key)
3807   {
3808     GNUNET_free (GST_my_private_key);
3809     GST_my_private_key = NULL;
3810   }
3811   GNUNET_CONTAINER_multipeermap_destroy (neighbours);
3812   neighbours = NULL;
3813   GNUNET_CONTAINER_multipeermap_iterate (ephemeral_map,
3814                                          &free_ephemeral_cb,
3815                                          NULL);
3816   GNUNET_CONTAINER_multipeermap_destroy (ephemeral_map);
3817   ephemeral_map = NULL;
3818   GNUNET_CONTAINER_heap_destroy (ephemeral_heap);
3819   ephemeral_heap = NULL;
3820 }
3821
3822
3823 /**
3824  * Initiate transport service.
3825  *
3826  * @param cls closure
3827  * @param c configuration to use
3828  * @param service the initialized service
3829  */
3830 static void
3831 run (void *cls,
3832      const struct GNUNET_CONFIGURATION_Handle *c,
3833      struct GNUNET_SERVICE_Handle *service)
3834 {
3835   (void) cls;
3836   /* setup globals */
3837   GST_cfg = c;
3838   neighbours = GNUNET_CONTAINER_multipeermap_create (1024,
3839                                                      GNUNET_YES);
3840   ephemeral_map = GNUNET_CONTAINER_multipeermap_create (32,
3841                                                         GNUNET_YES);
3842   ephemeral_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
3843   GST_my_private_key = GNUNET_CRYPTO_eddsa_key_create_from_configuration (GST_cfg);
3844   if (NULL == GST_my_private_key)
3845   {
3846     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3847                 _("Transport service is lacking key configuration settings. Exiting.\n"));
3848     GNUNET_SCHEDULER_shutdown ();
3849     return;
3850   }
3851   GNUNET_CRYPTO_eddsa_key_get_public (GST_my_private_key,
3852                                       &GST_my_identity.public_key);
3853   GNUNET_log(GNUNET_ERROR_TYPE_INFO,
3854              "My identity is `%s'\n",
3855              GNUNET_i2s_full (&GST_my_identity));
3856   GST_stats = GNUNET_STATISTICS_create ("transport",
3857                                         GST_cfg);
3858   GNUNET_SCHEDULER_add_shutdown (&do_shutdown,
3859                                  NULL);
3860   peerstore = GNUNET_PEERSTORE_connect (GST_cfg);
3861   if (NULL == peerstore)
3862   {
3863     GNUNET_break (0);
3864     GNUNET_SCHEDULER_shutdown ();
3865     return;
3866   }
3867   ats = GNUNET_ATS_transport_init (GST_cfg,
3868                                    &ats_allocation_cb,
3869                                    NULL,
3870                                    &ats_suggestion_cb,
3871                                    NULL);
3872   if (NULL == ats)
3873   {
3874     GNUNET_break (0);
3875     GNUNET_SCHEDULER_shutdown ();
3876     return;
3877   }
3878 }
3879
3880
3881 /**
3882  * Define "main" method using service macro.
3883  */
3884 GNUNET_SERVICE_MAIN
3885 ("transport",
3886  GNUNET_SERVICE_OPTION_SOFT_SHUTDOWN,
3887  &run,
3888  &client_connect_cb,
3889  &client_disconnect_cb,
3890  NULL,
3891  /* communication with core */
3892  GNUNET_MQ_hd_fixed_size (client_start,
3893                           GNUNET_MESSAGE_TYPE_TRANSPORT_START,
3894                           struct StartMessage,
3895                           NULL),
3896  GNUNET_MQ_hd_var_size (client_send,
3897                         GNUNET_MESSAGE_TYPE_TRANSPORT_SEND,
3898                         struct OutboundMessage,
3899                         NULL),
3900  /* communication with communicators */
3901  GNUNET_MQ_hd_var_size (communicator_available,
3902                         GNUNET_MESSAGE_TYPE_TRANSPORT_NEW_COMMUNICATOR,
3903                         struct GNUNET_TRANSPORT_CommunicatorAvailableMessage,
3904                         NULL),
3905  GNUNET_MQ_hd_var_size (communicator_backchannel,
3906                         GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL,
3907                         struct GNUNET_TRANSPORT_CommunicatorBackchannel,
3908                         NULL),
3909  GNUNET_MQ_hd_var_size (add_address,
3910                         GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS,
3911                         struct GNUNET_TRANSPORT_AddAddressMessage,
3912                         NULL),
3913  GNUNET_MQ_hd_fixed_size (del_address,
3914                           GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_ADDRESS,
3915                           struct GNUNET_TRANSPORT_DelAddressMessage,
3916                           NULL),
3917  GNUNET_MQ_hd_var_size (incoming_msg,
3918                         GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG,
3919                         struct GNUNET_TRANSPORT_IncomingMessage,
3920                         NULL),
3921  GNUNET_MQ_hd_fixed_size (queue_create_ok,
3922                           GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_OK,
3923                           struct GNUNET_TRANSPORT_CreateQueueResponse,
3924                           NULL),
3925  GNUNET_MQ_hd_fixed_size (queue_create_fail,
3926                           GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_FAIL,
3927                           struct GNUNET_TRANSPORT_CreateQueueResponse,
3928                           NULL),
3929  GNUNET_MQ_hd_var_size (add_queue_message,
3930                         GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP,
3931                         struct GNUNET_TRANSPORT_AddQueueMessage,
3932                         NULL),
3933  GNUNET_MQ_hd_fixed_size (del_queue_message,
3934                           GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN,
3935                           struct GNUNET_TRANSPORT_DelQueueMessage,
3936                           NULL),
3937  GNUNET_MQ_hd_fixed_size (send_message_ack,
3938                           GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK,
3939                           struct GNUNET_TRANSPORT_SendMessageToAck,
3940                           NULL),
3941  /* communication with monitors */
3942  GNUNET_MQ_hd_fixed_size (monitor_start,
3943                           GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_START,
3944                           struct GNUNET_TRANSPORT_MonitorStart,
3945                           NULL),
3946  GNUNET_MQ_handler_end ());
3947
3948
3949 /* end of file gnunet-service-transport.c */