working on crazy fragmentation logic
[oweals/gnunet.git] / src / transport / gnunet-service-tng.c
1 /*
2  This file is part of GNUnet.
3  Copyright (C) 2010-2016, 2018, 2019 GNUnet e.V.
4
5  GNUnet is free software: you can redistribute it and/or modify it
6  under the terms of the GNU Affero General Public License as published
7  by the Free Software Foundation, either version 3 of the License,
8  or (at your option) any later version.
9
10  GNUnet is distributed in the hope that it will be useful, but
11  WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  Affero General Public License for more details.
14
15  You should have received a copy of the GNU Affero General Public License
16  along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19  */
20 /**
21  * @file transport/gnunet-service-tng.c
22  * @brief main for gnunet-service-tng
23  * @author Christian Grothoff
24  *
25  * TODO:
26  * - figure out how to transmit (selective) ACKs in case of uni-directional
27  *   communicators (with/without core? DV-only?) When do we use ACKs?
28  *   => communicators use selective ACKs for flow control
29  *   => transport uses message-level ACKs for RTT, fragment confirmation
30  *   => integrate DV into transport, use neither core nor communicators
31  *      but rather give communicators transport-encapsulated messages
32  *      (which could be core-data, background-channel traffic, or
33  *       transport-to-transport traffic)
34  *
35  * Implement:
36  * - manage fragmentation/defragmentation, retransmission, track RTT, loss, etc.
37  *
38  * Easy:
39  * - use ATS bandwidth allocation callback and schedule transmissions!
40  *
41  * Plan:
42  * - inform ATS about RTT, goodput/loss, overheads, etc. (GNUNET_ATS_session_update())
43  *
44  * Later:
45  * - change transport-core API to provide proper flow control in both
46  *   directions, allow multiple messages per peer simultaneously (tag
47  *   confirmations with unique message ID), and replace quota-out with
48  *   proper flow control;
49  *
50  * Design realizations / discussion:
51  * - communicators do flow control by calling MQ "notify sent"
52  *   when 'ready'. They determine flow implicitly (i.e. TCP blocking)
53  *   or explicitly via background channel FC ACKs.  As long as the
54  *   channel is not full, they may 'notify sent' even if the other
55  *   peer has not yet confirmed receipt. The other peer confirming
56  *   is _only_ for FC, not for more reliable transmission; reliable
57  *   transmission (i.e. of fragments) is left to _transport_.
58  * - ACKs sent back in uni-directional communicators are done via
59  *   the background channel API; here transport _may_ initially
60  *   broadcast (with bounded # hops) if no path is known;
61  * - transport should _integrate_ DV-routing and build a view of
62  *   the network; then background channel traffic can be
63  *   routed via DV as well as explicit "DV" traffic.
64  * - background channel is also used for ACKs and NAT traversal support
65  * - transport service is responsible for AEAD'ing the background
66  *   channel, timestamps and monotonic time are used against replay
67  *   of old messages -> peerstore needs to be supplied with
68  *   "latest timestamps seen" data
69  * - if transport implements DV, we likely need a 3rd peermap
70  *   in addition to ephemerals and (direct) neighbours
71  *   => in this data structure, we should track ATS metrics (distance, RTT, etc.)
72  *   as well as latest timestamps seen, goodput, fragments for transmission, etc.
73  *   ==> check if stuff needs to be moved out of "Neighbour"
74  * - transport should encapsualte core-level messages and do its
75  *   own ACKing for RTT/goodput/loss measurements _and_ fragment
76  *   for retransmission
77  */
78 #include "platform.h"
79 #include "gnunet_util_lib.h"
80 #include "gnunet_statistics_service.h"
81 #include "gnunet_transport_monitor_service.h"
82 #include "gnunet_peerstore_service.h"
83 #include "gnunet_hello_lib.h"
84 #include "gnunet_ats_transport_service.h"
85 #include "transport.h"
86
87
88 /**
89  * What is the size we assume for a read operation in the
90  * absence of an MTU for the purpose of flow control?
91  */
92 #define IN_PACKET_SIZE_WITHOUT_MTU 128
93
94 /**
95  * If a queue delays the next message by more than this number
96  * of seconds we log a warning. Note: this is for testing,
97  * the value chosen here might be too aggressively low!
98  */
99 #define DELAY_WARN_THRESHOLD GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
100
101 /**
102  * How many messages can we have pending for a given client process
103  * before we start to drop incoming messages?  We typically should
104  * have only one client and so this would be the primary buffer for
105  * messages, so the number should be chosen rather generously.
106  *
107  * The expectation here is that most of the time the queue is large
108  * enough so that a drop is virtually never required.  Note that
109  * this value must be about as large as 'TOTAL_MSGS' in the
110  * 'test_transport_api_reliability.c', otherwise that testcase may
111  * fail.
112  */
113 #define MAX_PENDING (128 * 1024)
114
115
116 GNUNET_NETWORK_STRUCT_BEGIN
117
118 /**
119  * Outer layer of an encapsulated backchannel message.
120  */
121 struct TransportBackchannelEncapsulationMessage
122 {
123   /**
124    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION.
125    */
126   struct GNUNET_MessageHeader header;
127
128   /**
129    * Distance the backchannel message has traveled, to be updated at
130    * each hop.  Used to bound the number of hops in case a backchannel
131    * message is broadcast and thus travels without routing
132    * information (during initial backchannel discovery).
133    */
134   uint32_t distance;
135
136   /**
137    * Target's peer identity (as backchannels may be transmitted
138    * indirectly, or even be broadcast).
139    */
140   struct GNUNET_PeerIdentity target;
141
142   /**
143    * Ephemeral key setup by the sender for @e target, used
144    * to encrypt the payload.
145    */
146   struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;
147
148   /**
149    * HMAC over the ciphertext of the encrypted, variable-size
150    * body that follows.  Verified via DH of @e target and
151    * @e ephemeral_key
152    */
153   struct GNUNET_HashCode hmac;
154
155   /* Followed by encrypted, variable-size payload */
156 };
157
158
159 /**
160  * Message by which a peer confirms that it is using an
161  * ephemeral key.
162  */
163 struct EphemeralConfirmation
164 {
165
166   /**
167    * Purpose is #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL
168    */
169   struct GNUNET_CRYPTO_EccSignaturePurpose purpose;
170
171   /**
172    * How long is this signature over the ephemeral key
173    * valid?
174    */
175   struct GNUNET_TIME_AbsoluteNBO ephemeral_validity;
176
177   /**
178    * Ephemeral key setup by the sender for @e target, used
179    * to encrypt the payload.
180    */
181   struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;
182
183 };
184
185
186 /**
187  * Plaintext of the variable-size payload that is encrypted
188  * within a `struct TransportBackchannelEncapsulationMessage`
189  */
190 struct TransportBackchannelRequestPayload
191 {
192
193   /**
194    * Sender's peer identity.
195    */
196   struct GNUNET_PeerIdentity sender;
197
198   /**
199    * Signature of the sender over an
200    * #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL.
201    */
202   struct GNUNET_CRYPTO_EddsaSignature sender_sig;
203
204   /**
205    * How long is this signature over the ephemeral key
206    * valid?
207    */
208   struct GNUNET_TIME_AbsoluteNBO ephemeral_validity;
209
210   /**
211    * Current monotonic time of the sending transport service.  Used to
212    * detect replayed messages.  Note that the receiver should remember
213    * a list of the recently seen timestamps and only reject messages
214    * if the timestamp is in the list, or the list is "full" and the
215    * timestamp is smaller than the lowest in the list.  This list of
216    * timestamps per peer should be persisted to guard against replays
217    * after restarts.
218    */
219   struct GNUNET_TIME_AbsoluteNBO monotonic_time;
220
221   /* Followed by a `struct GNUNET_MessageHeader` with a message
222      for a communicator */
223
224   /* Followed by a 0-termianted string specifying the name of
225      the communicator which is to receive the message */
226
227 };
228
229
230 /**
231  * Outer layer of an encapsulated unfragmented application message sent
232  * over an unreliable channel.
233  */
234 struct TransportReliabilityBox
235 {
236   /**
237    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX
238    */
239   struct GNUNET_MessageHeader header;
240
241   /**
242    * Number of messages still to be sent before a commulative
243    * ACK is requested.  Zero if an ACK is requested immediately.
244    * In NBO.  Note that the receiver may send the ACK faster
245    * if it believes that is reasonable.
246    */
247   uint32_t ack_countdown GNUNET_PACKED;
248
249   /**
250    * Unique ID of the message used for signalling receipt of
251    * messages sent over possibly unreliable channels.  Should
252    * be a random.
253    */
254   struct GNUNET_ShortHashCode msg_uuid;
255 };
256
257
258 /**
259  * Confirmation that the receiver got a
260  * #GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX. Note that the
261  * confirmation may be transmitted over a completely different queue,
262  * so ACKs are identified by a combination of PID of sender and
263  * message UUID, without the queue playing any role!
264  */
265 struct TransportReliabilityAckMessage
266 {
267   /**
268    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK
269    */
270   struct GNUNET_MessageHeader header;
271
272   /**
273    * Reserved. Zero.
274    */
275   uint32_t reserved GNUNET_PACKED;
276
277   /**
278    * How long was the ACK delayed relative to the average time of
279    * receipt of the messages being acknowledged?  Used to calculate
280    * the average RTT by taking the receipt time of the ack minus the
281    * average transmission time of the sender minus this value.
282    */
283   struct GNUNET_TIME_RelativeNBO avg_ack_delay;
284
285   /* followed by any number of `struct GNUNET_ShortHashCode`
286      messages providing ACKs */
287 };
288
289
290 /**
291  * Outer layer of an encapsulated fragmented application message.
292  */
293 struct TransportFragmentBox
294 {
295   /**
296    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT
297    */
298   struct GNUNET_MessageHeader header;
299
300   /**
301    * Unique ID of this fragment (and fragment transmission!). Will
302    * change even if a fragement is retransmitted to make each
303    * transmission attempt unique! Should be incremented by one for
304    * each fragment transmission. If a client receives a duplicate
305    * fragment (same @e frag_off), it must send
306    * #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK immediately.
307    */
308   uint32_t frag_uuid GNUNET_PACKED;
309
310   /**
311    * Original message ID for of the message that all the1
312    * fragments belong to.  Must be the same for all fragments.
313    */ 
314   struct GNUNET_ShortHashCode msg_uuid;
315
316   /**
317    * Offset of this fragment in the overall message.
318    */ 
319   uint16_t frag_off GNUNET_PACKED;
320
321   /**
322    * Total size of the message that is being fragmented.
323    */ 
324   uint16_t msg_size GNUNET_PACKED;
325
326 };
327
328
329 /**
330  * Outer layer of an fragmented application message sent over a queue
331  * with finite MTU.  When a #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT is
332  * received, the receiver has two RTTs or 64 further fragments with
333  * the same basic message time to send an acknowledgement, possibly
334  * acknowledging up to 65 fragments in one ACK.  ACKs must also be
335  * sent immediately once all fragments were sent.
336  */
337 struct TransportFragmentAckMessage
338 {
339   /**
340    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK
341    */
342   struct GNUNET_MessageHeader header;
343
344   /**
345    * Unique ID of the lowest fragment UUID being acknowledged.
346    */
347   uint32_t frag_uuid GNUNET_PACKED;
348
349   /**
350    * Bitfield of up to 64 additional fragments following the
351    * @e msg_uuid being acknowledged by this message.
352    */ 
353   uint64_t extra_acks GNUNET_PACKED;
354
355   /**
356    * Original message ID for of the message that all the
357    * fragments belong to.
358    */ 
359   struct GNUNET_ShortHashCode msg_uuid;
360
361   /**
362    * How long was the ACK delayed relative to the average time of
363    * receipt of the fragments being acknowledged?  Used to calculate
364    * the average RTT by taking the receipt time of the ack minus the
365    * average transmission time of the sender minus this value.
366    */
367   struct GNUNET_TIME_RelativeNBO avg_ack_delay;
368 };
369
370
371 /**
372  * Internal message used by transport for distance vector learning.
373  * If @e num_hops does not exceed the threshold, peers should append
374  * themselves to the peer list and flood the message (possibly only
375  * to a subset of their neighbours to limit discoverability of the
376  * network topology).  To the extend that the @e bidirectional bits
377  * are set, peers may learn the inverse paths even if they did not
378  * initiate. 
379  *
380  * Unless received on a bidirectional queue and @e num_hops just
381  * zero, peers that can forward to the initator should always try to
382  * forward to the initiator.
383  */
384 struct TransportDVLearn
385 {
386   /**
387    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN
388    */
389   struct GNUNET_MessageHeader header;
390
391   /**
392    * Number of hops this messages has travelled, in NBO. Zero if
393    * sent by initiator.
394    */
395   uint16_t num_hops GNUNET_PACKED;
396
397   /**
398    * Bitmask of the last 16 hops indicating whether they are confirmed
399    * available (without DV) in both directions or not, in NBO.  Used
400    * to possibly instantly learn a path in both directions.  Each peer
401    * should shift this value by one to the left, and then set the
402    * lowest bit IF the current sender can be reached from it (without
403    * DV routing).  
404    */ 
405   uint16_t bidirectional GNUNET_PACKED;
406
407   /**
408    * Peers receiving this message and delaying forwarding to other
409    * peers for any reason should increment this value such as to 
410    * enable the origin to determine the actual network-only delay
411    * in addition to the real-time delay (assuming the message loops
412    * back to the origin).
413    */
414   struct GNUNET_TIME_Relative cummulative_non_network_delay;
415
416   /**
417    * Identity of the peer that started this learning activity.
418    */
419   struct GNUNET_PeerIdentity initiator;
420   
421   /* Followed by @e num_hops `struct GNUNET_PeerIdentity` values,
422      excluding the initiator of the DV trace; the last entry is the
423      current sender; the current peer must not be included except if
424      it is the sender. */
425   
426 };
427
428
429 /**
430  * Outer layer of an encapsulated message send over multiple hops.
431  * The path given only includes the identities of the subsequent
432  * peers, i.e. it will be empty if we are the receiver. Each
433  * forwarding peer should scan the list from the end, and if it can,
434  * forward to the respective peer. The list should then be shortened
435  * by all the entries up to and including that peer.  Each hop should
436  * also increment @e total_hops to allow the receiver to get a precise
437  * estimate on the number of hops the message travelled.  Senders must
438  * provide a learned path that thus should work, but intermediaries
439  * know of a shortcut, they are allowed to send the message via that
440  * shortcut.
441  *
442  * If a peer finds itself still on the list, it must drop the message.
443  */
444 struct TransportDVBox
445 {
446   /**
447    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX
448    */
449   struct GNUNET_MessageHeader header;
450
451   /**
452    * Number of total hops this messages travelled. In NBO.
453    * @e origin sets this to zero, to be incremented at
454    * each hop.
455    */
456   uint16_t total_hops GNUNET_PACKED;
457
458   /**
459    * Number of hops this messages includes. In NBO.
460    */
461   uint16_t num_hops GNUNET_PACKED;
462   
463   /**
464    * Identity of the peer that originated the message.
465    */
466   struct GNUNET_PeerIdentity origin;
467
468   /* Followed by @e num_hops `struct GNUNET_PeerIdentity` values;
469      excluding the @e origin and the current peer, the last must be
470      the ultimate target; if @e num_hops is zero, the receiver of this
471      message is the ultimate target. */
472
473   /* Followed by the actual message, which itself may be
474      another box, but not a DV_LEARN or DV_BOX message! */
475 };
476
477
478 GNUNET_NETWORK_STRUCT_END
479
480
481
482 /**
483  * What type of client is the `struct TransportClient` about?
484  */
485 enum ClientType
486 {
487   /**
488    * We do not know yet (client is fresh).
489    */
490   CT_NONE = 0,
491
492   /**
493    * Is the CORE service, we need to forward traffic to it.
494    */
495   CT_CORE = 1,
496
497   /**
498    * It is a monitor, forward monitor data.
499    */
500   CT_MONITOR = 2,
501
502   /**
503    * It is a communicator, use for communication.
504    */
505   CT_COMMUNICATOR = 3
506 };
507
508
509 /**
510  * Entry in our cache of ephemeral keys we currently use.
511  */
512 struct EphemeralCacheEntry
513 {
514
515   /**
516    * Target's peer identity (we don't re-use ephemerals
517    * to limit linkability of messages).
518    */
519   struct GNUNET_PeerIdentity target;
520
521   /**
522    * Signature affirming @e ephemeral_key of type
523    * #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL
524    */
525   struct GNUNET_CRYPTO_EddsaSignature sender_sig;
526
527   /**
528    * How long is @e sender_sig valid
529    */
530   struct GNUNET_TIME_Absolute ephemeral_validity;
531
532   /**
533    * Our ephemeral key.
534    */
535   struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;
536
537   /**
538    * Node in the ephemeral cache for this entry.
539    * Used for expiration.
540    */
541   struct GNUNET_CONTAINER_HeapNode *hn;
542 };
543
544
545 /**
546  * Client connected to the transport service.
547  */
548 struct TransportClient;
549
550
551 /**
552  * A neighbour that at least one communicator is connected to.
553  */
554 struct Neighbour;
555
556
557 /**
558  * An ATS session is a message queue provided by a communicator
559  * via which we can reach a particular neighbour.
560  */
561 struct GNUNET_ATS_Session
562 {
563   /**
564    * Kept in a MDLL.
565    */
566   struct GNUNET_ATS_Session *next_neighbour;
567
568   /**
569    * Kept in a MDLL.
570    */
571   struct GNUNET_ATS_Session *prev_neighbour;
572
573   /**
574    * Kept in a MDLL.
575    */
576   struct GNUNET_ATS_Session *prev_client;
577
578   /**
579    * Kept in a MDLL.
580    */
581   struct GNUNET_ATS_Session *next_client;
582
583   /**
584    * Which neighbour is this ATS session for?
585    */
586   struct Neighbour *neighbour;
587
588   /**
589    * Which communicator offers this ATS session?
590    */
591   struct TransportClient *tc;
592
593   /**
594    * Address served by the ATS session.
595    */
596   const char *address;
597
598   /**
599    * Handle by which we inform ATS about this queue.
600    */
601   struct GNUNET_ATS_SessionRecord *sr;
602
603   /**
604    * Task scheduled for the time when this queue can (likely) transmit the
605    * next message. Still needs to check with the @e tracker_out to be sure.
606    */ 
607   struct GNUNET_SCHEDULER_Task *transmit_task;
608   
609   /**
610    * Our current RTT estimate for this ATS session.
611    */
612   struct GNUNET_TIME_Relative rtt;
613
614   /**
615    * Unique identifier of this ATS session with the communicator.
616    */
617   uint32_t qid;
618
619   /**
620    * Maximum transmission unit supported by this ATS session.
621    */
622   uint32_t mtu;
623
624   /**
625    * Distance to the target of this ATS session.
626    */
627   uint32_t distance;
628
629   /**
630    * Network type offered by this ATS session.
631    */
632   enum GNUNET_NetworkType nt;
633
634   /**
635    * Connection status for this ATS session.
636    */
637   enum GNUNET_TRANSPORT_ConnectionStatus cs;
638
639   /**
640    * Messages pending.
641    */
642   uint32_t num_msg_pending;
643
644   /**
645    * Bytes pending.
646    */
647   uint32_t num_bytes_pending;
648
649   /**
650    * How much outbound bandwidth do we have available for this session?
651    */
652   struct GNUNET_BANDWIDTH_Tracker tracker_out;
653
654   /**
655    * How much inbound bandwidth do we have available for this session?
656    */
657   struct GNUNET_BANDWIDTH_Tracker tracker_in;
658 };
659
660
661 /**
662  * A neighbour that at least one communicator is connected to.
663  */
664 struct Neighbour
665 {
666
667   /**
668    * Which peer is this about?
669    */
670   struct GNUNET_PeerIdentity pid;
671
672   /**
673    * Head of list of messages pending for this neighbour.
674    */
675   struct PendingMessage *pending_msg_head;
676
677   /**
678    * Tail of list of messages pending for this neighbour.
679    */
680   struct PendingMessage *pending_msg_tail;
681
682   /**
683    * Head of DLL of ATS sessions to this peer.
684    */
685   struct GNUNET_ATS_Session *session_head;
686
687   /**
688    * Tail of DLL of ATS sessions to this peer.
689    */
690   struct GNUNET_ATS_Session *session_tail;
691
692   /**
693    * Task run to cleanup pending messages that have exceeded their timeout.
694    */  
695   struct GNUNET_SCHEDULER_Task *timeout_task;
696
697   /**
698    * Quota at which CORE is allowed to transmit to this peer
699    * according to ATS.
700    *
701    * FIXME: not yet used, tricky to get right given multiple queues!
702    *        (=> Idea: let ATS set a quota per queue and we add them up here?)
703    * FIXME: how do we set this value initially when we tell CORE?
704    *    Options: start at a minimum value or at literally zero (before ATS?)
705    *         (=> Current thought: clean would be zero!)
706    */
707   struct GNUNET_BANDWIDTH_Value32NBO quota_out;
708
709   /**
710    * What is the earliest timeout of any message in @e pending_msg_tail?
711    */ 
712   struct GNUNET_TIME_Absolute earliest_timeout;
713   
714 };
715
716
717 /**
718  * Types of different pending messages.
719  */ 
720 enum PendingMessageType
721 {
722
723   /**
724    * Ordinary message received from the CORE service.
725    */
726   PMT_CORE = 0,
727
728   /**
729    * Fragment box.
730    */
731   PMT_FRAGMENT_BOX = 1,
732
733   /**
734    * Reliability box.
735    */
736   PMT_RELIABILITY_BOX = 2,
737
738   /**
739    * Any type of acknowledgement.
740    */
741   PMT_ACKNOWLEDGEMENT = 3
742
743   
744 };
745
746
747 /**
748  * Transmission request that is awaiting delivery.  The original
749  * transmission requests from CORE may be too big for some queues.
750  * In this case, a *tree* of fragments is created.  At each
751  * level of the tree, fragments are kept in a DLL ordered by which
752  * fragment should be sent next (at the head).  The tree is searched
753  * top-down, with the original message at the root.
754  *
755  * To select a node for transmission, first it is checked if the
756  * current node's message fits with the MTU.  If it does not, we
757  * either calculate the next fragment (based on @e frag_off) from the
758  * current node, or, if all fragments have already been created,
759  * descend to the @e head_frag.  Even though the node was already
760  * fragmented, the fragment may be too big if the fragment was 
761  * generated for a queue with a larger MTU. In this case, the node
762  * may be fragmented again, thus creating a tree.
763  *
764  * When acknowledgements for fragments are received, the tree
765  * must be pruned, removing those parts that were already 
766  * acknowledged.  When fragments are sent over a reliable 
767  * channel, they can be immediately removed.
768  *
769  * If a message is ever fragmented, then the original "full" message
770  * is never again transmitted (even if it fits below the MTU), and
771  * only (remaining) fragments are sent.
772  */
773 struct PendingMessage
774 {
775   /**
776    * Kept in a MDLL of messages for this @a target.
777    */
778   struct PendingMessage *next_neighbour;
779
780   /**
781    * Kept in a MDLL of messages for this @a target.
782    */
783   struct PendingMessage *prev_neighbour;
784
785   /**
786    * Kept in a MDLL of messages from this @a client (if @e pmt is #PMT_CORE)
787    */
788   struct PendingMessage *next_client;
789   
790   /**
791    * Kept in a MDLL of messages from this @a client  (if @e pmt is #PMT_CORE)
792    */
793   struct PendingMessage *prev_client;
794
795   /**
796    * Kept in a MDLL of messages from this @a cpm (if @e pmt is #PMT_FRAGMENT_BOx)
797    */
798   struct PendingMessage *next_frag;
799   
800   /**
801    * Kept in a MDLL of messages from this @a cpm  (if @e pmt is #PMT_FRAGMENT_BOX)
802    */
803   struct PendingMessage *prev_frag;
804     
805   /**
806    * Target of the request.
807    */
808   struct Neighbour *target;
809       
810   /**
811    * Client that issued the transmission request, if @e pmt is #PMT_CORE.
812    */
813   struct TransportClient *client;
814   
815   /**
816    * Head of a MDLL of fragments created for this core message.
817    */
818   struct PendingMessage *head_frag;
819   
820   /**
821    * Tail of a MDLL of fragments created for this core message.
822    */
823   struct PendingMessage *tail_frag;
824
825   /**
826    * Our parent in the fragmentation tree.
827    */
828   struct PendingMessage *frag_parent;
829     
830   /**
831    * At what time should we give up on the transmission (and no longer retry)?
832    */
833   struct GNUNET_TIME_Absolute timeout;
834
835   /**
836    * What is the earliest time for us to retry transmission of this message?
837    */
838   struct GNUNET_TIME_Absolute next_attempt;
839
840   /**
841    * UUID to use for this message (used for reassembly of fragments, only
842    * initialized if @e msg_uuid_set is #GNUNET_YES).
843    */
844   struct GNUNET_ShortHashCode msg_uuid;
845
846   /**
847    * Counter incremented per generated fragment.
848    */ 
849   uint32_t frag_uuidgen;
850   
851   /**
852    * Type of the pending message.
853    */
854   enum PendingMessageType pmt;
855
856   /**
857    * Size of the original message.
858    */
859   uint16_t bytes_msg;
860
861   /**
862    * Offset at which we should generate the next fragment.
863    */ 
864   uint16_t frag_off;
865
866   /**
867    * #GNUNET_YES once @e msg_uuid was initialized
868    */
869   int16_t msg_uuid_set;
870   
871   /* Followed by @e bytes_msg to transmit */
872 };
873
874
875 /**
876  * One of the addresses of this peer.
877  */
878 struct AddressListEntry
879 {
880
881   /**
882    * Kept in a DLL.
883    */
884   struct AddressListEntry *next;
885
886   /**
887    * Kept in a DLL.
888    */
889   struct AddressListEntry *prev;
890
891   /**
892    * Which communicator provides this address?
893    */
894   struct TransportClient *tc;
895
896   /**
897    * The actual address.
898    */
899   const char *address;
900
901   /**
902    * Current context for storing this address in the peerstore.
903    */
904   struct GNUNET_PEERSTORE_StoreContext *sc;
905
906   /**
907    * Task to periodically do @e st operation.
908    */
909   struct GNUNET_SCHEDULER_Task *st;
910
911   /**
912    * What is a typical lifetime the communicator expects this
913    * address to have? (Always from now.)
914    */
915   struct GNUNET_TIME_Relative expiration;
916
917   /**
918    * Address identifier used by the communicator.
919    */
920   uint32_t aid;
921
922   /**
923    * Network type offered by this address.
924    */
925   enum GNUNET_NetworkType nt;
926
927 };
928
929
930 /**
931  * Client connected to the transport service.
932  */
933 struct TransportClient
934 {
935
936   /**
937    * Kept in a DLL.
938    */
939   struct TransportClient *next;
940
941   /**
942    * Kept in a DLL.
943    */
944   struct TransportClient *prev;
945
946   /**
947    * Handle to the client.
948    */
949   struct GNUNET_SERVICE_Client *client;
950
951   /**
952    * Message queue to the client.
953    */
954   struct GNUNET_MQ_Handle *mq;
955
956   /**
957    * What type of client is this?
958    */
959   enum ClientType type;
960
961   union
962   {
963
964     /**
965      * Information for @e type #CT_CORE.
966      */
967     struct {
968
969       /**
970        * Head of list of messages pending for this client, sorted by 
971        * transmission time ("next_attempt" + possibly internal prioritization).
972        */
973       struct PendingMessage *pending_msg_head;
974
975       /**
976        * Tail of list of messages pending for this client.
977        */
978       struct PendingMessage *pending_msg_tail;
979
980     } core;
981
982     /**
983      * Information for @e type #CT_MONITOR.
984      */
985     struct {
986
987       /**
988        * Peer identity to monitor the addresses of.
989        * Zero to monitor all neighbours.  Valid if
990        * @e type is #CT_MONITOR.
991        */
992       struct GNUNET_PeerIdentity peer;
993
994       /**
995        * Is this a one-shot monitor?
996        */
997       int one_shot;
998
999     } monitor;
1000
1001
1002     /**
1003      * Information for @e type #CT_COMMUNICATOR.
1004      */
1005     struct {
1006       /**
1007        * If @e type is #CT_COMMUNICATOR, this communicator
1008        * supports communicating using these addresses.
1009        */
1010       char *address_prefix;
1011
1012       /**
1013        * Head of DLL of queues offered by this communicator.
1014        */
1015       struct GNUNET_ATS_Session *session_head;
1016
1017       /**
1018        * Tail of DLL of queues offered by this communicator.
1019        */
1020       struct GNUNET_ATS_Session *session_tail;
1021
1022       /**
1023        * Head of list of the addresses of this peer offered by this communicator.
1024        */
1025       struct AddressListEntry *addr_head;
1026
1027       /**
1028        * Tail of list of the addresses of this peer offered by this communicator.
1029        */
1030       struct AddressListEntry *addr_tail;
1031
1032       /**
1033        * Characteristics of this communicator.
1034        */
1035       enum GNUNET_TRANSPORT_CommunicatorCharacteristics cc;
1036
1037     } communicator;
1038
1039   } details;
1040
1041 };
1042
1043
1044 /**
1045  * Head of linked list of all clients to this service.
1046  */
1047 static struct TransportClient *clients_head;
1048
1049 /**
1050  * Tail of linked list of all clients to this service.
1051  */
1052 static struct TransportClient *clients_tail;
1053
1054 /**
1055  * Statistics handle.
1056  */
1057 static struct GNUNET_STATISTICS_Handle *GST_stats;
1058
1059 /**
1060  * Configuration handle.
1061  */
1062 static const struct GNUNET_CONFIGURATION_Handle *GST_cfg;
1063
1064 /**
1065  * Our public key.
1066  */
1067 static struct GNUNET_PeerIdentity GST_my_identity;
1068
1069 /**
1070  * Our private key.
1071  */
1072 static struct GNUNET_CRYPTO_EddsaPrivateKey *GST_my_private_key;
1073
1074 /**
1075  * Map from PIDs to `struct Neighbour` entries.  A peer is
1076  * a neighbour if we have an MQ to it from some communicator.
1077  */
1078 static struct GNUNET_CONTAINER_MultiPeerMap *neighbours;
1079
1080 /**
1081  * Database for peer's HELLOs.
1082  */
1083 static struct GNUNET_PEERSTORE_Handle *peerstore;
1084
1085 /**
1086  * Heap sorting `struct EphemeralCacheEntry` by their
1087  * key/signature validity.
1088  */
1089 static struct GNUNET_CONTAINER_Heap *ephemeral_heap;
1090
1091 /**
1092  * Hash map for looking up `struct EphemeralCacheEntry`s
1093  * by peer identity. (We may have ephemerals in our
1094  * cache for which we do not have a neighbour entry,
1095  * and similar many neighbours may not need ephemerals,
1096  * so we use a second map.)
1097  */
1098 static struct GNUNET_CONTAINER_MultiPeerMap *ephemeral_map;
1099
1100 /**
1101  * Our connection to ATS for allocation and bootstrapping.
1102  */
1103 static struct GNUNET_ATS_TransportHandle *ats;
1104
1105
1106 /**
1107  * Free cached ephemeral key.
1108  *
1109  * @param ece cached signature to free
1110  */
1111 static void
1112 free_ephemeral (struct EphemeralCacheEntry *ece)
1113 {
1114   GNUNET_CONTAINER_multipeermap_remove (ephemeral_map,
1115                                         &ece->target,
1116                                         ece);
1117   GNUNET_CONTAINER_heap_remove_node (ece->hn);
1118   GNUNET_free (ece);
1119 }
1120
1121
1122 /**
1123  * Lookup neighbour record for peer @a pid.
1124  *
1125  * @param pid neighbour to look for
1126  * @return NULL if we do not have this peer as a neighbour
1127  */
1128 static struct Neighbour *
1129 lookup_neighbour (const struct GNUNET_PeerIdentity *pid)
1130 {
1131   return GNUNET_CONTAINER_multipeermap_get (neighbours,
1132                                             pid);
1133 }
1134
1135
1136 /**
1137  * Details about what to notify monitors about.
1138  */
1139 struct MonitorEvent
1140 {
1141   /**
1142    * @deprecated To be discussed if we keep these...
1143    */
1144   struct GNUNET_TIME_Absolute last_validation;
1145   struct GNUNET_TIME_Absolute valid_until;
1146   struct GNUNET_TIME_Absolute next_validation;
1147
1148   /**
1149    * Current round-trip time estimate.
1150    */
1151   struct GNUNET_TIME_Relative rtt;
1152
1153   /**
1154    * Connection status.
1155    */
1156   enum GNUNET_TRANSPORT_ConnectionStatus cs;
1157
1158   /**
1159    * Messages pending.
1160    */
1161   uint32_t num_msg_pending;
1162
1163   /**
1164    * Bytes pending.
1165    */
1166   uint32_t num_bytes_pending;
1167
1168
1169 };
1170
1171
1172 /**
1173  * Notify monitor @a tc about an event.  That @a tc
1174  * cares about the event has already been checked.
1175  *
1176  * Send @a tc information in @a me about a @a peer's status with
1177  * respect to some @a address to all monitors that care.
1178  *
1179  * @param tc monitor to inform
1180  * @param peer peer the information is about
1181  * @param address address the information is about
1182  * @param nt network type associated with @a address
1183  * @param me detailed information to transmit
1184  */
1185 static void
1186 notify_monitor (struct TransportClient *tc,
1187                 const struct GNUNET_PeerIdentity *peer,
1188                 const char *address,
1189                 enum GNUNET_NetworkType nt,
1190                 const struct MonitorEvent *me)
1191 {
1192   struct GNUNET_MQ_Envelope *env;
1193   struct GNUNET_TRANSPORT_MonitorData *md;
1194   size_t addr_len = strlen (address) + 1;
1195
1196   env = GNUNET_MQ_msg_extra (md,
1197                              addr_len,
1198                              GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_DATA);
1199   md->nt = htonl ((uint32_t) nt);
1200   md->peer = *peer;
1201   md->last_validation = GNUNET_TIME_absolute_hton (me->last_validation);
1202   md->valid_until = GNUNET_TIME_absolute_hton (me->valid_until);
1203   md->next_validation = GNUNET_TIME_absolute_hton (me->next_validation);
1204   md->rtt = GNUNET_TIME_relative_hton (me->rtt);
1205   md->cs = htonl ((uint32_t) me->cs);
1206   md->num_msg_pending = htonl (me->num_msg_pending);
1207   md->num_bytes_pending = htonl (me->num_bytes_pending);
1208   memcpy (&md[1],
1209           address,
1210           addr_len);
1211   GNUNET_MQ_send (tc->mq,
1212                   env);
1213 }
1214
1215
1216 /**
1217  * Send information in @a me about a @a peer's status with respect
1218  * to some @a address to all monitors that care.
1219  *
1220  * @param peer peer the information is about
1221  * @param address address the information is about
1222  * @param nt network type associated with @a address
1223  * @param me detailed information to transmit
1224  */
1225 static void
1226 notify_monitors (const struct GNUNET_PeerIdentity *peer,
1227                  const char *address,
1228                  enum GNUNET_NetworkType nt,
1229                  const struct MonitorEvent *me)
1230 {
1231   static struct GNUNET_PeerIdentity zero;
1232
1233   for (struct TransportClient *tc = clients_head;
1234        NULL != tc;
1235        tc = tc->next)
1236   {
1237     if (CT_MONITOR != tc->type)
1238       continue;
1239     if (tc->details.monitor.one_shot)
1240       continue;
1241     if ( (0 != memcmp (&tc->details.monitor.peer,
1242                        &zero,
1243                        sizeof (zero))) &&
1244          (0 != memcmp (&tc->details.monitor.peer,
1245                        peer,
1246                        sizeof (*peer))) )
1247       continue;
1248     notify_monitor (tc,
1249                     peer,
1250                     address,
1251                     nt,
1252                     me);
1253   }
1254 }
1255
1256
1257 /**
1258  * Called whenever a client connects.  Allocates our
1259  * data structures associated with that client.
1260  *
1261  * @param cls closure, NULL
1262  * @param client identification of the client
1263  * @param mq message queue for the client
1264  * @return our `struct TransportClient`
1265  */
1266 static void *
1267 client_connect_cb (void *cls,
1268                    struct GNUNET_SERVICE_Client *client,
1269                    struct GNUNET_MQ_Handle *mq)
1270 {
1271   struct TransportClient *tc;
1272
1273   tc = GNUNET_new (struct TransportClient);
1274   tc->client = client;
1275   tc->mq = mq;
1276   GNUNET_CONTAINER_DLL_insert (clients_head,
1277                                clients_tail,
1278                                tc);
1279   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1280               "Client %p connected\n",
1281               tc);
1282   return tc;
1283 }
1284
1285
1286 /**
1287  * Release memory used by @a neighbour.
1288  *
1289  * @param neighbour neighbour entry to free
1290  */
1291 static void
1292 free_neighbour (struct Neighbour *neighbour)
1293 {
1294   GNUNET_assert (NULL == neighbour->session_head);
1295   GNUNET_assert (GNUNET_YES ==
1296                  GNUNET_CONTAINER_multipeermap_remove (neighbours,
1297                                                        &neighbour->pid,
1298                                                        neighbour));
1299   if (NULL != neighbour->timeout_task)
1300     GNUNET_SCHEDULER_cancel (neighbour->timeout_task);
1301   GNUNET_free (neighbour);
1302 }
1303
1304
1305 /**
1306  * Send message to CORE clients that we lost a connection.
1307  *
1308  * @param tc client to inform (must be CORE client)
1309  * @param pid peer the connection is for
1310  * @param quota_out current quota for the peer
1311  */
1312 static void
1313 core_send_connect_info (struct TransportClient *tc,
1314                         const struct GNUNET_PeerIdentity *pid,
1315                         struct GNUNET_BANDWIDTH_Value32NBO quota_out)
1316 {
1317   struct GNUNET_MQ_Envelope *env;
1318   struct ConnectInfoMessage *cim;
1319
1320   GNUNET_assert (CT_CORE == tc->type);
1321   env = GNUNET_MQ_msg (cim,
1322                        GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
1323   cim->quota_out = quota_out;
1324   cim->id = *pid;
1325   GNUNET_MQ_send (tc->mq,
1326                   env);
1327 }
1328
1329
1330 /**
1331  * Send message to CORE clients that we gained a connection
1332  *
1333  * @param pid peer the queue was for
1334  * @param quota_out current quota for the peer
1335  */
1336 static void
1337 cores_send_connect_info (const struct GNUNET_PeerIdentity *pid,
1338                          struct GNUNET_BANDWIDTH_Value32NBO quota_out)
1339 {
1340   for (struct TransportClient *tc = clients_head;
1341        NULL != tc;
1342        tc = tc->next)
1343   {
1344     if (CT_CORE != tc->type)
1345       continue;
1346     core_send_connect_info (tc,
1347                             pid,
1348                             quota_out);
1349   }
1350 }
1351
1352
1353 /**
1354  * Send message to CORE clients that we lost a connection.
1355  *
1356  * @param pid peer the connection was for
1357  */
1358 static void
1359 cores_send_disconnect_info (const struct GNUNET_PeerIdentity *pid)
1360 {
1361   for (struct TransportClient *tc = clients_head;
1362        NULL != tc;
1363        tc = tc->next)
1364   {
1365     struct GNUNET_MQ_Envelope *env;
1366     struct DisconnectInfoMessage *dim;
1367
1368     if (CT_CORE != tc->type)
1369       continue;
1370     env = GNUNET_MQ_msg (dim,
1371                          GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT);
1372     dim->peer = *pid;
1373     GNUNET_MQ_send (tc->mq,
1374                     env);
1375   }
1376 }
1377
1378
1379 /**
1380  * Free @a queue.
1381  *
1382  * @param queue the queue to free
1383  */
1384 static void
1385 free_queue (struct GNUNET_ATS_Session *queue)
1386 {
1387   struct Neighbour *neighbour = queue->neighbour;
1388   struct TransportClient *tc = queue->tc;
1389   struct MonitorEvent me = {
1390     .cs = GNUNET_TRANSPORT_CS_DOWN,
1391     .rtt = GNUNET_TIME_UNIT_FOREVER_REL
1392   };
1393
1394   if (NULL != queue->transmit_task)
1395   {
1396     GNUNET_SCHEDULER_cancel (queue->transmit_task);
1397     queue->transmit_task = NULL;
1398   }
1399   GNUNET_CONTAINER_MDLL_remove (neighbour,
1400                                 neighbour->session_head,
1401                                 neighbour->session_tail,
1402                                 queue);
1403   GNUNET_CONTAINER_MDLL_remove (client,
1404                                 tc->details.communicator.session_head,
1405                                 tc->details.communicator.session_tail,
1406                                 queue);  
1407   notify_monitors (&neighbour->pid,
1408                    queue->address,
1409                    queue->nt,
1410                    &me);
1411   GNUNET_ATS_session_del (queue->sr);
1412   GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_in);
1413   GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_out);
1414   GNUNET_free (queue);
1415   if (NULL == neighbour->session_head)
1416   {
1417     cores_send_disconnect_info (&neighbour->pid);
1418     free_neighbour (neighbour);
1419   }
1420 }
1421
1422
1423 /**
1424  * Free @a ale
1425  *
1426  * @param ale address list entry to free
1427  */
1428 static void
1429 free_address_list_entry (struct AddressListEntry *ale)
1430 {
1431   struct TransportClient *tc = ale->tc;
1432
1433   GNUNET_CONTAINER_DLL_remove (tc->details.communicator.addr_head,
1434                                tc->details.communicator.addr_tail,
1435                                ale);
1436   if (NULL != ale->sc)
1437   {
1438     GNUNET_PEERSTORE_store_cancel (ale->sc);
1439     ale->sc = NULL;
1440   }
1441   if (NULL != ale->st)
1442   {
1443     GNUNET_SCHEDULER_cancel (ale->st);
1444     ale->st = NULL;
1445   }
1446   GNUNET_free (ale);
1447 }
1448
1449
1450 /**
1451  * Called whenever a client is disconnected.  Frees our
1452  * resources associated with that client.
1453  *
1454  * @param cls closure, NULL
1455  * @param client identification of the client
1456  * @param app_ctx our `struct TransportClient`
1457  */
1458 static void
1459 client_disconnect_cb (void *cls,
1460                       struct GNUNET_SERVICE_Client *client,
1461                       void *app_ctx)
1462 {
1463   struct TransportClient *tc = app_ctx;
1464
1465   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1466               "Client %p disconnected, cleaning up.\n",
1467               tc);
1468   GNUNET_CONTAINER_DLL_remove (clients_head,
1469                                clients_tail,
1470                                tc);
1471   switch (tc->type)
1472   {
1473   case CT_NONE:
1474     break;
1475   case CT_CORE:
1476     {
1477       struct PendingMessage *pm;
1478
1479       while (NULL != (pm = tc->details.core.pending_msg_head))
1480       {
1481         GNUNET_CONTAINER_MDLL_remove (client,
1482                                       tc->details.core.pending_msg_head,
1483                                       tc->details.core.pending_msg_tail,
1484                                       pm);
1485         pm->client = NULL;
1486       }
1487     }
1488     break;
1489   case CT_MONITOR:
1490     break;
1491   case CT_COMMUNICATOR:
1492     {
1493       struct GNUNET_ATS_Session *q;
1494       struct AddressListEntry *ale;
1495
1496       while (NULL != (q = tc->details.communicator.session_head))
1497         free_queue (q);
1498       while (NULL != (ale = tc->details.communicator.addr_head))
1499         free_address_list_entry (ale);
1500       GNUNET_free (tc->details.communicator.address_prefix);
1501     }
1502     break;
1503   }
1504   GNUNET_free (tc);
1505 }
1506
1507
1508 /**
1509  * Iterator telling new CORE client about all existing
1510  * connections to peers.
1511  *
1512  * @param cls the new `struct TransportClient`
1513  * @param pid a connected peer
1514  * @param value the `struct Neighbour` with more information
1515  * @return #GNUNET_OK (continue to iterate)
1516  */
1517 static int
1518 notify_client_connect_info (void *cls,
1519                             const struct GNUNET_PeerIdentity *pid,
1520                             void *value)
1521 {
1522   struct TransportClient *tc = cls;
1523   struct Neighbour *neighbour = value;
1524
1525   core_send_connect_info (tc,
1526                           pid,
1527                           neighbour->quota_out);
1528   return GNUNET_OK;
1529 }
1530
1531
1532 /**
1533  * Initialize a "CORE" client.  We got a start message from this
1534  * client, so add it to the list of clients for broadcasting of
1535  * inbound messages.
1536  *
1537  * @param cls the client
1538  * @param start the start message that was sent
1539  */
1540 static void
1541 handle_client_start (void *cls,
1542                      const struct StartMessage *start)
1543 {
1544   struct TransportClient *tc = cls;
1545   uint32_t options;
1546
1547   options = ntohl (start->options);
1548   if ( (0 != (1 & options)) &&
1549        (0 !=
1550         memcmp (&start->self,
1551                 &GST_my_identity,
1552                 sizeof (struct GNUNET_PeerIdentity)) ) )
1553   {
1554     /* client thinks this is a different peer, reject */
1555     GNUNET_break (0);
1556     GNUNET_SERVICE_client_drop (tc->client);
1557     return;
1558   }
1559   if (CT_NONE != tc->type)
1560   {
1561     GNUNET_break (0);
1562     GNUNET_SERVICE_client_drop (tc->client);
1563     return;
1564   }
1565   tc->type = CT_CORE;
1566   GNUNET_CONTAINER_multipeermap_iterate (neighbours,
1567                                          &notify_client_connect_info,
1568                                          tc);
1569   GNUNET_SERVICE_client_continue (tc->client);
1570 }
1571
1572
1573 /**
1574  * Client asked for transmission to a peer.  Process the request.
1575  *
1576  * @param cls the client
1577  * @param obm the send message that was sent
1578  */
1579 static int
1580 check_client_send (void *cls,
1581                    const struct OutboundMessage *obm)
1582 {
1583   struct TransportClient *tc = cls;
1584   uint16_t size;
1585   const struct GNUNET_MessageHeader *obmm;
1586
1587   if (CT_CORE != tc->type)
1588   {
1589     GNUNET_break (0);
1590     return GNUNET_SYSERR;
1591   }
1592   size = ntohs (obm->header.size) - sizeof (struct OutboundMessage);
1593   if (size < sizeof (struct GNUNET_MessageHeader))
1594   {
1595     GNUNET_break (0);
1596     return GNUNET_SYSERR;
1597   }
1598   obmm = (const struct GNUNET_MessageHeader *) &obm[1];
1599   if (size != ntohs (obmm->size))
1600   {
1601     GNUNET_break (0);
1602     return GNUNET_SYSERR;
1603   }
1604   return GNUNET_OK;
1605 }
1606
1607
1608 /**
1609  * Free fragment tree below @e root, excluding @e root itself.
1610  *
1611  * @param root root of the tree to free
1612  */  
1613 static void
1614 free_fragment_tree (struct PendingMessage *root)
1615 {
1616   struct PendingMessage *frag;
1617
1618   while (NULL != (frag = root->head_frag))
1619   {
1620     free_fragment_tree (frag);
1621     GNUNET_CONTAINER_MDLL_remove (frag,
1622                                   root->head_frag,
1623                                   root->tail_frag,
1624                                   frag);
1625     GNUNET_free (frag);
1626   }
1627 }
1628
1629
1630 /**
1631  * Send a response to the @a pm that we have processed a
1632  * "send" request with status @a success. We
1633  * transmitted @a bytes_physical on the actual wire.
1634  * Sends a confirmation to the "core" client responsible
1635  * for the original request and free's @a pm.
1636  *
1637  * @param pm handle to the original pending message
1638  * @param success status code, #GNUNET_OK on success, #GNUNET_SYSERR
1639  *          for transmission failure
1640  * @param bytes_physical amount of bandwidth consumed
1641  */
1642 static void
1643 client_send_response (struct PendingMessage *pm,
1644                       int success,
1645                       uint32_t bytes_physical)
1646 {
1647   struct TransportClient *tc = pm->client;
1648   struct Neighbour *target = pm->target;
1649   struct GNUNET_MQ_Envelope *env;
1650   struct SendOkMessage *som;
1651
1652   if (NULL != tc)
1653   {
1654     env = GNUNET_MQ_msg (som,
1655                          GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
1656     som->success = htonl ((uint32_t) success);
1657     som->bytes_msg = htons (pm->bytes_msg);
1658     som->bytes_physical = htonl (bytes_physical);
1659     som->peer = target->pid;
1660     GNUNET_MQ_send (tc->mq,
1661                     env);
1662     GNUNET_CONTAINER_MDLL_remove (client,
1663                                   tc->details.core.pending_msg_head,
1664                                   tc->details.core.pending_msg_tail,
1665                                   pm);
1666   }
1667   GNUNET_CONTAINER_MDLL_remove (neighbour,
1668                                 target->pending_msg_head,
1669                                 target->pending_msg_tail,
1670                                 pm);
1671   free_fragment_tree (pm);
1672   GNUNET_free (pm);
1673 }
1674
1675
1676 /**
1677  * Checks the message queue for a neighbour for messages that have timed
1678  * out and purges them.
1679  *
1680  * @param cls a `struct Neighbour`
1681  */
1682 static void
1683 check_queue_timeouts (void *cls)
1684 {
1685   struct Neighbour *n = cls;
1686   struct PendingMessage *pm;
1687   struct GNUNET_TIME_Absolute now;
1688   struct GNUNET_TIME_Absolute earliest_timeout;
1689
1690   n->timeout_task = NULL;
1691   earliest_timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
1692   now = GNUNET_TIME_absolute_get ();
1693   for (struct PendingMessage *pos = n->pending_msg_head;
1694        NULL != pos;
1695        pos = pm)
1696   {
1697     pm = pos->next_neighbour;
1698     if (pos->timeout.abs_value_us <= now.abs_value_us)
1699     {
1700       GNUNET_STATISTICS_update (GST_stats,
1701                                 "# messages dropped (timeout before confirmation)",
1702                                 1,
1703                                 GNUNET_NO);
1704       client_send_response (pm,
1705                             GNUNET_NO,
1706                             0);      
1707       continue;
1708     }
1709     earliest_timeout = GNUNET_TIME_absolute_min (earliest_timeout,
1710                                                  pos->timeout);
1711   }
1712   n->earliest_timeout = earliest_timeout;
1713   if (NULL != n->pending_msg_head)
1714     n->timeout_task = GNUNET_SCHEDULER_add_at (earliest_timeout,
1715                                                &check_queue_timeouts,
1716                                                n);
1717 }
1718
1719
1720 /**
1721  * Client asked for transmission to a peer.  Process the request.
1722  *
1723  * @param cls the client
1724  * @param obm the send message that was sent
1725  */
1726 static void
1727 handle_client_send (void *cls,
1728                     const struct OutboundMessage *obm)
1729 {
1730   struct TransportClient *tc = cls;
1731   struct PendingMessage *pm;
1732   const struct GNUNET_MessageHeader *obmm;
1733   struct Neighbour *target;
1734   uint32_t bytes_msg;
1735
1736   GNUNET_assert (CT_CORE == tc->type);
1737   obmm = (const struct GNUNET_MessageHeader *) &obm[1];
1738   bytes_msg = ntohs (obmm->size);
1739   target = lookup_neighbour (&obm->peer);
1740   if (NULL == target)
1741   {
1742     /* Failure: don't have this peer as a neighbour (anymore).
1743        Might have gone down asynchronously, so this is NOT
1744        a protocol violation by CORE. Still count the event,
1745        as this should be rare. */
1746     struct GNUNET_MQ_Envelope *env;
1747     struct SendOkMessage *som;
1748
1749     env = GNUNET_MQ_msg (som,
1750                          GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
1751     som->success = htonl (GNUNET_SYSERR);
1752     som->bytes_msg = htonl (bytes_msg);
1753     som->bytes_physical = htonl (0);
1754     som->peer = obm->peer;
1755     GNUNET_MQ_send (tc->mq,
1756                     env);
1757     GNUNET_SERVICE_client_continue (tc->client);
1758     GNUNET_STATISTICS_update (GST_stats,
1759                               "# messages dropped (neighbour unknown)",
1760                               1,
1761                               GNUNET_NO);
1762     return;
1763   }
1764   pm = GNUNET_malloc (sizeof (struct PendingMessage) + bytes_msg);
1765   pm->client = tc;
1766   pm->target = target;
1767   pm->bytes_msg = bytes_msg;
1768   pm->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (obm->timeout));
1769   memcpy (&pm[1],
1770           &obm[1],
1771           bytes_msg);
1772   GNUNET_CONTAINER_MDLL_insert (neighbour,
1773                                 target->pending_msg_head,
1774                                 target->pending_msg_tail,
1775                                 pm);
1776   GNUNET_CONTAINER_MDLL_insert (client,
1777                                 tc->details.core.pending_msg_head,
1778                                 tc->details.core.pending_msg_tail,
1779                                 pm);
1780   if (target->earliest_timeout.abs_value_us > pm->timeout.abs_value_us)
1781   {
1782     target->earliest_timeout.abs_value_us = pm->timeout.abs_value_us;
1783     if (NULL != target->timeout_task)
1784       GNUNET_SCHEDULER_cancel (target->timeout_task);
1785     target->timeout_task 
1786       = GNUNET_SCHEDULER_add_at (target->earliest_timeout,
1787                                  &check_queue_timeouts,
1788                                  target);
1789   }
1790 }
1791
1792
1793 /**
1794  * Communicator started.  Test message is well-formed.
1795  *
1796  * @param cls the client
1797  * @param cam the send message that was sent
1798  */
1799 static int
1800 check_communicator_available (void *cls,
1801                               const struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam)
1802 {
1803   struct TransportClient *tc = cls;
1804   uint16_t size;
1805
1806   if (CT_NONE != tc->type)
1807   {
1808     GNUNET_break (0);
1809     return GNUNET_SYSERR;
1810   }
1811   tc->type = CT_COMMUNICATOR;
1812   size = ntohs (cam->header.size) - sizeof (*cam);
1813   if (0 == size)
1814     return GNUNET_OK; /* receive-only communicator */
1815   GNUNET_MQ_check_zero_termination (cam);
1816   return GNUNET_OK;
1817 }
1818
1819
1820 /**
1821  * Communicator started.  Process the request.
1822  *
1823  * @param cls the client
1824  * @param cam the send message that was sent
1825  */
1826 static void
1827 handle_communicator_available (void *cls,
1828                                const struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam)
1829 {
1830   struct TransportClient *tc = cls;
1831   uint16_t size;
1832
1833   size = ntohs (cam->header.size) - sizeof (*cam);
1834   if (0 == size)
1835     return; /* receive-only communicator */
1836   tc->details.communicator.address_prefix
1837     = GNUNET_strdup ((const char *) &cam[1]);
1838   tc->details.communicator.cc
1839     = (enum GNUNET_TRANSPORT_CommunicatorCharacteristics) ntohl (cam->cc);
1840   GNUNET_SERVICE_client_continue (tc->client);
1841 }
1842
1843
1844 /**
1845  * Address of our peer added.  Test message is well-formed.
1846  *
1847  * @param cls the client
1848  * @param aam the send message that was sent
1849  */
1850 static int
1851 check_add_address (void *cls,
1852                    const struct GNUNET_TRANSPORT_AddAddressMessage *aam)
1853 {
1854   struct TransportClient *tc = cls;
1855
1856   if (CT_COMMUNICATOR != tc->type)
1857   {
1858     GNUNET_break (0);
1859     return GNUNET_SYSERR;
1860   }
1861   GNUNET_MQ_check_zero_termination (aam);
1862   return GNUNET_OK;
1863 }
1864
1865
1866 /**
1867  * Ask peerstore to store our address.
1868  *
1869  * @param cls an `struct AddressListEntry *`
1870  */
1871 static void
1872 store_pi (void *cls);
1873
1874
1875 /**
1876  * Function called when peerstore is done storing our address.
1877  */
1878 static void
1879 peerstore_store_cb (void *cls,
1880                     int success)
1881 {
1882   struct AddressListEntry *ale = cls;
1883
1884   ale->sc = NULL;
1885   if (GNUNET_YES != success)
1886     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1887                 "Failed to store our own address `%s' in peerstore!\n",
1888                 ale->address);
1889   /* refresh period is 1/4 of expiration time, that should be plenty
1890      without being excessive. */
1891   ale->st = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_divide (ale->expiration,
1892                                                                        4ULL),
1893                                           &store_pi,
1894                                           ale);
1895 }
1896
1897
1898 /**
1899  * Ask peerstore to store our address.
1900  *
1901  * @param cls an `struct AddressListEntry *`
1902  */
1903 static void
1904 store_pi (void *cls)
1905 {
1906   struct AddressListEntry *ale = cls;
1907   void *addr;
1908   size_t addr_len;
1909   struct GNUNET_TIME_Absolute expiration;
1910
1911   ale->st = NULL;
1912   expiration = GNUNET_TIME_relative_to_absolute (ale->expiration);
1913   GNUNET_HELLO_sign_address (ale->address,
1914                              ale->nt,
1915                              expiration,
1916                              GST_my_private_key,
1917                              &addr,
1918                              &addr_len);
1919   ale->sc = GNUNET_PEERSTORE_store (peerstore,
1920                                     "transport",
1921                                     &GST_my_identity,
1922                                     GNUNET_HELLO_PEERSTORE_KEY,
1923                                     addr,
1924                                     addr_len,
1925                                     expiration,
1926                                     GNUNET_PEERSTORE_STOREOPTION_MULTIPLE,
1927                                     &peerstore_store_cb,
1928                                     ale);
1929   GNUNET_free (addr);
1930   if (NULL == ale->sc)
1931   {
1932     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1933                 "Failed to store our address `%s' with peerstore\n",
1934                 ale->address);
1935     ale->st = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
1936                                             &store_pi,
1937                                             ale);
1938   }
1939 }
1940
1941
1942 /**
1943  * Address of our peer added.  Process the request.
1944  *
1945  * @param cls the client
1946  * @param aam the send message that was sent
1947  */
1948 static void
1949 handle_add_address (void *cls,
1950                     const struct GNUNET_TRANSPORT_AddAddressMessage *aam)
1951 {
1952   struct TransportClient *tc = cls;
1953   struct AddressListEntry *ale;
1954   size_t slen;
1955
1956   slen = ntohs (aam->header.size) - sizeof (*aam);
1957   ale = GNUNET_malloc (sizeof (struct AddressListEntry) + slen);
1958   ale->tc = tc;
1959   ale->address = (const char *) &ale[1];
1960   ale->expiration = GNUNET_TIME_relative_ntoh (aam->expiration);
1961   ale->aid = aam->aid;
1962   ale->nt = (enum GNUNET_NetworkType) ntohl (aam->nt);
1963   memcpy (&ale[1],
1964           &aam[1],
1965           slen);
1966   GNUNET_CONTAINER_DLL_insert (tc->details.communicator.addr_head,
1967                                tc->details.communicator.addr_tail,
1968                                ale);
1969   ale->st = GNUNET_SCHEDULER_add_now (&store_pi,
1970                                       ale);
1971   GNUNET_SERVICE_client_continue (tc->client);
1972 }
1973
1974
1975 /**
1976  * Address of our peer deleted.  Process the request.
1977  *
1978  * @param cls the client
1979  * @param dam the send message that was sent
1980  */
1981 static void
1982 handle_del_address (void *cls,
1983                     const struct GNUNET_TRANSPORT_DelAddressMessage *dam)
1984 {
1985   struct TransportClient *tc = cls;
1986
1987   if (CT_COMMUNICATOR != tc->type)
1988   {
1989     GNUNET_break (0);
1990     GNUNET_SERVICE_client_drop (tc->client);
1991     return;
1992   }
1993   for (struct AddressListEntry *ale = tc->details.communicator.addr_head;
1994        NULL != ale;
1995        ale = ale->next)
1996   {
1997     if (dam->aid != ale->aid)
1998       continue;
1999     GNUNET_assert (ale->tc == tc);
2000     free_address_list_entry (ale);
2001     GNUNET_SERVICE_client_continue (tc->client);
2002   }
2003   GNUNET_break (0);
2004   GNUNET_SERVICE_client_drop (tc->client);
2005 }
2006
2007
2008 /**
2009  * Client notified us about transmission from a peer.  Process the request.
2010  *
2011  * @param cls the client
2012  * @param obm the send message that was sent
2013  */
2014 static int
2015 check_incoming_msg (void *cls,
2016                     const struct GNUNET_TRANSPORT_IncomingMessage *im)
2017 {
2018   struct TransportClient *tc = cls;
2019   uint16_t size;
2020   const struct GNUNET_MessageHeader *obmm;
2021
2022   if (CT_COMMUNICATOR != tc->type)
2023   {
2024     GNUNET_break (0);
2025     return GNUNET_SYSERR;
2026   }
2027   size = ntohs (im->header.size) - sizeof (*im);
2028   if (size < sizeof (struct GNUNET_MessageHeader))
2029   {
2030     GNUNET_break (0);
2031     return GNUNET_SYSERR;
2032   }
2033   obmm = (const struct GNUNET_MessageHeader *) &im[1];
2034   if (size != ntohs (obmm->size))
2035   {
2036     GNUNET_break (0);
2037     return GNUNET_SYSERR;
2038   }
2039   return GNUNET_OK;
2040 }
2041
2042
2043 /**
2044  * Incoming meessage.  Process the request.
2045  *
2046  * @param cls the client
2047  * @param im the send message that was received
2048  */
2049 static void
2050 handle_incoming_msg (void *cls,
2051                      const struct GNUNET_TRANSPORT_IncomingMessage *im)
2052 {
2053   struct TransportClient *tc = cls;
2054
2055   GNUNET_SERVICE_client_continue (tc->client);
2056 }
2057
2058
2059 /**
2060  * New queue became available.  Check message.
2061  *
2062  * @param cls the client
2063  * @param aqm the send message that was sent
2064  */
2065 static int
2066 check_add_queue_message (void *cls,
2067                          const struct GNUNET_TRANSPORT_AddQueueMessage *aqm)
2068 {
2069   struct TransportClient *tc = cls;
2070
2071   if (CT_COMMUNICATOR != tc->type)
2072   {
2073     GNUNET_break (0);
2074     return GNUNET_SYSERR;
2075   }
2076   GNUNET_MQ_check_zero_termination (aqm);
2077   return GNUNET_OK;
2078 }
2079
2080
2081 /**
2082  * Bandwidth tracker informs us that the delay until we should receive
2083  * more has changed.
2084  *
2085  * @param cls a `struct GNUNET_ATS_Session` for which the delay changed
2086  */
2087 static void
2088 tracker_update_in_cb (void *cls)
2089 {
2090   struct GNUNET_ATS_Session *queue = cls;
2091   struct GNUNET_TIME_Relative in_delay;
2092   unsigned int rsize;
2093   
2094   rsize = (0 == queue->mtu) ? IN_PACKET_SIZE_WITHOUT_MTU : queue->mtu;
2095   in_delay = GNUNET_BANDWIDTH_tracker_get_delay (&queue->tracker_in,
2096                                                  rsize);
2097   // FIXME: how exactly do we do inbound flow control?
2098 }
2099
2100
2101 /**
2102  * We believe we are ready to transmit a message on a queue. Double-checks
2103  * with the queue's "tracker_out" and then gives the message to the 
2104  * communicator for transmission (updating the tracker, and re-scheduling
2105  * itself if applicable).  
2106  *
2107  * @param cls the `struct GNUNET_ATS_Session` to process transmissions for
2108  */ 
2109 static void
2110 transmit_on_queue (void *cls);
2111
2112
2113 /**
2114  * Schedule next run of #transmit_on_queue().  Does NOTHING if 
2115  * we should run immediately or if the message queue is empty.
2116  * Test for no task being added AND queue not being empty to
2117  * transmit immediately afterwards!  This function must only
2118  * be called if the message queue is non-empty!
2119  *
2120  * @param queue the queue to do scheduling for
2121  */ 
2122 static void
2123 schedule_transmit_on_queue (struct GNUNET_ATS_Session *queue)
2124 {
2125   struct Neighbour *n = queue->neighbour;
2126   struct PendingMessage *pm = n->pending_msg_head;
2127   struct GNUNET_TIME_Relative out_delay;
2128   unsigned int wsize;
2129
2130   GNUNET_assert (NULL != pm);
2131   wsize = (0 == queue->mtu)
2132     ? pm->bytes_msg /* FIXME: add overheads? */
2133     : queue->mtu;
2134   out_delay = GNUNET_BANDWIDTH_tracker_get_delay (&queue->tracker_out,
2135                                                   wsize);
2136   out_delay = GNUNET_TIME_relative_max (GNUNET_TIME_absolute_get_remaining (pm->next_attempt),
2137                                         out_delay);
2138   if (0 == out_delay.rel_value_us)
2139     return; /* we should run immediately! */
2140   /* queue has changed since we were scheduled, reschedule again */
2141   queue->transmit_task = GNUNET_SCHEDULER_add_delayed (out_delay,
2142                                                        &transmit_on_queue,
2143                                                        queue);
2144   if (out_delay.rel_value_us > DELAY_WARN_THRESHOLD.rel_value_us)
2145     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2146                 "Next transmission on queue `%s' in %s (high delay)\n",
2147                 queue->address,
2148                 GNUNET_STRINGS_relative_time_to_string (out_delay,
2149                                                         GNUNET_YES));
2150   else
2151     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2152                 "Next transmission on queue `%s' in %s\n",
2153                 queue->address,
2154                 GNUNET_STRINGS_relative_time_to_string (out_delay,
2155                                                         GNUNET_YES));
2156 }
2157
2158
2159 /**
2160  * Fragment the given @a pm to the given @a mtu.  Adds 
2161  * additional fragments to the neighbour as well. If the
2162  * @a mtu is too small, generates and error for the @a pm
2163  * and returns NULL.
2164  *
2165  * @param pm pending message to fragment for transmission
2166  * @param mtu MTU to apply
2167  * @return new message to transmit
2168  */
2169 static struct PendingMessage *
2170 fragment_message (struct PendingMessage *pm,
2171                   uint16_t mtu)
2172 {
2173   struct PendingMessage *ff;
2174
2175   if (GNUNET_NO == pm->msg_uuid_set)
2176   {
2177     GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
2178                                 &pm->msg_uuid,
2179                                 sizeof (pm->msg_uuid));
2180     pm->msg_uuid_set = GNUNET_YES;
2181   }
2182   
2183   /* This invariant is established in #handle_add_queue_message() */
2184   GNUNET_assert (mtu > sizeof (struct TransportFragmentBox));
2185
2186   /* select fragment for transmission, descending the tree if it has
2187      been expanded until we are at a leaf or at a fragment that is small enough */
2188   ff = pm;
2189   while ( ( (ff->bytes_msg > mtu) ||
2190             (pm == ff) ) &&
2191           (ff->frag_off == ff->bytes_msg) &&
2192           (NULL != ff->head_frag) )
2193   {
2194     ff = ff->head_frag; /* descent into fragmented fragments */
2195   }
2196
2197   if ( ( (ff->bytes_msg > mtu) ||
2198          (pm == ff) ) &&
2199        (pm->frag_off < pm->bytes_msg) )
2200   {
2201     /* Did not yet calculate all fragments, calculate next fragment */
2202     struct PendingMessage *frag;
2203     struct TransportFragmentBox tfb;
2204     const char *orig;
2205     char *msg;
2206     uint16_t fragmax;
2207     uint16_t fragsize;
2208     uint16_t msize;
2209     uint16_t xoff = 0;
2210
2211     orig = (const char *) &ff[1];
2212     msize = ff->bytes_msg;
2213     if (pm != ff)
2214     {
2215       const struct TransportFragmentBox *tfbo;
2216
2217       tfbo = (const struct TransportFragmentBox *) orig;
2218       orig += sizeof (struct TransportFragmentBox);
2219       msize -= sizeof (struct TransportFragmentBox);
2220       xoff = ntohs (tfbo->frag_off);
2221     }
2222     fragmax = mtu - sizeof (struct TransportFragmentBox);
2223     fragsize = GNUNET_MIN (msize - ff->frag_off,
2224                            fragmax);
2225     frag = GNUNET_malloc (sizeof (struct PendingMessage) +
2226                           sizeof (struct TransportFragmentBox) +
2227                           fragsize);
2228     frag->target = pm->target;
2229     frag->frag_parent = ff;
2230     frag->timeout = pm->timeout;
2231     frag->bytes_msg = sizeof (struct TransportFragmentBox) + fragsize;
2232     frag->pmt = PMT_FRAGMENT_BOX;
2233     msg = (char *) &frag[1];
2234     tfb.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT);
2235     tfb.header.size = htons (sizeof (struct TransportFragmentBox) +
2236                              fragsize);
2237     tfb.frag_uuid = htonl (pm->frag_uuidgen++);
2238     tfb.msg_uuid = pm->msg_uuid;
2239     tfb.frag_off = htons (ff->frag_off + xoff);
2240     tfb.msg_size = htons (pm->bytes_msg);
2241     memcpy (msg,
2242             &tfb,
2243             sizeof (tfb));
2244     memcpy (&msg[sizeof (tfb)],
2245             &orig[ff->frag_off],
2246             fragsize);
2247     GNUNET_CONTAINER_MDLL_insert (frag,
2248                                   ff->head_frag,
2249                                   ff->tail_frag,
2250                                   frag);
2251     ff->frag_off += fragsize;
2252     ff = frag;
2253   }
2254
2255   /* Move head to the tail and return it */
2256   GNUNET_CONTAINER_MDLL_remove (frag,
2257                                 ff->frag_parent->head_frag,
2258                                 ff->frag_parent->tail_frag,
2259                                 ff);
2260   GNUNET_CONTAINER_MDLL_insert_tail (frag,
2261                                      ff->frag_parent->head_frag,
2262                                      ff->frag_parent->tail_frag,
2263                                      ff);
2264   return ff;
2265 }
2266
2267
2268 /**
2269  * Reliability-box the given @a pm. On error (can there be any), NULL
2270  * may be returned, otherwise the "replacement" for @a pm (which
2271  * should then be added to the respective neighbour's queue instead of
2272  * @a pm).  If the @a pm is already fragmented or reliability boxed,
2273  * or itself an ACK, this function simply returns @a pm.
2274  *
2275  * @param pm pending message to box for transmission over unreliabile queue
2276  * @return new message to transmit
2277  */
2278 static struct PendingMessage *
2279 reliability_box_message (struct PendingMessage *pm)
2280 {
2281   if (PMT_CORE != pm->pmt) 
2282   {
2283     /* already fragmented or reliability boxed, or control message: do nothing */
2284     return pm;
2285   }
2286   
2287   if (0) // FIXME
2288   {
2289     /* failed hard */
2290     // FIMXE: bitch
2291     client_send_response (pm,
2292                           GNUNET_NO,
2293                           0);
2294     return NULL;
2295   }
2296
2297   /* FIXME: return boxed PM here! */
2298   return NULL;
2299 }
2300
2301
2302 /**
2303  * We believe we are ready to transmit a message on a queue. Double-checks
2304  * with the queue's "tracker_out" and then gives the message to the 
2305  * communicator for transmission (updating the tracker, and re-scheduling
2306  * itself if applicable).  
2307  *
2308  * @param cls the `struct GNUNET_ATS_Session` to process transmissions for
2309  */ 
2310 static void
2311 transmit_on_queue (void *cls)
2312 {
2313   struct GNUNET_ATS_Session *queue = cls;
2314   struct Neighbour *n = queue->neighbour;
2315   struct PendingMessage *pm;
2316   struct PendingMessage *s;
2317   uint32_t overhead;
2318
2319   queue->transmit_task = NULL;
2320   if (NULL == (pm = n->pending_msg_head))
2321   {
2322     /* no message pending, nothing to do here! */
2323     return; 
2324   }
2325   schedule_transmit_on_queue (queue);
2326   if (NULL != queue->transmit_task)
2327     return; /* do it later */
2328   overhead = 0;
2329   if (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc)
2330     overhead += sizeof (struct TransportReliabilityBox);
2331   s = pm;
2332   if ( ( (0 != queue->mtu) &&
2333          (pm->bytes_msg + overhead > queue->mtu) ) ||
2334        (NULL != pm->head_frag /* fragments already exist, should
2335                                  respect that even if MTU is 0 for
2336                                  this queue */) )
2337     s = fragment_message (s,
2338                           (0 == queue->mtu)
2339                           ? UINT16_MAX /* no real maximum */
2340                           : queue->mtu);
2341   if (NULL == s)
2342   {
2343     /* Fragmentation failed, try next message... */
2344     schedule_transmit_on_queue (queue);
2345     return;
2346   }
2347   if (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc)
2348     s = reliability_box_message (s);
2349   if (NULL == s)
2350   {
2351     /* Reliability boxing failed, try next message... */
2352     schedule_transmit_on_queue (queue);
2353     return;
2354   }
2355   
2356   // FIXME: actually give 's' to communicator for transmission here!
2357
2358   // FIXME: do something similar to the logic below
2359   // in defragmentation / reliability ACK handling!
2360
2361   /* Check if this transmission somehow conclusively finished handing 'pm'
2362      even without any explicit ACKs */
2363   if ( (PMT_CORE == s->pmt) &&
2364        (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc) )
2365   {
2366     /* Full message sent, and over reliabile channel */
2367     client_send_response (pm,
2368                           GNUNET_YES,
2369                           pm->bytes_msg);
2370   }
2371   else if ( (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc) &&
2372             (PMT_FRAGMENT_BOX == s->pmt) )
2373   {
2374     struct PendingMessage *pos;
2375     
2376     /* Fragment sent over reliabile channel */
2377     free_fragment_tree (s);
2378     pos = s->frag_parent;
2379     GNUNET_CONTAINER_MDLL_remove (frag,
2380                                   pos->head_frag,
2381                                   pos->tail_frag,
2382                                   s);
2383     GNUNET_free (s);
2384     /* check if subtree is done */
2385     while ( (NULL == pos->head_frag) &&
2386             (pos->frag_off == pos->bytes_msg) &&
2387             (pos != pm) )
2388     {
2389       s = pos;
2390       pos = s->frag_parent;
2391       GNUNET_CONTAINER_MDLL_remove (frag,
2392                                     pos->head_frag,
2393                                     pos->tail_frag,
2394                                     s);
2395       GNUNET_free (s);      
2396     }
2397     
2398     /* Was this the last applicable fragmment? */
2399     if ( (NULL == pm->head_frag) &&
2400          (pm->frag_off == pm->bytes_msg) )
2401       client_send_response (pm,
2402                             GNUNET_YES,
2403                             pm->bytes_msg /* FIXME: calculate and add overheads! */);
2404   }
2405   else if (PMT_CORE != pm->pmt)
2406   {
2407     /* This was an acknowledgement of some type, always free */
2408
2409     struct Neighbour *neighbour = pm->target;
2410     GNUNET_CONTAINER_MDLL_remove (neighbour,
2411                                   neighbour->pending_msg_head,
2412                                   neighbour->pending_msg_tail,
2413                                   pm);
2414     GNUNET_free (pm);
2415   }
2416   else
2417   {
2418     /* message not finished, waiting for acknowledgement */
2419     // FIXME: update time by which we might retransmit 's' based on
2420     // queue characteristics (i.e. RTT)
2421     
2422     // FIXME: move 'pm' back in the transmission queue (simplistic: to
2423     // the end, better: with position depending on type, timeout,
2424     // etc.)
2425   }
2426   
2427   /* finally, re-schedule self */
2428   schedule_transmit_on_queue (queue);
2429 }
2430
2431
2432 /**
2433  * Bandwidth tracker informs us that the delay until we
2434  * can transmit again changed.
2435  *
2436  * @param cls a `struct GNUNET_ATS_Session` for which the delay changed
2437  */
2438 static void
2439 tracker_update_out_cb (void *cls)
2440 {
2441   struct GNUNET_ATS_Session *queue = cls;
2442   struct Neighbour *n = queue->neighbour;
2443
2444   if (NULL == n->pending_msg_head)
2445   {
2446     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2447                 "Bandwidth allocation updated for empty transmission queue `%s'\n",
2448                 queue->address);
2449     return; /* no message pending, nothing to do here! */
2450   }
2451   GNUNET_SCHEDULER_cancel (queue->transmit_task);
2452   queue->transmit_task = NULL;
2453   schedule_transmit_on_queue (queue);
2454 }
2455
2456
2457 /**
2458  * Bandwidth tracker informs us that excessive outbound bandwidth was
2459  * allocated which is not being used.
2460  *
2461  * @param cls a `struct GNUNET_ATS_Session` for which the excess was noted
2462  */
2463 static void
2464 tracker_excess_out_cb (void *cls)
2465 {
2466   /* FIXME: trigger excess bandwidth report to core? Right now,
2467      this is done internally within transport_api2_core already,
2468      but we probably want to change the logic and trigger it 
2469      from here via a message instead! */
2470   /* TODO: maybe inform ATS at this point? */
2471   GNUNET_STATISTICS_update (GST_stats,
2472                             "# Excess outbound bandwidth reported",
2473                             1,
2474                             GNUNET_NO);    
2475 }
2476
2477
2478
2479 /**
2480  * Bandwidth tracker informs us that excessive inbound bandwidth was allocated
2481  * which is not being used.
2482  *
2483  * @param cls a `struct GNUNET_ATS_Session` for which the excess was noted
2484  */
2485 static void
2486 tracker_excess_in_cb (void *cls)
2487 {
2488   /* TODO: maybe inform ATS at this point? */
2489   GNUNET_STATISTICS_update (GST_stats,
2490                             "# Excess inbound bandwidth reported",
2491                             1,
2492                             GNUNET_NO);    
2493 }
2494
2495
2496 /**
2497  * New queue became available.  Process the request.
2498  *
2499  * @param cls the client
2500  * @param aqm the send message that was sent
2501  */
2502 static void
2503 handle_add_queue_message (void *cls,
2504                           const struct GNUNET_TRANSPORT_AddQueueMessage *aqm)
2505 {
2506   struct TransportClient *tc = cls;
2507   struct GNUNET_ATS_Session *queue;
2508   struct Neighbour *neighbour;
2509   const char *addr;
2510   uint16_t addr_len;
2511
2512   if (ntohl (aqm->mtu) <= sizeof (struct TransportFragmentBox))
2513   {
2514     /* MTU so small as to be useless for transmissions,
2515        required for #fragment_message()! */
2516     GNUNET_break_op (0);
2517     GNUNET_SERVICE_client_drop (tc->client);
2518     return;
2519   }
2520   neighbour = lookup_neighbour (&aqm->receiver);
2521   if (NULL == neighbour)
2522   {
2523     neighbour = GNUNET_new (struct Neighbour);
2524     neighbour->earliest_timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
2525     neighbour->pid = aqm->receiver;
2526     GNUNET_assert (GNUNET_OK ==
2527                    GNUNET_CONTAINER_multipeermap_put (neighbours,
2528                                                       &neighbour->pid,
2529                                                       neighbour,
2530                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
2531     cores_send_connect_info (&neighbour->pid,
2532                              GNUNET_BANDWIDTH_ZERO);
2533   }
2534   addr_len = ntohs (aqm->header.size) - sizeof (*aqm);
2535   addr = (const char *) &aqm[1];
2536
2537   queue = GNUNET_malloc (sizeof (struct GNUNET_ATS_Session) + addr_len);
2538   queue->tc = tc;
2539   queue->address = (const char *) &queue[1];
2540   queue->rtt = GNUNET_TIME_UNIT_FOREVER_REL;
2541   queue->qid = aqm->qid;
2542   queue->mtu = ntohl (aqm->mtu);
2543   queue->nt = (enum GNUNET_NetworkType) ntohl (aqm->nt);
2544   queue->cs = (enum GNUNET_TRANSPORT_ConnectionStatus) ntohl (aqm->cs);
2545   queue->neighbour = neighbour;
2546   GNUNET_BANDWIDTH_tracker_init2 (&queue->tracker_in,
2547                                   &tracker_update_in_cb,
2548                                   queue,
2549                                   GNUNET_BANDWIDTH_ZERO,
2550                                   GNUNET_CONSTANTS_MAX_BANDWIDTH_CARRY_S,
2551                                   &tracker_excess_in_cb,
2552                                   queue);
2553   GNUNET_BANDWIDTH_tracker_init2 (&queue->tracker_out,
2554                                   &tracker_update_out_cb,
2555                                   queue,
2556                                   GNUNET_BANDWIDTH_ZERO,
2557                                   GNUNET_CONSTANTS_MAX_BANDWIDTH_CARRY_S,
2558                                   &tracker_excess_out_cb,
2559                                   queue);
2560   memcpy (&queue[1],
2561           addr,
2562           addr_len);
2563   /* notify ATS about new queue */
2564   {
2565     struct GNUNET_ATS_Properties prop = {
2566       .delay = GNUNET_TIME_UNIT_FOREVER_REL,
2567       .mtu = queue->mtu,
2568       .nt = queue->nt,
2569       .cc = tc->details.communicator.cc
2570     };
2571     
2572     queue->sr = GNUNET_ATS_session_add (ats,
2573                                         &neighbour->pid,
2574                                         queue->address,
2575                                         queue,
2576                                         &prop);
2577     if  (NULL == queue->sr)
2578     {
2579       /* This can only happen if the 'address' was way too long for ATS
2580          (approaching 64k in strlen()!). In this case, the communicator
2581          must be buggy and we drop it. */
2582       GNUNET_break (0);
2583       GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_in);
2584       GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_out);
2585       GNUNET_free (queue);
2586       if (NULL == neighbour->session_head)
2587       {
2588         cores_send_disconnect_info (&neighbour->pid);
2589         free_neighbour (neighbour);
2590       }
2591       GNUNET_SERVICE_client_drop (tc->client);
2592       return;
2593     }
2594   }
2595   /* notify monitors about new queue */
2596   {
2597     struct MonitorEvent me = {
2598       .rtt = queue->rtt,
2599       .cs = queue->cs
2600     };
2601
2602     notify_monitors (&neighbour->pid,
2603                      queue->address,
2604                      queue->nt,
2605                      &me);
2606   }
2607   GNUNET_CONTAINER_MDLL_insert (neighbour,
2608                                 neighbour->session_head,
2609                                 neighbour->session_tail,
2610                                 queue);
2611   GNUNET_CONTAINER_MDLL_insert (client,
2612                                 tc->details.communicator.session_head,
2613                                 tc->details.communicator.session_tail,
2614                                 queue);
2615   GNUNET_SERVICE_client_continue (tc->client);
2616 }
2617
2618
2619 /**
2620  * Queue to a peer went down.  Process the request.
2621  *
2622  * @param cls the client
2623  * @param dqm the send message that was sent
2624  */
2625 static void
2626 handle_del_queue_message (void *cls,
2627                           const struct GNUNET_TRANSPORT_DelQueueMessage *dqm)
2628 {
2629   struct TransportClient *tc = cls;
2630
2631   if (CT_COMMUNICATOR != tc->type)
2632   {
2633     GNUNET_break (0);
2634     GNUNET_SERVICE_client_drop (tc->client);
2635     return;
2636   }
2637   for (struct GNUNET_ATS_Session *queue = tc->details.communicator.session_head;
2638        NULL != queue;
2639        queue = queue->next_client)
2640   {
2641     struct Neighbour *neighbour = queue->neighbour;
2642
2643     if ( (dqm->qid != queue->qid) ||
2644          (0 != memcmp (&dqm->receiver,
2645                        &neighbour->pid,
2646                        sizeof (struct GNUNET_PeerIdentity))) )
2647       continue;
2648     free_queue (queue);
2649     GNUNET_SERVICE_client_continue (tc->client);
2650     return;
2651   }
2652   GNUNET_break (0);
2653   GNUNET_SERVICE_client_drop (tc->client);
2654 }
2655
2656
2657 /**
2658  * Message was transmitted.  Process the request.
2659  *
2660  * @param cls the client
2661  * @param sma the send message that was sent
2662  */
2663 static void
2664 handle_send_message_ack (void *cls,
2665                          const struct GNUNET_TRANSPORT_SendMessageToAck *sma)
2666 {
2667   struct TransportClient *tc = cls;
2668
2669   if (CT_COMMUNICATOR != tc->type)
2670   {
2671     GNUNET_break (0);
2672     GNUNET_SERVICE_client_drop (tc->client);
2673     return;
2674   }
2675   // FIXME: react to communicator status about transmission request. We got:
2676   sma->status; // OK success, SYSERR failure
2677   sma->mid; // message ID of original message
2678   sma->receiver; // receiver of original message
2679
2680   
2681   GNUNET_SERVICE_client_continue (tc->client);
2682 }
2683
2684
2685 /**
2686  * Iterator telling new MONITOR client about all existing
2687  * queues to peers.
2688  *
2689  * @param cls the new `struct TransportClient`
2690  * @param pid a connected peer
2691  * @param value the `struct Neighbour` with more information
2692  * @return #GNUNET_OK (continue to iterate)
2693  */
2694 static int
2695 notify_client_queues (void *cls,
2696                       const struct GNUNET_PeerIdentity *pid,
2697                       void *value)
2698 {
2699   struct TransportClient *tc = cls;
2700   struct Neighbour *neighbour = value;
2701
2702   GNUNET_assert (CT_MONITOR == tc->type);
2703   for (struct GNUNET_ATS_Session *q = neighbour->session_head;
2704        NULL != q;
2705        q = q->next_neighbour)
2706   {
2707     struct MonitorEvent me = {
2708       .rtt = q->rtt,
2709       .cs = q->cs,
2710       .num_msg_pending = q->num_msg_pending,
2711       .num_bytes_pending = q->num_bytes_pending
2712     };
2713
2714     notify_monitor (tc,
2715                     pid,
2716                     q->address,
2717                     q->nt,
2718                     &me);
2719   }
2720   return GNUNET_OK;
2721 }
2722
2723
2724 /**
2725  * Initialize a monitor client.
2726  *
2727  * @param cls the client
2728  * @param start the start message that was sent
2729  */
2730 static void
2731 handle_monitor_start (void *cls,
2732                       const struct GNUNET_TRANSPORT_MonitorStart *start)
2733 {
2734   struct TransportClient *tc = cls;
2735
2736   if (CT_NONE != tc->type)
2737   {
2738     GNUNET_break (0);
2739     GNUNET_SERVICE_client_drop (tc->client);
2740     return;
2741   }
2742   tc->type = CT_MONITOR;
2743   tc->details.monitor.peer = start->peer;
2744   tc->details.monitor.one_shot = ntohl (start->one_shot);
2745   GNUNET_CONTAINER_multipeermap_iterate (neighbours,
2746                                          &notify_client_queues,
2747                                          tc);
2748   GNUNET_SERVICE_client_mark_monitor (tc->client);
2749   GNUNET_SERVICE_client_continue (tc->client);
2750 }
2751
2752
2753 /**
2754  * Signature of a function called by ATS with the current bandwidth
2755  * allocation to be used as determined by ATS.
2756  *
2757  * @param cls closure, NULL
2758  * @param session session this is about
2759  * @param bandwidth_out assigned outbound bandwidth for the connection,
2760  *        0 to signal disconnect
2761  * @param bandwidth_in assigned inbound bandwidth for the connection,
2762  *        0 to signal disconnect
2763  */
2764 static void
2765 ats_allocation_cb (void *cls,
2766                    struct GNUNET_ATS_Session *session,
2767                    struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out,
2768                    struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in)
2769 {
2770   (void) cls;
2771   GNUNET_BANDWIDTH_tracker_update_quota (&session->tracker_out,
2772                                          bandwidth_out);
2773   GNUNET_BANDWIDTH_tracker_update_quota (&session->tracker_in,
2774                                          bandwidth_in);
2775 }
2776
2777
2778 /**
2779  * Find transport client providing communication service
2780  * for the protocol @a prefix.
2781  *
2782  * @param prefix communicator name
2783  * @return NULL if no such transport client is available
2784  */
2785 static struct TransportClient *
2786 lookup_communicator (const char *prefix)
2787 {
2788   for (struct TransportClient *tc = clients_head;
2789        NULL != tc;
2790        tc = tc->next)
2791   {
2792     if (CT_COMMUNICATOR != tc->type)
2793       continue;
2794     if (0 == strcmp (prefix,
2795                      tc->details.communicator.address_prefix))
2796       return tc;
2797   }
2798   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2799               "ATS suggested use of communicator for `%s', but we do not have such a communicator!\n",
2800               prefix);
2801   return NULL;
2802 }
2803
2804
2805 /**
2806  * Signature of a function called by ATS suggesting transport to
2807  * try connecting with a particular address.
2808  *
2809  * @param cls closure, NULL
2810  * @param pid target peer
2811  * @param address the address to try
2812  */
2813 static void
2814 ats_suggestion_cb (void *cls,
2815                    const struct GNUNET_PeerIdentity *pid,
2816                    const char *address)
2817 {
2818   static uint32_t idgen;
2819   struct TransportClient *tc;
2820   char *prefix;
2821   struct GNUNET_TRANSPORT_CreateQueue *cqm;
2822   struct GNUNET_MQ_Envelope *env;
2823   size_t alen;
2824
2825   (void) cls;
2826   prefix = GNUNET_HELLO_address_to_prefix (address);
2827   if (NULL == prefix)
2828   {
2829     GNUNET_break (0); /* ATS gave invalid address!? */
2830     return;
2831   }
2832   tc = lookup_communicator (prefix);
2833   if (NULL == tc)
2834   {
2835     GNUNET_STATISTICS_update (GST_stats,
2836                               "# ATS suggestions ignored due to missing communicator",
2837                               1,
2838                               GNUNET_NO);    
2839     return;
2840   }
2841   /* forward suggestion for queue creation to communicator */
2842   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2843               "Request #%u for `%s' communicator to create queue to `%s'\n",
2844               (unsigned int) idgen,
2845               prefix,
2846               address);
2847   alen = strlen (address) + 1;
2848   env = GNUNET_MQ_msg_extra (cqm,
2849                              alen,
2850                              GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE);
2851   cqm->request_id = htonl (idgen++);
2852   cqm->receiver = *pid;
2853   memcpy (&cqm[1],
2854           address,
2855           alen);
2856   GNUNET_MQ_send (tc->mq,
2857                   env);
2858 }
2859
2860
2861 /**
2862  * Communicator tells us that our request to create a queue "worked", that
2863  * is setting up the queue is now in process.
2864  *
2865  * @param cls the `struct TransportClient`
2866  * @param cqr confirmation message
2867  */ 
2868 static void
2869 handle_queue_create_ok (void *cls,
2870                         const struct GNUNET_TRANSPORT_CreateQueueResponse *cqr)
2871 {
2872   struct TransportClient *tc = cls;
2873
2874   if (CT_COMMUNICATOR != tc->type)
2875   {
2876     GNUNET_break (0);
2877     GNUNET_SERVICE_client_drop (tc->client);
2878     return;
2879   }
2880   GNUNET_STATISTICS_update (GST_stats,
2881                             "# ATS suggestions succeeded at communicator",
2882                             1,
2883                             GNUNET_NO);
2884   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2885               "Request #%u for communicator to create queue succeeded\n",
2886               (unsigned int) ntohs (cqr->request_id));
2887   GNUNET_SERVICE_client_continue (tc->client);    
2888 }
2889
2890
2891 /**
2892  * Communicator tells us that our request to create a queue failed. This usually
2893  * indicates that the provided address is simply invalid or that the communicator's
2894  * resources are exhausted.
2895  *
2896  * @param cls the `struct TransportClient`
2897  * @param cqr failure message
2898  */ 
2899 static void
2900 handle_queue_create_fail (void *cls,
2901                           const struct GNUNET_TRANSPORT_CreateQueueResponse *cqr)
2902 {
2903   struct TransportClient *tc = cls;
2904
2905   if (CT_COMMUNICATOR != tc->type)
2906   {
2907     GNUNET_break (0);
2908     GNUNET_SERVICE_client_drop (tc->client);
2909     return;
2910   }
2911   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2912               "Request #%u for communicator to create queue failed\n",
2913               (unsigned int) ntohs (cqr->request_id));
2914   GNUNET_STATISTICS_update (GST_stats,
2915                             "# ATS suggestions failed in queue creation at communicator",
2916                             1,
2917                             GNUNET_NO);
2918   GNUNET_SERVICE_client_continue (tc->client);    
2919 }
2920
2921
2922 /**
2923  * Free neighbour entry.
2924  *
2925  * @param cls NULL
2926  * @param pid unused
2927  * @param value a `struct Neighbour`
2928  * @return #GNUNET_OK (always)
2929  */
2930 static int
2931 free_neighbour_cb (void *cls,
2932                    const struct GNUNET_PeerIdentity *pid,
2933                    void *value)
2934 {
2935   struct Neighbour *neighbour = value;
2936
2937   (void) cls;
2938   (void) pid;
2939   GNUNET_break (0); // should this ever happen?
2940   free_neighbour (neighbour);
2941
2942   return GNUNET_OK;
2943 }
2944
2945
2946 /**
2947  * Free ephemeral entry.
2948  *
2949  * @param cls NULL
2950  * @param pid unused
2951  * @param value a `struct Neighbour`
2952  * @return #GNUNET_OK (always)
2953  */
2954 static int
2955 free_ephemeral_cb (void *cls,
2956                    const struct GNUNET_PeerIdentity *pid,
2957                    void *value)
2958 {
2959   struct EphemeralCacheEntry *ece = value;
2960
2961   (void) cls;
2962   (void) pid;
2963   free_ephemeral (ece);
2964   return GNUNET_OK;
2965 }
2966
2967
2968 /**
2969  * Function called when the service shuts down.  Unloads our plugins
2970  * and cancels pending validations.
2971  *
2972  * @param cls closure, unused
2973  */
2974 static void
2975 do_shutdown (void *cls)
2976 {
2977   (void) cls;
2978
2979   GNUNET_CONTAINER_multipeermap_iterate (neighbours,
2980                                          &free_neighbour_cb,
2981                                          NULL);
2982   if (NULL != ats)
2983   {
2984     GNUNET_ATS_transport_done (ats);
2985     ats = NULL;
2986   }
2987   if (NULL != peerstore)
2988   {
2989     GNUNET_PEERSTORE_disconnect (peerstore,
2990                                  GNUNET_NO);
2991     peerstore = NULL;
2992   }
2993   if (NULL != GST_stats)
2994   {
2995     GNUNET_STATISTICS_destroy (GST_stats,
2996                                GNUNET_NO);
2997     GST_stats = NULL;
2998   }
2999   if (NULL != GST_my_private_key)
3000   {
3001     GNUNET_free (GST_my_private_key);
3002     GST_my_private_key = NULL;
3003   }
3004   GNUNET_CONTAINER_multipeermap_destroy (neighbours);
3005   neighbours = NULL;
3006   GNUNET_CONTAINER_multipeermap_iterate (ephemeral_map,
3007                                          &free_ephemeral_cb,
3008                                          NULL);
3009   GNUNET_CONTAINER_multipeermap_destroy (ephemeral_map);
3010   ephemeral_map = NULL;
3011   GNUNET_CONTAINER_heap_destroy (ephemeral_heap);
3012   ephemeral_heap = NULL;
3013 }
3014
3015
3016 /**
3017  * Initiate transport service.
3018  *
3019  * @param cls closure
3020  * @param c configuration to use
3021  * @param service the initialized service
3022  */
3023 static void
3024 run (void *cls,
3025      const struct GNUNET_CONFIGURATION_Handle *c,
3026      struct GNUNET_SERVICE_Handle *service)
3027 {
3028   (void) cls;
3029   /* setup globals */
3030   GST_cfg = c;
3031   neighbours = GNUNET_CONTAINER_multipeermap_create (1024,
3032                                                      GNUNET_YES);
3033   ephemeral_map = GNUNET_CONTAINER_multipeermap_create (32,
3034                                                         GNUNET_YES);
3035   ephemeral_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
3036   GST_my_private_key = GNUNET_CRYPTO_eddsa_key_create_from_configuration (GST_cfg);
3037   if (NULL == GST_my_private_key)
3038   {
3039     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3040                 _("Transport service is lacking key configuration settings. Exiting.\n"));
3041     GNUNET_SCHEDULER_shutdown ();
3042     return;
3043   }
3044   GNUNET_CRYPTO_eddsa_key_get_public (GST_my_private_key,
3045                                       &GST_my_identity.public_key);
3046   GNUNET_log(GNUNET_ERROR_TYPE_INFO,
3047              "My identity is `%s'\n",
3048              GNUNET_i2s_full (&GST_my_identity));
3049   GST_stats = GNUNET_STATISTICS_create ("transport",
3050                                         GST_cfg);
3051   GNUNET_SCHEDULER_add_shutdown (&do_shutdown,
3052                                  NULL);
3053   peerstore = GNUNET_PEERSTORE_connect (GST_cfg);
3054   if (NULL == peerstore)
3055   {
3056     GNUNET_break (0);
3057     GNUNET_SCHEDULER_shutdown ();
3058     return;
3059   }
3060   ats = GNUNET_ATS_transport_init (GST_cfg,
3061                                    &ats_allocation_cb,
3062                                    NULL,
3063                                    &ats_suggestion_cb,
3064                                    NULL);
3065   if (NULL == ats)
3066   {
3067     GNUNET_break (0);
3068     GNUNET_SCHEDULER_shutdown ();
3069     return;
3070   }
3071 }
3072
3073
3074 /**
3075  * Define "main" method using service macro.
3076  */
3077 GNUNET_SERVICE_MAIN
3078 ("transport",
3079  GNUNET_SERVICE_OPTION_SOFT_SHUTDOWN,
3080  &run,
3081  &client_connect_cb,
3082  &client_disconnect_cb,
3083  NULL,
3084  /* communication with core */
3085  GNUNET_MQ_hd_fixed_size (client_start,
3086                           GNUNET_MESSAGE_TYPE_TRANSPORT_START,
3087                           struct StartMessage,
3088                           NULL),
3089  GNUNET_MQ_hd_var_size (client_send,
3090                         GNUNET_MESSAGE_TYPE_TRANSPORT_SEND,
3091                         struct OutboundMessage,
3092                         NULL),
3093  /* communication with communicators */
3094  GNUNET_MQ_hd_var_size (communicator_available,
3095                         GNUNET_MESSAGE_TYPE_TRANSPORT_NEW_COMMUNICATOR,
3096                         struct GNUNET_TRANSPORT_CommunicatorAvailableMessage,
3097                         NULL),
3098  GNUNET_MQ_hd_var_size (add_address,
3099                         GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS,
3100                         struct GNUNET_TRANSPORT_AddAddressMessage,
3101                         NULL),
3102  GNUNET_MQ_hd_fixed_size (del_address,
3103                           GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_ADDRESS,
3104                           struct GNUNET_TRANSPORT_DelAddressMessage,
3105                           NULL),
3106  GNUNET_MQ_hd_var_size (incoming_msg,
3107                         GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG,
3108                         struct GNUNET_TRANSPORT_IncomingMessage,
3109                         NULL),
3110  GNUNET_MQ_hd_fixed_size (queue_create_ok,
3111                           GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_OK,
3112                           struct GNUNET_TRANSPORT_CreateQueueResponse,
3113                           NULL),
3114  GNUNET_MQ_hd_fixed_size (queue_create_fail,
3115                           GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_FAIL,
3116                           struct GNUNET_TRANSPORT_CreateQueueResponse,
3117                           NULL),
3118  GNUNET_MQ_hd_var_size (add_queue_message,
3119                         GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP,
3120                         struct GNUNET_TRANSPORT_AddQueueMessage,
3121                         NULL),
3122  GNUNET_MQ_hd_fixed_size (del_queue_message,
3123                           GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN,
3124                           struct GNUNET_TRANSPORT_DelQueueMessage,
3125                           NULL),
3126  GNUNET_MQ_hd_fixed_size (send_message_ack,
3127                           GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK,
3128                           struct GNUNET_TRANSPORT_SendMessageToAck,
3129                           NULL),
3130  /* communication with monitors */
3131  GNUNET_MQ_hd_fixed_size (monitor_start,
3132                           GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_START,
3133                           struct GNUNET_TRANSPORT_MonitorStart,
3134                           NULL),
3135  GNUNET_MQ_handler_end ());
3136
3137
3138 /* end of file gnunet-service-transport.c */