notes
[oweals/gnunet.git] / src / transport / gnunet-service-tng.c
1 /*
2  This file is part of GNUnet.
3  Copyright (C) 2010-2016, 2018, 2019 GNUnet e.V.
4
5  GNUnet is free software: you can redistribute it and/or modify it
6  under the terms of the GNU Affero General Public License as published
7  by the Free Software Foundation, either version 3 of the License,
8  or (at your option) any later version.
9
10  GNUnet is distributed in the hope that it will be useful, but
11  WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  Affero General Public License for more details.
14
15  You should have received a copy of the GNU Affero General Public License
16  along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19  */
20 /**
21  * @file transport/gnunet-service-tng.c
22  * @brief main for gnunet-service-tng
23  * @author Christian Grothoff
24  *
25  * TODO:
26  * - figure out how to transmit (selective) ACKs in case of uni-directional
27  *   communicators (with/without core? DV-only?) When do we use ACKs?
28  *   => communicators use selective ACKs for flow control
29  *   => transport uses message-level ACKs for RTT, fragment confirmation
30  *   => integrate DV into transport, use neither core nor communicators
31  *      but rather give communicators transport-encapsulated messages
32  *      (which could be core-data, background-channel traffic, or
33  *       transport-to-transport traffic)
34  *
35  * Implement next:
36  * - address validation: what is our plan here?
37  *   #1 Peerstore only gets 'validated' addresses
38  *   #2 transport needs another API to "trigger" validation!
39  *      API may be used by core/application or communicators;
40  *      => use yet another lib/MQ/connection?
41  *   #3 transport should use validation to also establish
42  *      effective flow control (for uni-directional transports!)
43  *   #4 UDP broadcasting logic must be extended to use the new API
44  *   #5 only validated addresses go to ATS for scheduling; that
45  *      also ensures we know the RTT 
46  *   #6 to ensure flow control and RTT are OK, we always do the
47  *      'validation', even if address comes from PEERSTORE
48  *   #7 
49  * - ACK handling / retransmission 
50  * - track RTT, distance, loss, etc.
51  * - DV data structures:
52  *   + learning
53  *   + forgetting 
54  *   + using them!
55  * - routing of messages (using DV data structures!)
56  * - handling of DV-boxed messages that need to be forwarded
57  * - backchannel message encryption & decryption
58  * - 
59  *
60  * Easy:
61  * - use ATS bandwidth allocation callback and schedule transmissions!
62  *
63  * Plan:
64  * - inform ATS about RTT, goodput/loss, overheads, etc. (GNUNET_ATS_session_update())
65  *
66  * Later:
67  * - change transport-core API to provide proper flow control in both
68  *   directions, allow multiple messages per peer simultaneously (tag
69  *   confirmations with unique message ID), and replace quota-out with
70  *   proper flow control;
71  * - if messages are below MTU, consider adding ACKs and other stuff
72  *   (requires planning at receiver, and additional MST-style demultiplex
73  *    at receiver!)
74  * - could avoid copying body of message into each fragment and keep
75  *   fragments as just pointers into the original message and only 
76  *   fully build fragments just before transmission (optimization, should
77  *   reduce CPU and memory use)
78  *
79  * Design realizations / discussion:
80  * - communicators do flow control by calling MQ "notify sent"
81  *   when 'ready'. They determine flow implicitly (i.e. TCP blocking)
82  *   or explicitly via background channel FC ACKs.  As long as the
83  *   channel is not full, they may 'notify sent' even if the other
84  *   peer has not yet confirmed receipt. The other peer confirming
85  *   is _only_ for FC, not for more reliable transmission; reliable
86  *   transmission (i.e. of fragments) is left to _transport_.
87  * - ACKs sent back in uni-directional communicators are done via
88  *   the background channel API; here transport _may_ initially
89  *   broadcast (with bounded # hops) if no path is known;
90  * - transport should _integrate_ DV-routing and build a view of
91  *   the network; then background channel traffic can be
92  *   routed via DV as well as explicit "DV" traffic.
93  * - background channel is also used for ACKs and NAT traversal support
94  * - transport service is responsible for AEAD'ing the background
95  *   channel, timestamps and monotonic time are used against replay
96  *   of old messages -> peerstore needs to be supplied with
97  *   "latest timestamps seen" data
98  * - if transport implements DV, we likely need a 3rd peermap
99  *   in addition to ephemerals and (direct) neighbours
100  *   => in this data structure, we should track ATS metrics (distance, RTT, etc.)
101  *   as well as latest timestamps seen, goodput, fragments for transmission, etc.
102  *   ==> check if stuff needs to be moved out of "Neighbour"
103  * - transport should encapsualte core-level messages and do its
104  *   own ACKing for RTT/goodput/loss measurements _and_ fragment
105  *   for retransmission
106  */
107 #include "platform.h"
108 #include "gnunet_util_lib.h"
109 #include "gnunet_statistics_service.h"
110 #include "gnunet_transport_monitor_service.h"
111 #include "gnunet_peerstore_service.h"
112 #include "gnunet_hello_lib.h"
113 #include "gnunet_ats_transport_service.h"
114 #include "gnunet_signatures.h"
115 #include "transport.h"
116
117
118 /**
119  * What is the size we assume for a read operation in the
120  * absence of an MTU for the purpose of flow control?
121  */
122 #define IN_PACKET_SIZE_WITHOUT_MTU 128
123
124 /**
125  * If a queue delays the next message by more than this number
126  * of seconds we log a warning. Note: this is for testing,
127  * the value chosen here might be too aggressively low!
128  */
129 #define DELAY_WARN_THRESHOLD GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
130
131 /**
132  * How long are ephemeral keys valid?
133  */
134 #define EPHEMERAL_VALIDITY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 4)
135
136 /**
137  * How long do we keep partially reassembled messages around before giving up?
138  */
139 #define REASSEMBLY_EXPIRATION GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 4)
140
141 /**
142  * How many messages can we have pending for a given communicator
143  * process before we start to throttle that communicator?
144  * 
145  * Used if a communicator might be CPU-bound and cannot handle the traffic.
146  */
147 #define COMMUNICATOR_TOTAL_QUEUE_LIMIT 512
148
149 /**
150  * How many messages can we have pending for a given session (queue to
151  * a particular peer via a communicator) process before we start to
152  * throttle that queue?
153  * 
154  * Used if ATS assigns more bandwidth to a particular transmission
155  * method than that transmission method can right now handle. (Yes,
156  * ATS should eventually notice utilization below allocation and
157  * adjust, but we don't want to queue up tons of messages in the
158  * meantime). Must be significantly below
159  * #COMMUNICATOR_TOTAL_QUEUE_LIMIT.
160  */
161 #define SESSION_QUEUE_LIMIT 32
162
163
164 GNUNET_NETWORK_STRUCT_BEGIN
165
166 /**
167  * Outer layer of an encapsulated backchannel message.
168  */
169 struct TransportBackchannelEncapsulationMessage
170 {
171   /**
172    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION.
173    */
174   struct GNUNET_MessageHeader header;
175
176   /**
177    * Distance the backchannel message has traveled, to be updated at
178    * each hop.  Used to bound the number of hops in case a backchannel
179    * message is broadcast and thus travels without routing
180    * information (during initial backchannel discovery).
181    */
182   uint32_t distance;
183
184   /**
185    * Target's peer identity (as backchannels may be transmitted
186    * indirectly, or even be broadcast).
187    */
188   struct GNUNET_PeerIdentity target;
189
190   /**
191    * Ephemeral key setup by the sender for @e target, used
192    * to encrypt the payload.
193    */
194   struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;
195
196   // FIXME: probably should add random IV here as well,
197   // especially if we re-use ephemeral keys!
198   
199   /**
200    * HMAC over the ciphertext of the encrypted, variable-size
201    * body that follows.  Verified via DH of @e target and
202    * @e ephemeral_key
203    */
204   struct GNUNET_HashCode hmac;
205
206   /* Followed by encrypted, variable-size payload */
207 };
208
209
210 /**
211  * Body by which a peer confirms that it is using an ephemeral key.
212  */
213 struct EphemeralConfirmation
214 {
215
216   /**
217    * Purpose is #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL
218    */
219   struct GNUNET_CRYPTO_EccSignaturePurpose purpose;
220
221   /**
222    * How long is this signature over the ephemeral key valid?
223    * Note that the receiver MUST IGNORE the absolute time, and
224    * only interpret the value as a mononic time and reject
225    * "older" values than the last one observed.  Even with this,
226    * there is no real guarantee against replay achieved here,
227    * as the latest timestamp is not persisted.  This is 
228    * necessary as we do not want to require synchronized 
229    * clocks and may not have a bidirectional communication
230    * channel.  Communicators must protect against replay
231    * attacks when using backchannel communication!
232    */
233   struct GNUNET_TIME_AbsoluteNBO ephemeral_validity;
234
235   /**
236    * Target's peer identity.
237    */
238   struct GNUNET_PeerIdentity target;
239
240   /**
241    * Ephemeral key setup by the sender for @e target, used
242    * to encrypt the payload.
243    */
244   struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;
245
246 };
247
248
249 /**
250  * Plaintext of the variable-size payload that is encrypted
251  * within a `struct TransportBackchannelEncapsulationMessage`
252  */
253 struct TransportBackchannelRequestPayload
254 {
255
256   /**
257    * Sender's peer identity.
258    */
259   struct GNUNET_PeerIdentity sender;
260
261   /**
262    * Signature of the sender over an
263    * #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL.
264    */
265   struct GNUNET_CRYPTO_EddsaSignature sender_sig;
266
267   /**
268    * How long is this signature over the ephemeral key
269    * valid?
270    */
271   struct GNUNET_TIME_AbsoluteNBO ephemeral_validity;
272
273   /**
274    * Current monotonic time of the sending transport service.  Used to
275    * detect replayed messages.  Note that the receiver should remember
276    * a list of the recently seen timestamps and only reject messages
277    * if the timestamp is in the list, or the list is "full" and the
278    * timestamp is smaller than the lowest in the list.  This list of
279    * timestamps per peer should be persisted to guard against replays
280    * after restarts.
281    */
282   struct GNUNET_TIME_AbsoluteNBO monotonic_time;
283
284   /* Followed by a `struct GNUNET_MessageHeader` with a message
285      for a communicator */
286
287   /* Followed by a 0-termianted string specifying the name of
288      the communicator which is to receive the message */
289
290 };
291
292
293 /**
294  * Outer layer of an encapsulated unfragmented application message sent
295  * over an unreliable channel.
296  */
297 struct TransportReliabilityBox
298 {
299   /**
300    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX
301    */
302   struct GNUNET_MessageHeader header;
303
304   /**
305    * Number of messages still to be sent before a commulative
306    * ACK is requested.  Zero if an ACK is requested immediately.
307    * In NBO.  Note that the receiver may send the ACK faster
308    * if it believes that is reasonable.
309    */
310   uint32_t ack_countdown GNUNET_PACKED;
311
312   /**
313    * Unique ID of the message used for signalling receipt of
314    * messages sent over possibly unreliable channels.  Should
315    * be a random.
316    */
317   struct GNUNET_ShortHashCode msg_uuid;
318 };
319
320
321 /**
322  * Confirmation that the receiver got a
323  * #GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX. Note that the
324  * confirmation may be transmitted over a completely different queue,
325  * so ACKs are identified by a combination of PID of sender and
326  * message UUID, without the queue playing any role!
327  */
328 struct TransportReliabilityAckMessage
329 {
330   /**
331    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK
332    */
333   struct GNUNET_MessageHeader header;
334
335   /**
336    * Reserved. Zero.
337    */
338   uint32_t reserved GNUNET_PACKED;
339
340   /**
341    * How long was the ACK delayed relative to the average time of
342    * receipt of the messages being acknowledged?  Used to calculate
343    * the average RTT by taking the receipt time of the ack minus the
344    * average transmission time of the sender minus this value.
345    */
346   struct GNUNET_TIME_RelativeNBO avg_ack_delay;
347
348   /* followed by any number of `struct GNUNET_ShortHashCode`
349      messages providing ACKs */
350 };
351
352
353 /**
354  * Outer layer of an encapsulated fragmented application message.
355  */
356 struct TransportFragmentBox
357 {
358   /**
359    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT
360    */
361   struct GNUNET_MessageHeader header;
362
363   /**
364    * Unique ID of this fragment (and fragment transmission!). Will
365    * change even if a fragement is retransmitted to make each
366    * transmission attempt unique! Should be incremented by one for
367    * each fragment transmission. If a client receives a duplicate
368    * fragment (same @e frag_off), it must send
369    * #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK immediately.
370    */
371   uint32_t frag_uuid GNUNET_PACKED;
372
373   /**
374    * Original message ID for of the message that all the1
375    * fragments belong to.  Must be the same for all fragments.
376    */ 
377   struct GNUNET_ShortHashCode msg_uuid;
378
379   /**
380    * Offset of this fragment in the overall message.
381    */ 
382   uint16_t frag_off GNUNET_PACKED;
383
384   /**
385    * Total size of the message that is being fragmented.
386    */ 
387   uint16_t msg_size GNUNET_PACKED;
388
389 };
390
391
392 /**
393  * Outer layer of an fragmented application message sent over a queue
394  * with finite MTU.  When a #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT is
395  * received, the receiver has two RTTs or 64 further fragments with
396  * the same basic message time to send an acknowledgement, possibly
397  * acknowledging up to 65 fragments in one ACK.  ACKs must also be
398  * sent immediately once all fragments were sent.
399  */
400 struct TransportFragmentAckMessage
401 {
402   /**
403    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK
404    */
405   struct GNUNET_MessageHeader header;
406
407   /**
408    * Unique ID of the lowest fragment UUID being acknowledged.
409    */
410   uint32_t frag_uuid GNUNET_PACKED;
411
412   /**
413    * Bitfield of up to 64 additional fragments following the
414    * @e msg_uuid being acknowledged by this message.
415    */ 
416   uint64_t extra_acks GNUNET_PACKED;
417
418   /**
419    * Original message ID for of the message that all the
420    * fragments belong to.
421    */ 
422   struct GNUNET_ShortHashCode msg_uuid;
423
424   /**
425    * How long was the ACK delayed relative to the average time of
426    * receipt of the fragments being acknowledged?  Used to calculate
427    * the average RTT by taking the receipt time of the ack minus the
428    * average transmission time of the sender minus this value.
429    */
430   struct GNUNET_TIME_RelativeNBO avg_ack_delay;
431
432   /**
433    * How long until the receiver will stop trying reassembly
434    * of this message?
435    */
436   struct GNUNET_TIME_RelativeNBO reassembly_timeout;
437 };
438
439
440 /**
441  * Internal message used by transport for distance vector learning.
442  * If @e num_hops does not exceed the threshold, peers should append
443  * themselves to the peer list and flood the message (possibly only
444  * to a subset of their neighbours to limit discoverability of the
445  * network topology).  To the extend that the @e bidirectional bits
446  * are set, peers may learn the inverse paths even if they did not
447  * initiate. 
448  *
449  * Unless received on a bidirectional queue and @e num_hops just
450  * zero, peers that can forward to the initator should always try to
451  * forward to the initiator.
452  */
453 struct TransportDVLearn
454 {
455   /**
456    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN
457    */
458   struct GNUNET_MessageHeader header;
459
460   /**
461    * Number of hops this messages has travelled, in NBO. Zero if
462    * sent by initiator.
463    */
464   uint16_t num_hops GNUNET_PACKED;
465
466   /**
467    * Bitmask of the last 16 hops indicating whether they are confirmed
468    * available (without DV) in both directions or not, in NBO.  Used
469    * to possibly instantly learn a path in both directions.  Each peer
470    * should shift this value by one to the left, and then set the
471    * lowest bit IF the current sender can be reached from it (without
472    * DV routing).  
473    */ 
474   uint16_t bidirectional GNUNET_PACKED;
475
476   /**
477    * Peers receiving this message and delaying forwarding to other
478    * peers for any reason should increment this value such as to 
479    * enable the origin to determine the actual network-only delay
480    * in addition to the real-time delay (assuming the message loops
481    * back to the origin).
482    */
483   struct GNUNET_TIME_Relative cummulative_non_network_delay;
484
485   /**
486    * Identity of the peer that started this learning activity.
487    */
488   struct GNUNET_PeerIdentity initiator;
489   
490   /* Followed by @e num_hops `struct GNUNET_PeerIdentity` values,
491      excluding the initiator of the DV trace; the last entry is the
492      current sender; the current peer must not be included. */
493   
494 };
495
496
497 /**
498  * Outer layer of an encapsulated message send over multiple hops.
499  * The path given only includes the identities of the subsequent
500  * peers, i.e. it will be empty if we are the receiver. Each
501  * forwarding peer should scan the list from the end, and if it can,
502  * forward to the respective peer. The list should then be shortened
503  * by all the entries up to and including that peer.  Each hop should
504  * also increment @e total_hops to allow the receiver to get a precise
505  * estimate on the number of hops the message travelled.  Senders must
506  * provide a learned path that thus should work, but intermediaries
507  * know of a shortcut, they are allowed to send the message via that
508  * shortcut.
509  *
510  * If a peer finds itself still on the list, it must drop the message.
511  */
512 struct TransportDVBox
513 {
514   /**
515    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX
516    */
517   struct GNUNET_MessageHeader header;
518
519   /**
520    * Number of total hops this messages travelled. In NBO.
521    * @e origin sets this to zero, to be incremented at
522    * each hop.
523    */
524   uint16_t total_hops GNUNET_PACKED;
525
526   /**
527    * Number of hops this messages includes. In NBO.
528    */
529   uint16_t num_hops GNUNET_PACKED;
530   
531   /**
532    * Identity of the peer that originated the message.
533    */
534   struct GNUNET_PeerIdentity origin;
535
536   /* Followed by @e num_hops `struct GNUNET_PeerIdentity` values;
537      excluding the @e origin and the current peer, the last must be
538      the ultimate target; if @e num_hops is zero, the receiver of this
539      message is the ultimate target. */
540
541   /* Followed by the actual message, which itself may be
542      another box, but not a DV_LEARN or DV_BOX message! */
543 };
544
545
546 GNUNET_NETWORK_STRUCT_END
547
548
549
550 /**
551  * What type of client is the `struct TransportClient` about?
552  */
553 enum ClientType
554 {
555   /**
556    * We do not know yet (client is fresh).
557    */
558   CT_NONE = 0,
559
560   /**
561    * Is the CORE service, we need to forward traffic to it.
562    */
563   CT_CORE = 1,
564
565   /**
566    * It is a monitor, forward monitor data.
567    */
568   CT_MONITOR = 2,
569
570   /**
571    * It is a communicator, use for communication.
572    */
573   CT_COMMUNICATOR = 3
574 };
575
576
577 /**
578  * Entry in our cache of ephemeral keys we currently use.
579  * This way, we only sign an ephemeral once per @e target,
580  * and then can re-use it over multiple 
581  * #GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION
582  * messages (as signing is expensive).
583  */
584 struct EphemeralCacheEntry
585 {
586
587   /**
588    * Target's peer identity (we don't re-use ephemerals
589    * to limit linkability of messages).
590    */
591   struct GNUNET_PeerIdentity target;
592
593   /**
594    * Signature affirming @e ephemeral_key of type
595    * #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL
596    */
597   struct GNUNET_CRYPTO_EddsaSignature sender_sig;
598
599   /**
600    * How long is @e sender_sig valid
601    */
602   struct GNUNET_TIME_Absolute ephemeral_validity;
603
604   /**
605    * Our ephemeral key.
606    */
607   struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;
608
609   /**
610    * Our private ephemeral key.
611    */
612   struct GNUNET_CRYPTO_EcdhePrivateKey private_key;
613
614   /**
615    * Node in the ephemeral cache for this entry.
616    * Used for expiration.
617    */
618   struct GNUNET_CONTAINER_HeapNode *hn;
619 };
620
621
622 /**
623  * Client connected to the transport service.
624  */
625 struct TransportClient;
626
627
628 /**
629  * A neighbour that at least one communicator is connected to.
630  */
631 struct Neighbour;
632
633
634 /**
635  * Entry in our #dv_routes table, representing a (set of) distance
636  * vector routes to a particular peer.
637  */
638 struct DistanceVector;
639
640 /**
641  * One possible hop towards a DV target.
642  */
643 struct DistanceVectorHop
644 {
645
646   /**
647    * Kept in a MDLL, sorted by @e timeout.
648    */ 
649   struct DistanceVectorHop *next_dv;
650
651   /**
652    * Kept in a MDLL, sorted by @e timeout.
653    */ 
654   struct DistanceVectorHop *prev_dv;
655
656   /**
657    * Kept in a MDLL.
658    */ 
659   struct DistanceVectorHop *next_neighbour;
660
661   /**
662    * Kept in a MDLL.
663    */ 
664   struct DistanceVectorHop *prev_neighbour;
665
666   /**
667    * What would be the next hop to @e target?
668    */ 
669   struct Neighbour *next_hop;
670
671   /**
672    * Distance vector entry this hop belongs with.
673    */ 
674   struct DistanceVector *dv;
675   
676   /**
677    * Array of @e distance hops to the target, excluding @e next_hop.
678    * NULL if the entire path is us to @e next_hop to `target`. Allocated
679    * at the end of this struct.
680    */ 
681   const struct GNUNET_PeerIdentity *path;
682
683   /**
684    * At what time do we forget about this path unless we see it again
685    * while learning?
686    */
687   struct GNUNET_TIME_Absolute timeout;
688   
689   /**
690    * How many hops in total to the `target` (excluding @e next_hop and `target` itself),
691    * thus 0 still means a distance of 2 hops (to @e next_hop and then to `target`)?
692    */ 
693   unsigned int distance;
694 };
695
696
697 /**
698  * Entry in our #dv_routes table, representing a (set of) distance
699  * vector routes to a particular peer.
700  */
701 struct DistanceVector
702 {
703
704   /**
705    * To which peer is this a route?
706    */
707   struct GNUNET_PeerIdentity target;
708
709   /**
710    * Known paths to @e target.
711    */ 
712   struct DistanceVectorHop *dv_head;
713
714   /**
715    * Known paths to @e target.
716    */ 
717   struct DistanceVectorHop *dv_tail;
718
719   /**
720    * Task scheduled to purge expired paths from @e dv_head MDLL.
721    */
722   struct GNUNET_SCHEDULER_Task *timeout_task;
723 };
724
725
726 /**
727  * Entry identifying transmission in one of our `struct
728  * GNUNET_ATS_Sessions` which still awaits an ACK.  This is used to
729  * ensure we do not overwhelm a communicator and limit the number of
730  * messages outstanding per communicator (say in case communicator is
731  * CPU bound) and per queue (in case ATS bandwidth allocation exceeds
732  * what the communicator can actually provide towards a particular
733  * peer/target).
734  */
735 struct QueueEntry
736 {
737
738   /**
739    * Kept as a DLL.
740    */ 
741   struct QueueEntry *next;
742
743   /**
744    * Kept as a DLL.
745    */ 
746   struct QueueEntry *prev;
747
748   /**
749    * ATS session this entry is queued with.
750    */
751   struct GNUNET_ATS_Session *session;
752   
753   /**
754    * Message ID used for this message with the queue used for transmission.
755    */
756   uint64_t mid;
757 };
758
759
760 /**
761  * An ATS session is a message queue provided by a communicator
762  * via which we can reach a particular neighbour.
763  */
764 struct GNUNET_ATS_Session
765 {
766   /**
767    * Kept in a MDLL.
768    */
769   struct GNUNET_ATS_Session *next_neighbour;
770
771   /**
772    * Kept in a MDLL.
773    */
774   struct GNUNET_ATS_Session *prev_neighbour;
775
776   /**
777    * Kept in a MDLL.
778    */
779   struct GNUNET_ATS_Session *prev_client;
780
781   /**
782    * Kept in a MDLL.
783    */
784   struct GNUNET_ATS_Session *next_client;
785
786   /**
787    * Head of DLL of unacked transmission requests.
788    */ 
789   struct QueueEntry *queue_head;
790
791   /**
792    * End of DLL of unacked transmission requests.
793    */ 
794   struct QueueEntry *queue_tail;
795
796   /**
797    * Which neighbour is this ATS session for?
798    */
799   struct Neighbour *neighbour;
800
801   /**
802    * Which communicator offers this ATS session?
803    */
804   struct TransportClient *tc;
805
806   /**
807    * Address served by the ATS session.
808    */
809   const char *address;
810
811   /**
812    * Handle by which we inform ATS about this queue.
813    */
814   struct GNUNET_ATS_SessionRecord *sr;
815
816   /**
817    * Task scheduled for the time when this queue can (likely) transmit the
818    * next message. Still needs to check with the @e tracker_out to be sure.
819    */ 
820   struct GNUNET_SCHEDULER_Task *transmit_task;
821   
822   /**
823    * Our current RTT estimate for this ATS session.
824    */
825   struct GNUNET_TIME_Relative rtt;
826
827   /**
828    * Message ID generator for transmissions on this queue.
829    */ 
830   uint64_t mid_gen;
831   
832   /**
833    * Unique identifier of this ATS session with the communicator.
834    */
835   uint32_t qid;
836
837   /**
838    * Maximum transmission unit supported by this ATS session.
839    */
840   uint32_t mtu;
841
842   /**
843    * Distance to the target of this ATS session.
844    */
845   uint32_t distance;
846
847   /**
848    * Messages pending.
849    */
850   uint32_t num_msg_pending;
851
852   /**
853    * Bytes pending.
854    */
855   uint32_t num_bytes_pending;
856
857   /**
858    * Length of the DLL starting at @e queue_head.
859    */
860   unsigned int queue_length;
861   
862   /**
863    * Network type offered by this ATS session.
864    */
865   enum GNUNET_NetworkType nt;
866
867   /**
868    * Connection status for this ATS session.
869    */
870   enum GNUNET_TRANSPORT_ConnectionStatus cs;
871
872   /**
873    * How much outbound bandwidth do we have available for this session?
874    */
875   struct GNUNET_BANDWIDTH_Tracker tracker_out;
876
877   /**
878    * How much inbound bandwidth do we have available for this session?
879    */
880   struct GNUNET_BANDWIDTH_Tracker tracker_in;
881 };
882
883
884 /**
885  * Information we keep for a message that we are reassembling.
886  */ 
887 struct ReassemblyContext
888 {
889
890   /**
891    * Original message ID for of the message that all the
892    * fragments belong to.
893    */ 
894   struct GNUNET_ShortHashCode msg_uuid;
895
896   /**
897    * Which neighbour is this context for?
898    */
899   struct Neighbour *neighbour;
900
901   /**
902    * Entry in the reassembly heap (sorted by expiration).
903    */ 
904   struct GNUNET_CONTAINER_HeapNode *hn;
905
906   /**
907    * Bitfield with @e msg_size bits representing the positions
908    * where we have received fragments.  When we receive a fragment,
909    * we check the bits in @e bitfield before incrementing @e msg_missing.
910    *
911    * Allocated after the reassembled message.
912    */
913   uint8_t *bitfield;
914
915   /**
916    * Task for sending ACK. We may send ACKs either because of hitting
917    * the @e extra_acks limit, or based on time and @e num_acks.  This
918    * task is for the latter case.
919    */
920   struct GNUNET_SCHEDULER_Task *ack_task;
921   
922   /**
923    * At what time will we give up reassembly of this message?
924    */
925   struct GNUNET_TIME_Absolute reassembly_timeout;
926
927   /**
928    * Average delay of all acks in @e extra_acks and @e frag_uuid.
929    * Should be reset to zero when @e num_acks is set to 0.
930    */
931   struct GNUNET_TIME_Relative avg_ack_delay;
932
933   /**
934    * Time we received the last fragment.  @e avg_ack_delay must be
935    * incremented by now - @e last_frag multiplied by @e num_acks.
936    */
937   struct GNUNET_TIME_Absolute last_frag;
938
939   /**
940    * Bitfield of up to 64 additional fragments following @e frag_uuid
941    * to be acknowledged in the next cummulative ACK.
942    */
943   uint64_t extra_acks;
944   
945   /**
946    * Unique ID of the lowest fragment UUID to be acknowledged in the
947    * next cummulative ACK.  Only valid if @e num_acks > 0.
948    */
949   uint32_t frag_uuid;
950
951   /**
952    * Number of ACKs we have accumulated so far.  Reset to 0
953    * whenever we send a #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK.
954    */
955   unsigned int num_acks;
956   
957   /**
958    * How big is the message we are reassembling in total?
959    */
960   uint16_t msg_size;
961
962   /**
963    * How many bytes of the message are still missing?  Defragmentation
964    * is complete when @e msg_missing == 0.
965    */
966   uint16_t msg_missing;
967
968   /* Followed by @e msg_size bytes of the (partially) defragmented original message */
969
970   /* Followed by @e bitfield data */
971 };
972
973
974 /**
975  * A neighbour that at least one communicator is connected to.
976  */
977 struct Neighbour
978 {
979
980   /**
981    * Which peer is this about?
982    */
983   struct GNUNET_PeerIdentity pid;
984
985   /**
986    * Map with `struct ReassemblyContext` structs for fragments under
987    * reassembly. May be NULL if we currently have no fragments from
988    * this @e pid (lazy initialization).
989    */ 
990   struct GNUNET_CONTAINER_MultiShortmap *reassembly_map;
991
992   /**
993    * Heap with `struct ReassemblyContext` structs for fragments under
994    * reassembly. May be NULL if we currently have no fragments from
995    * this @e pid (lazy initialization).
996    */ 
997   struct GNUNET_CONTAINER_Heap *reassembly_heap;
998
999   /**
1000    * Task to free old entries from the @e reassembly_heap and @e reassembly_map.
1001    */
1002   struct GNUNET_SCHEDULER_Task *reassembly_timeout_task;
1003   
1004   /**
1005    * Head of list of messages pending for this neighbour.
1006    */
1007   struct PendingMessage *pending_msg_head;
1008
1009   /**
1010    * Tail of list of messages pending for this neighbour.
1011    */
1012   struct PendingMessage *pending_msg_tail;
1013
1014   /**
1015    * Head of MDLL of DV hops that have this neighbour as next hop. Must be
1016    * purged if this neighbour goes down.
1017    */ 
1018   struct DistanceVectorHop *dv_head;
1019
1020   /**
1021    * Tail of MDLL of DV hops that have this neighbour as next hop. Must be
1022    * purged if this neighbour goes down.
1023    */ 
1024   struct DistanceVectorHop *dv_tail;
1025
1026   /**
1027    * Head of DLL of ATS sessions to this peer.
1028    */
1029   struct GNUNET_ATS_Session *session_head;
1030
1031   /**
1032    * Tail of DLL of ATS sessions to this peer.
1033    */
1034   struct GNUNET_ATS_Session *session_tail;
1035
1036   /**
1037    * Task run to cleanup pending messages that have exceeded their timeout.
1038    */  
1039   struct GNUNET_SCHEDULER_Task *timeout_task;
1040
1041   /**
1042    * Quota at which CORE is allowed to transmit to this peer
1043    * according to ATS.
1044    *
1045    * FIXME: not yet used, tricky to get right given multiple queues!
1046    *        (=> Idea: let ATS set a quota per queue and we add them up here?)
1047    * FIXME: how do we set this value initially when we tell CORE?
1048    *    Options: start at a minimum value or at literally zero (before ATS?)
1049    *         (=> Current thought: clean would be zero!)
1050    */
1051   struct GNUNET_BANDWIDTH_Value32NBO quota_out;
1052
1053   /**
1054    * What is the earliest timeout of any message in @e pending_msg_tail?
1055    */ 
1056   struct GNUNET_TIME_Absolute earliest_timeout;
1057   
1058 };
1059
1060
1061 /**
1062  * Types of different pending messages.
1063  */ 
1064 enum PendingMessageType
1065 {
1066
1067   /**
1068    * Ordinary message received from the CORE service.
1069    */
1070   PMT_CORE = 0,
1071
1072   /**
1073    * Fragment box.
1074    */
1075   PMT_FRAGMENT_BOX = 1,
1076
1077   /**
1078    * Reliability box.
1079    */
1080   PMT_RELIABILITY_BOX = 2,
1081
1082   /**
1083    * Any type of acknowledgement.
1084    */
1085   PMT_ACKNOWLEDGEMENT = 3
1086
1087   
1088 };
1089
1090
1091 /**
1092  * Transmission request that is awaiting delivery.  The original
1093  * transmission requests from CORE may be too big for some queues.
1094  * In this case, a *tree* of fragments is created.  At each
1095  * level of the tree, fragments are kept in a DLL ordered by which
1096  * fragment should be sent next (at the head).  The tree is searched
1097  * top-down, with the original message at the root.
1098  *
1099  * To select a node for transmission, first it is checked if the
1100  * current node's message fits with the MTU.  If it does not, we
1101  * either calculate the next fragment (based on @e frag_off) from the
1102  * current node, or, if all fragments have already been created,
1103  * descend to the @e head_frag.  Even though the node was already
1104  * fragmented, the fragment may be too big if the fragment was 
1105  * generated for a queue with a larger MTU. In this case, the node
1106  * may be fragmented again, thus creating a tree.
1107  *
1108  * When acknowledgements for fragments are received, the tree
1109  * must be pruned, removing those parts that were already 
1110  * acknowledged.  When fragments are sent over a reliable 
1111  * channel, they can be immediately removed.
1112  *
1113  * If a message is ever fragmented, then the original "full" message
1114  * is never again transmitted (even if it fits below the MTU), and
1115  * only (remaining) fragments are sent.
1116  */
1117 struct PendingMessage
1118 {
1119   /**
1120    * Kept in a MDLL of messages for this @a target.
1121    */
1122   struct PendingMessage *next_neighbour;
1123
1124   /**
1125    * Kept in a MDLL of messages for this @a target.
1126    */
1127   struct PendingMessage *prev_neighbour;
1128
1129   /**
1130    * Kept in a MDLL of messages from this @a client (if @e pmt is #PMT_CORE)
1131    */
1132   struct PendingMessage *next_client;
1133   
1134   /**
1135    * Kept in a MDLL of messages from this @a client  (if @e pmt is #PMT_CORE)
1136    */
1137   struct PendingMessage *prev_client;
1138
1139   /**
1140    * Kept in a MDLL of messages from this @a cpm (if @e pmt is #PMT_FRAGMENT_BOx)
1141    */
1142   struct PendingMessage *next_frag;
1143   
1144   /**
1145    * Kept in a MDLL of messages from this @a cpm  (if @e pmt is #PMT_FRAGMENT_BOX)
1146    */
1147   struct PendingMessage *prev_frag;
1148
1149   /**
1150    * This message, reliability boxed. Only possibly available if @e pmt is #PMT_CORE.
1151    */ 
1152   struct PendingMessage *bpm;
1153   
1154   /**
1155    * Target of the request.
1156    */
1157   struct Neighbour *target;
1158       
1159   /**
1160    * Client that issued the transmission request, if @e pmt is #PMT_CORE.
1161    */
1162   struct TransportClient *client;
1163   
1164   /**
1165    * Head of a MDLL of fragments created for this core message.
1166    */
1167   struct PendingMessage *head_frag;
1168   
1169   /**
1170    * Tail of a MDLL of fragments created for this core message.
1171    */
1172   struct PendingMessage *tail_frag;
1173
1174   /**
1175    * Our parent in the fragmentation tree.
1176    */
1177   struct PendingMessage *frag_parent;
1178     
1179   /**
1180    * At what time should we give up on the transmission (and no longer retry)?
1181    */
1182   struct GNUNET_TIME_Absolute timeout;
1183
1184   /**
1185    * What is the earliest time for us to retry transmission of this message?
1186    */
1187   struct GNUNET_TIME_Absolute next_attempt;
1188
1189   /**
1190    * UUID to use for this message (used for reassembly of fragments, only
1191    * initialized if @e msg_uuid_set is #GNUNET_YES).
1192    */
1193   struct GNUNET_ShortHashCode msg_uuid;
1194   
1195   /**
1196    * Counter incremented per generated fragment.
1197    */ 
1198   uint32_t frag_uuidgen;
1199   
1200   /**
1201    * Type of the pending message.
1202    */
1203   enum PendingMessageType pmt;
1204
1205   /**
1206    * Size of the original message.
1207    */
1208   uint16_t bytes_msg;
1209
1210   /**
1211    * Offset at which we should generate the next fragment.
1212    */ 
1213   uint16_t frag_off;
1214
1215   /**
1216    * #GNUNET_YES once @e msg_uuid was initialized
1217    */
1218   int16_t msg_uuid_set;
1219   
1220   /* Followed by @e bytes_msg to transmit */
1221 };
1222
1223
1224 /**
1225  * One of the addresses of this peer.
1226  */
1227 struct AddressListEntry
1228 {
1229
1230   /**
1231    * Kept in a DLL.
1232    */
1233   struct AddressListEntry *next;
1234
1235   /**
1236    * Kept in a DLL.
1237    */
1238   struct AddressListEntry *prev;
1239
1240   /**
1241    * Which communicator provides this address?
1242    */
1243   struct TransportClient *tc;
1244
1245   /**
1246    * The actual address.
1247    */
1248   const char *address;
1249
1250   /**
1251    * Current context for storing this address in the peerstore.
1252    */
1253   struct GNUNET_PEERSTORE_StoreContext *sc;
1254
1255   /**
1256    * Task to periodically do @e st operation.
1257    */
1258   struct GNUNET_SCHEDULER_Task *st;
1259
1260   /**
1261    * What is a typical lifetime the communicator expects this
1262    * address to have? (Always from now.)
1263    */
1264   struct GNUNET_TIME_Relative expiration;
1265
1266   /**
1267    * Address identifier used by the communicator.
1268    */
1269   uint32_t aid;
1270
1271   /**
1272    * Network type offered by this address.
1273    */
1274   enum GNUNET_NetworkType nt;
1275
1276 };
1277
1278
1279 /**
1280  * Client connected to the transport service.
1281  */
1282 struct TransportClient
1283 {
1284
1285   /**
1286    * Kept in a DLL.
1287    */
1288   struct TransportClient *next;
1289
1290   /**
1291    * Kept in a DLL.
1292    */
1293   struct TransportClient *prev;
1294
1295   /**
1296    * Handle to the client.
1297    */
1298   struct GNUNET_SERVICE_Client *client;
1299
1300   /**
1301    * Message queue to the client.
1302    */
1303   struct GNUNET_MQ_Handle *mq;
1304
1305   /**
1306    * What type of client is this?
1307    */
1308   enum ClientType type;
1309
1310   union
1311   {
1312
1313     /**
1314      * Information for @e type #CT_CORE.
1315      */
1316     struct {
1317
1318       /**
1319        * Head of list of messages pending for this client, sorted by 
1320        * transmission time ("next_attempt" + possibly internal prioritization).
1321        */
1322       struct PendingMessage *pending_msg_head;
1323
1324       /**
1325        * Tail of list of messages pending for this client.
1326        */
1327       struct PendingMessage *pending_msg_tail;
1328
1329     } core;
1330
1331     /**
1332      * Information for @e type #CT_MONITOR.
1333      */
1334     struct {
1335
1336       /**
1337        * Peer identity to monitor the addresses of.
1338        * Zero to monitor all neighbours.  Valid if
1339        * @e type is #CT_MONITOR.
1340        */
1341       struct GNUNET_PeerIdentity peer;
1342
1343       /**
1344        * Is this a one-shot monitor?
1345        */
1346       int one_shot;
1347
1348     } monitor;
1349
1350
1351     /**
1352      * Information for @e type #CT_COMMUNICATOR.
1353      */
1354     struct {
1355       /**
1356        * If @e type is #CT_COMMUNICATOR, this communicator
1357        * supports communicating using these addresses.
1358        */
1359       char *address_prefix;
1360
1361       /**
1362        * Head of DLL of queues offered by this communicator.
1363        */
1364       struct GNUNET_ATS_Session *session_head;
1365
1366       /**
1367        * Tail of DLL of queues offered by this communicator.
1368        */
1369       struct GNUNET_ATS_Session *session_tail;
1370
1371       /**
1372        * Head of list of the addresses of this peer offered by this communicator.
1373        */
1374       struct AddressListEntry *addr_head;
1375
1376       /**
1377        * Tail of list of the addresses of this peer offered by this communicator.
1378        */
1379       struct AddressListEntry *addr_tail;
1380
1381       /**
1382        * Number of queue entries in all queues to this communicator. Used
1383        * throttle sending to a communicator if we see that the communicator
1384        * is globally unable to keep up.
1385        */
1386       unsigned int total_queue_length;
1387       
1388       /**
1389        * Characteristics of this communicator.
1390        */
1391       enum GNUNET_TRANSPORT_CommunicatorCharacteristics cc;
1392
1393     } communicator;
1394
1395   } details;
1396
1397 };
1398
1399
1400 /**
1401  * Head of linked list of all clients to this service.
1402  */
1403 static struct TransportClient *clients_head;
1404
1405 /**
1406  * Tail of linked list of all clients to this service.
1407  */
1408 static struct TransportClient *clients_tail;
1409
1410 /**
1411  * Statistics handle.
1412  */
1413 static struct GNUNET_STATISTICS_Handle *GST_stats;
1414
1415 /**
1416  * Configuration handle.
1417  */
1418 static const struct GNUNET_CONFIGURATION_Handle *GST_cfg;
1419
1420 /**
1421  * Our public key.
1422  */
1423 static struct GNUNET_PeerIdentity GST_my_identity;
1424
1425 /**
1426  * Our private key.
1427  */
1428 static struct GNUNET_CRYPTO_EddsaPrivateKey *GST_my_private_key;
1429
1430 /**
1431  * Map from PIDs to `struct Neighbour` entries.  A peer is
1432  * a neighbour if we have an MQ to it from some communicator.
1433  */
1434 static struct GNUNET_CONTAINER_MultiPeerMap *neighbours;
1435
1436 /**
1437  * Map from PIDs to `struct DistanceVector` entries describing
1438  * known paths to the peer.
1439  */
1440 static struct GNUNET_CONTAINER_MultiPeerMap *dv_routes;
1441
1442 /**
1443  * Database for peer's HELLOs.
1444  */
1445 static struct GNUNET_PEERSTORE_Handle *peerstore;
1446
1447 /**
1448  * Heap sorting `struct EphemeralCacheEntry` by their
1449  * key/signature validity.
1450  */
1451 static struct GNUNET_CONTAINER_Heap *ephemeral_heap;
1452
1453 /**
1454  * Hash map for looking up `struct EphemeralCacheEntry`s
1455  * by peer identity. (We may have ephemerals in our
1456  * cache for which we do not have a neighbour entry,
1457  * and similar many neighbours may not need ephemerals,
1458  * so we use a second map.)
1459  */
1460 static struct GNUNET_CONTAINER_MultiPeerMap *ephemeral_map;
1461
1462 /**
1463  * Task to free expired ephemerals.
1464  */
1465 static struct GNUNET_SCHEDULER_Task *ephemeral_task;
1466
1467 /**
1468  * Our connection to ATS for allocation and bootstrapping.
1469  */
1470 static struct GNUNET_ATS_TransportHandle *ats;
1471
1472
1473 /**
1474  * Free cached ephemeral key.
1475  *
1476  * @param ece cached signature to free
1477  */
1478 static void
1479 free_ephemeral (struct EphemeralCacheEntry *ece)
1480 {
1481   GNUNET_CONTAINER_multipeermap_remove (ephemeral_map,
1482                                         &ece->target,
1483                                         ece);
1484   GNUNET_CONTAINER_heap_remove_node (ece->hn);
1485   GNUNET_free (ece);
1486 }
1487
1488
1489 /**
1490  * Lookup neighbour record for peer @a pid.
1491  *
1492  * @param pid neighbour to look for
1493  * @return NULL if we do not have this peer as a neighbour
1494  */
1495 static struct Neighbour *
1496 lookup_neighbour (const struct GNUNET_PeerIdentity *pid)
1497 {
1498   return GNUNET_CONTAINER_multipeermap_get (neighbours,
1499                                             pid);
1500 }
1501
1502
1503 /**
1504  * Details about what to notify monitors about.
1505  */
1506 struct MonitorEvent
1507 {
1508   /**
1509    * @deprecated To be discussed if we keep these...
1510    */
1511   struct GNUNET_TIME_Absolute last_validation;
1512   struct GNUNET_TIME_Absolute valid_until;
1513   struct GNUNET_TIME_Absolute next_validation;
1514
1515   /**
1516    * Current round-trip time estimate.
1517    */
1518   struct GNUNET_TIME_Relative rtt;
1519
1520   /**
1521    * Connection status.
1522    */
1523   enum GNUNET_TRANSPORT_ConnectionStatus cs;
1524
1525   /**
1526    * Messages pending.
1527    */
1528   uint32_t num_msg_pending;
1529
1530   /**
1531    * Bytes pending.
1532    */
1533   uint32_t num_bytes_pending;
1534
1535
1536 };
1537
1538
1539 /**
1540  * Free a @dvh, and if it is the last path to the `target`,also
1541  * free the associated DV entry in #dv_routes.
1542  *
1543  * @param dvh hop to free
1544  */
1545 static void
1546 free_distance_vector_hop (struct DistanceVectorHop *dvh)
1547 {
1548   struct Neighbour *n = dvh->next_hop;
1549   struct DistanceVector *dv = dvh->dv;
1550
1551   GNUNET_CONTAINER_MDLL_remove (neighbour,
1552                                 n->dv_head,
1553                                 n->dv_tail,
1554                                 dvh);
1555   GNUNET_CONTAINER_MDLL_remove (dv,
1556                                 dv->dv_head,
1557                                 dv->dv_tail,
1558                                 dvh);
1559   GNUNET_free (dvh);
1560   if (NULL == dv->dv_head)
1561   {
1562     GNUNET_assert (GNUNET_YES == 
1563                    GNUNET_CONTAINER_multipeermap_remove (dv_routes,
1564                                                          &dv->target,
1565                                                          dv));
1566     if (NULL != dv->timeout_task)
1567       GNUNET_SCHEDULER_cancel (dv->timeout_task);
1568     GNUNET_free (dv);
1569   }
1570 }
1571
1572
1573 /**
1574  * Free entry in #dv_routes.  First frees all hops to the target, and
1575  * the last target will implicitly free @a dv as well.
1576  *
1577  * @param dv route to free
1578  */
1579 static void
1580 free_dv_route (struct DistanceVector *dv)
1581 {
1582   struct DistanceVectorHop *dvh;
1583
1584   while (NULL != (dvh = dv->dv_head))
1585     free_distance_vector_hop (dvh);
1586 }
1587
1588
1589 /**
1590  * Notify monitor @a tc about an event.  That @a tc
1591  * cares about the event has already been checked.
1592  *
1593  * Send @a tc information in @a me about a @a peer's status with
1594  * respect to some @a address to all monitors that care.
1595  *
1596  * @param tc monitor to inform
1597  * @param peer peer the information is about
1598  * @param address address the information is about
1599  * @param nt network type associated with @a address
1600  * @param me detailed information to transmit
1601  */
1602 static void
1603 notify_monitor (struct TransportClient *tc,
1604                 const struct GNUNET_PeerIdentity *peer,
1605                 const char *address,
1606                 enum GNUNET_NetworkType nt,
1607                 const struct MonitorEvent *me)
1608 {
1609   struct GNUNET_MQ_Envelope *env;
1610   struct GNUNET_TRANSPORT_MonitorData *md;
1611   size_t addr_len = strlen (address) + 1;
1612
1613   env = GNUNET_MQ_msg_extra (md,
1614                              addr_len,
1615                              GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_DATA);
1616   md->nt = htonl ((uint32_t) nt);
1617   md->peer = *peer;
1618   md->last_validation = GNUNET_TIME_absolute_hton (me->last_validation);
1619   md->valid_until = GNUNET_TIME_absolute_hton (me->valid_until);
1620   md->next_validation = GNUNET_TIME_absolute_hton (me->next_validation);
1621   md->rtt = GNUNET_TIME_relative_hton (me->rtt);
1622   md->cs = htonl ((uint32_t) me->cs);
1623   md->num_msg_pending = htonl (me->num_msg_pending);
1624   md->num_bytes_pending = htonl (me->num_bytes_pending);
1625   memcpy (&md[1],
1626           address,
1627           addr_len);
1628   GNUNET_MQ_send (tc->mq,
1629                   env);
1630 }
1631
1632
1633 /**
1634  * Send information in @a me about a @a peer's status with respect
1635  * to some @a address to all monitors that care.
1636  *
1637  * @param peer peer the information is about
1638  * @param address address the information is about
1639  * @param nt network type associated with @a address
1640  * @param me detailed information to transmit
1641  */
1642 static void
1643 notify_monitors (const struct GNUNET_PeerIdentity *peer,
1644                  const char *address,
1645                  enum GNUNET_NetworkType nt,
1646                  const struct MonitorEvent *me)
1647 {
1648   static struct GNUNET_PeerIdentity zero;
1649
1650   for (struct TransportClient *tc = clients_head;
1651        NULL != tc;
1652        tc = tc->next)
1653   {
1654     if (CT_MONITOR != tc->type)
1655       continue;
1656     if (tc->details.monitor.one_shot)
1657       continue;
1658     if ( (0 != memcmp (&tc->details.monitor.peer,
1659                        &zero,
1660                        sizeof (zero))) &&
1661          (0 != memcmp (&tc->details.monitor.peer,
1662                        peer,
1663                        sizeof (*peer))) )
1664       continue;
1665     notify_monitor (tc,
1666                     peer,
1667                     address,
1668                     nt,
1669                     me);
1670   }
1671 }
1672
1673
1674 /**
1675  * Called whenever a client connects.  Allocates our
1676  * data structures associated with that client.
1677  *
1678  * @param cls closure, NULL
1679  * @param client identification of the client
1680  * @param mq message queue for the client
1681  * @return our `struct TransportClient`
1682  */
1683 static void *
1684 client_connect_cb (void *cls,
1685                    struct GNUNET_SERVICE_Client *client,
1686                    struct GNUNET_MQ_Handle *mq)
1687 {
1688   struct TransportClient *tc;
1689
1690   tc = GNUNET_new (struct TransportClient);
1691   tc->client = client;
1692   tc->mq = mq;
1693   GNUNET_CONTAINER_DLL_insert (clients_head,
1694                                clients_tail,
1695                                tc);
1696   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1697               "Client %p connected\n",
1698               tc);
1699   return tc;
1700 }
1701
1702
1703 /**
1704  * Free @a rc
1705  *
1706  * @param rc data structure to free
1707  */
1708 static void
1709 free_reassembly_context (struct ReassemblyContext *rc)
1710 {
1711   struct Neighbour *n = rc->neighbour;
1712
1713   GNUNET_assert (rc ==
1714                  GNUNET_CONTAINER_heap_remove_node (rc->hn));
1715   GNUNET_assert (GNUNET_OK ==
1716                  GNUNET_CONTAINER_multishortmap_remove (n->reassembly_map,
1717                                                         &rc->msg_uuid,
1718                                                         rc));
1719   GNUNET_free (rc);
1720 }
1721
1722
1723 /**
1724  * Task run to clean up reassembly context of a neighbour that have expired.
1725  *
1726  * @param cls a `struct Neighbour`
1727  */
1728 static void
1729 reassembly_cleanup_task (void *cls)
1730 {
1731   struct Neighbour *n = cls;
1732   struct ReassemblyContext *rc;
1733
1734   n->reassembly_timeout_task = NULL;
1735   while (NULL != (rc = GNUNET_CONTAINER_heap_peek (n->reassembly_heap)))
1736   {
1737     if (0 == GNUNET_TIME_absolute_get_remaining (rc->reassembly_timeout).rel_value_us)
1738     {
1739       free_reassembly_context (rc);
1740       continue;
1741     }
1742     GNUNET_assert (NULL == n->reassembly_timeout_task);
1743     n->reassembly_timeout_task = GNUNET_SCHEDULER_add_at (rc->reassembly_timeout,
1744                                                           &reassembly_cleanup_task,
1745                                                           n);
1746     return;
1747   }
1748 }
1749
1750
1751 /**
1752  * function called to #free_reassembly_context().
1753  *
1754  * @param cls NULL
1755  * @param key unused
1756  * @param value a `struct ReassemblyContext` to free
1757  * @return #GNUNET_OK (continue iteration)
1758  */
1759 static int
1760 free_reassembly_cb (void *cls,
1761                     const struct GNUNET_ShortHashCode *key,
1762                     void *value)
1763 {
1764   struct ReassemblyContext *rc = value;
1765   (void) cls;
1766   (void) key;
1767   
1768   free_reassembly_context (rc);
1769   return GNUNET_OK;
1770 }
1771
1772
1773 /**
1774  * Release memory used by @a neighbour.
1775  *
1776  * @param neighbour neighbour entry to free
1777  */
1778 static void
1779 free_neighbour (struct Neighbour *neighbour)
1780 {
1781   struct DistanceVectorHop *dvh;
1782   
1783   GNUNET_assert (NULL == neighbour->session_head);
1784   GNUNET_assert (GNUNET_YES ==
1785                  GNUNET_CONTAINER_multipeermap_remove (neighbours,
1786                                                        &neighbour->pid,
1787                                                        neighbour));
1788   if (NULL != neighbour->timeout_task)
1789     GNUNET_SCHEDULER_cancel (neighbour->timeout_task);
1790   if (NULL != neighbour->reassembly_map)
1791   {
1792     GNUNET_CONTAINER_multishortmap_iterate (neighbour->reassembly_map,
1793                                             &free_reassembly_cb,
1794                                             NULL);
1795     GNUNET_CONTAINER_multishortmap_destroy (neighbour->reassembly_map);
1796     neighbour->reassembly_map = NULL;
1797     GNUNET_CONTAINER_heap_destroy (neighbour->reassembly_heap);
1798     neighbour->reassembly_heap = NULL;
1799   }
1800   while (NULL != (dvh = neighbour->dv_head))
1801     free_distance_vector_hop (dvh);
1802   if (NULL != neighbour->reassembly_timeout_task)
1803     GNUNET_SCHEDULER_cancel (neighbour->reassembly_timeout_task);
1804   GNUNET_free (neighbour);
1805 }
1806
1807
1808 /**
1809  * Send message to CORE clients that we lost a connection.
1810  *
1811  * @param tc client to inform (must be CORE client)
1812  * @param pid peer the connection is for
1813  * @param quota_out current quota for the peer
1814  */
1815 static void
1816 core_send_connect_info (struct TransportClient *tc,
1817                         const struct GNUNET_PeerIdentity *pid,
1818                         struct GNUNET_BANDWIDTH_Value32NBO quota_out)
1819 {
1820   struct GNUNET_MQ_Envelope *env;
1821   struct ConnectInfoMessage *cim;
1822
1823   GNUNET_assert (CT_CORE == tc->type);
1824   env = GNUNET_MQ_msg (cim,
1825                        GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
1826   cim->quota_out = quota_out;
1827   cim->id = *pid;
1828   GNUNET_MQ_send (tc->mq,
1829                   env);
1830 }
1831
1832
1833 /**
1834  * Send message to CORE clients that we gained a connection
1835  *
1836  * @param pid peer the queue was for
1837  * @param quota_out current quota for the peer
1838  */
1839 static void
1840 cores_send_connect_info (const struct GNUNET_PeerIdentity *pid,
1841                          struct GNUNET_BANDWIDTH_Value32NBO quota_out)
1842 {
1843   for (struct TransportClient *tc = clients_head;
1844        NULL != tc;
1845        tc = tc->next)
1846   {
1847     if (CT_CORE != tc->type)
1848       continue;
1849     core_send_connect_info (tc,
1850                             pid,
1851                             quota_out);
1852   }
1853 }
1854
1855
1856 /**
1857  * Send message to CORE clients that we lost a connection.
1858  *
1859  * @param pid peer the connection was for
1860  */
1861 static void
1862 cores_send_disconnect_info (const struct GNUNET_PeerIdentity *pid)
1863 {
1864   for (struct TransportClient *tc = clients_head;
1865        NULL != tc;
1866        tc = tc->next)
1867   {
1868     struct GNUNET_MQ_Envelope *env;
1869     struct DisconnectInfoMessage *dim;
1870
1871     if (CT_CORE != tc->type)
1872       continue;
1873     env = GNUNET_MQ_msg (dim,
1874                          GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT);
1875     dim->peer = *pid;
1876     GNUNET_MQ_send (tc->mq,
1877                     env);
1878   }
1879 }
1880
1881
1882 /**
1883  * We believe we are ready to transmit a message on a queue. Double-checks
1884  * with the queue's "tracker_out" and then gives the message to the 
1885  * communicator for transmission (updating the tracker, and re-scheduling
1886  * itself if applicable).  
1887  *
1888  * @param cls the `struct GNUNET_ATS_Session` to process transmissions for
1889  */ 
1890 static void
1891 transmit_on_queue (void *cls);
1892
1893
1894 /**
1895  * Schedule next run of #transmit_on_queue().  Does NOTHING if 
1896  * we should run immediately or if the message queue is empty.
1897  * Test for no task being added AND queue not being empty to
1898  * transmit immediately afterwards!  This function must only
1899  * be called if the message queue is non-empty!
1900  *
1901  * @param queue the queue to do scheduling for
1902  */ 
1903 static void
1904 schedule_transmit_on_queue (struct GNUNET_ATS_Session *queue)
1905 {
1906   struct Neighbour *n = queue->neighbour;
1907   struct PendingMessage *pm = n->pending_msg_head;
1908   struct GNUNET_TIME_Relative out_delay;
1909   unsigned int wsize;
1910
1911   GNUNET_assert (NULL != pm);
1912   if (queue->tc->details.communicator.total_queue_length >= COMMUNICATOR_TOTAL_QUEUE_LIMIT)
1913   {
1914     GNUNET_STATISTICS_update (GST_stats,
1915                               "# Transmission throttled due to communicator queue limit",
1916                               1,
1917                               GNUNET_NO);    
1918     return;
1919   }
1920   if (queue->queue_length >= SESSION_QUEUE_LIMIT)
1921   {
1922     GNUNET_STATISTICS_update (GST_stats,
1923                               "# Transmission throttled due to session queue limit",
1924                               1,
1925                               GNUNET_NO);    
1926     return;
1927   }
1928       
1929   wsize = (0 == queue->mtu)
1930     ? pm->bytes_msg /* FIXME: add overheads? */
1931     : queue->mtu;
1932   out_delay = GNUNET_BANDWIDTH_tracker_get_delay (&queue->tracker_out,
1933                                                   wsize);
1934   out_delay = GNUNET_TIME_relative_max (GNUNET_TIME_absolute_get_remaining (pm->next_attempt),
1935                                         out_delay);
1936   if (0 == out_delay.rel_value_us)
1937     return; /* we should run immediately! */
1938   /* queue has changed since we were scheduled, reschedule again */
1939   queue->transmit_task = GNUNET_SCHEDULER_add_delayed (out_delay,
1940                                                        &transmit_on_queue,
1941                                                        queue);
1942   if (out_delay.rel_value_us > DELAY_WARN_THRESHOLD.rel_value_us)
1943     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1944                 "Next transmission on queue `%s' in %s (high delay)\n",
1945                 queue->address,
1946                 GNUNET_STRINGS_relative_time_to_string (out_delay,
1947                                                         GNUNET_YES));
1948   else
1949     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1950                 "Next transmission on queue `%s' in %s\n",
1951                 queue->address,
1952                 GNUNET_STRINGS_relative_time_to_string (out_delay,
1953                                                         GNUNET_YES));
1954 }
1955
1956
1957 /**
1958  * Free @a session.
1959  *
1960  * @param session the session to free
1961  */
1962 static void
1963 free_session (struct GNUNET_ATS_Session *session)
1964 {
1965   struct Neighbour *neighbour = session->neighbour;
1966   struct TransportClient *tc = session->tc;
1967   struct MonitorEvent me = {
1968     .cs = GNUNET_TRANSPORT_CS_DOWN,
1969     .rtt = GNUNET_TIME_UNIT_FOREVER_REL
1970   };
1971   struct QueueEntry *qe;
1972   int maxxed;
1973
1974   if (NULL != session->transmit_task)
1975   {
1976     GNUNET_SCHEDULER_cancel (session->transmit_task);
1977     session->transmit_task = NULL;
1978   }
1979   GNUNET_CONTAINER_MDLL_remove (neighbour,
1980                                 neighbour->session_head,
1981                                 neighbour->session_tail,
1982                                 session);
1983   GNUNET_CONTAINER_MDLL_remove (client,
1984                                 tc->details.communicator.session_head,
1985                                 tc->details.communicator.session_tail,
1986                                 session);
1987   maxxed = (COMMUNICATOR_TOTAL_QUEUE_LIMIT >= tc->details.communicator.total_queue_length);
1988   while (NULL != (qe = session->queue_head))
1989   {
1990     GNUNET_CONTAINER_DLL_remove (session->queue_head,
1991                                  session->queue_tail,
1992                                  qe);
1993     session->queue_length--;
1994     tc->details.communicator.total_queue_length--;
1995     GNUNET_free (qe);
1996   }
1997   GNUNET_assert (0 == session->queue_length);
1998   if ( (maxxed) &&
1999        (COMMUNICATOR_TOTAL_QUEUE_LIMIT < tc->details.communicator.total_queue_length) )
2000   {
2001     /* Communicator dropped below threshold, resume all queues */
2002     GNUNET_STATISTICS_update (GST_stats,
2003                               "# Transmission throttled due to communicator queue limit",
2004                               -1,
2005                               GNUNET_NO);
2006     for (struct GNUNET_ATS_Session *s = tc->details.communicator.session_head;
2007          NULL != s;
2008          s = s->next_client)
2009       schedule_transmit_on_queue (s);
2010   }
2011   notify_monitors (&neighbour->pid,
2012                    session->address,
2013                    session->nt,
2014                    &me);
2015   GNUNET_ATS_session_del (session->sr);
2016   GNUNET_BANDWIDTH_tracker_notification_stop (&session->tracker_in);
2017   GNUNET_BANDWIDTH_tracker_notification_stop (&session->tracker_out);
2018   GNUNET_free (session);
2019   if (NULL == neighbour->session_head)
2020   {
2021     cores_send_disconnect_info (&neighbour->pid);
2022     free_neighbour (neighbour);
2023   }
2024 }
2025
2026
2027 /**
2028  * Free @a ale
2029  *
2030  * @param ale address list entry to free
2031  */
2032 static void
2033 free_address_list_entry (struct AddressListEntry *ale)
2034 {
2035   struct TransportClient *tc = ale->tc;
2036
2037   GNUNET_CONTAINER_DLL_remove (tc->details.communicator.addr_head,
2038                                tc->details.communicator.addr_tail,
2039                                ale);
2040   if (NULL != ale->sc)
2041   {
2042     GNUNET_PEERSTORE_store_cancel (ale->sc);
2043     ale->sc = NULL;
2044   }
2045   if (NULL != ale->st)
2046   {
2047     GNUNET_SCHEDULER_cancel (ale->st);
2048     ale->st = NULL;
2049   }
2050   GNUNET_free (ale);
2051 }
2052
2053
2054 /**
2055  * Called whenever a client is disconnected.  Frees our
2056  * resources associated with that client.
2057  *
2058  * @param cls closure, NULL
2059  * @param client identification of the client
2060  * @param app_ctx our `struct TransportClient`
2061  */
2062 static void
2063 client_disconnect_cb (void *cls,
2064                       struct GNUNET_SERVICE_Client *client,
2065                       void *app_ctx)
2066 {
2067   struct TransportClient *tc = app_ctx;
2068
2069   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2070               "Client %p disconnected, cleaning up.\n",
2071               tc);
2072   GNUNET_CONTAINER_DLL_remove (clients_head,
2073                                clients_tail,
2074                                tc);
2075   switch (tc->type)
2076   {
2077   case CT_NONE:
2078     break;
2079   case CT_CORE:
2080     {
2081       struct PendingMessage *pm;
2082
2083       while (NULL != (pm = tc->details.core.pending_msg_head))
2084       {
2085         GNUNET_CONTAINER_MDLL_remove (client,
2086                                       tc->details.core.pending_msg_head,
2087                                       tc->details.core.pending_msg_tail,
2088                                       pm);
2089         pm->client = NULL;
2090       }
2091     }
2092     break;
2093   case CT_MONITOR:
2094     break;
2095   case CT_COMMUNICATOR:
2096     {
2097       struct GNUNET_ATS_Session *q;
2098       struct AddressListEntry *ale;
2099
2100       while (NULL != (q = tc->details.communicator.session_head))
2101         free_session (q);
2102       while (NULL != (ale = tc->details.communicator.addr_head))
2103         free_address_list_entry (ale);
2104       GNUNET_free (tc->details.communicator.address_prefix);
2105     }
2106     break;
2107   }
2108   GNUNET_free (tc);
2109 }
2110
2111
2112 /**
2113  * Iterator telling new CORE client about all existing
2114  * connections to peers.
2115  *
2116  * @param cls the new `struct TransportClient`
2117  * @param pid a connected peer
2118  * @param value the `struct Neighbour` with more information
2119  * @return #GNUNET_OK (continue to iterate)
2120  */
2121 static int
2122 notify_client_connect_info (void *cls,
2123                             const struct GNUNET_PeerIdentity *pid,
2124                             void *value)
2125 {
2126   struct TransportClient *tc = cls;
2127   struct Neighbour *neighbour = value;
2128
2129   core_send_connect_info (tc,
2130                           pid,
2131                           neighbour->quota_out);
2132   return GNUNET_OK;
2133 }
2134
2135
2136 /**
2137  * Initialize a "CORE" client.  We got a start message from this
2138  * client, so add it to the list of clients for broadcasting of
2139  * inbound messages.
2140  *
2141  * @param cls the client
2142  * @param start the start message that was sent
2143  */
2144 static void
2145 handle_client_start (void *cls,
2146                      const struct StartMessage *start)
2147 {
2148   struct TransportClient *tc = cls;
2149   uint32_t options;
2150
2151   options = ntohl (start->options);
2152   if ( (0 != (1 & options)) &&
2153        (0 !=
2154         memcmp (&start->self,
2155                 &GST_my_identity,
2156                 sizeof (struct GNUNET_PeerIdentity)) ) )
2157   {
2158     /* client thinks this is a different peer, reject */
2159     GNUNET_break (0);
2160     GNUNET_SERVICE_client_drop (tc->client);
2161     return;
2162   }
2163   if (CT_NONE != tc->type)
2164   {
2165     GNUNET_break (0);
2166     GNUNET_SERVICE_client_drop (tc->client);
2167     return;
2168   }
2169   tc->type = CT_CORE;
2170   GNUNET_CONTAINER_multipeermap_iterate (neighbours,
2171                                          &notify_client_connect_info,
2172                                          tc);
2173   GNUNET_SERVICE_client_continue (tc->client);
2174 }
2175
2176
2177 /**
2178  * Client asked for transmission to a peer.  Process the request.
2179  *
2180  * @param cls the client
2181  * @param obm the send message that was sent
2182  */
2183 static int
2184 check_client_send (void *cls,
2185                    const struct OutboundMessage *obm)
2186 {
2187   struct TransportClient *tc = cls;
2188   uint16_t size;
2189   const struct GNUNET_MessageHeader *obmm;
2190
2191   if (CT_CORE != tc->type)
2192   {
2193     GNUNET_break (0);
2194     return GNUNET_SYSERR;
2195   }
2196   size = ntohs (obm->header.size) - sizeof (struct OutboundMessage);
2197   if (size < sizeof (struct GNUNET_MessageHeader))
2198   {
2199     GNUNET_break (0);
2200     return GNUNET_SYSERR;
2201   }
2202   obmm = (const struct GNUNET_MessageHeader *) &obm[1];
2203   if (size != ntohs (obmm->size))
2204   {
2205     GNUNET_break (0);
2206     return GNUNET_SYSERR;
2207   }
2208   return GNUNET_OK;
2209 }
2210
2211
2212 /**
2213  * Free fragment tree below @e root, excluding @e root itself.
2214  *
2215  * @param root root of the tree to free
2216  */  
2217 static void
2218 free_fragment_tree (struct PendingMessage *root)
2219 {
2220   struct PendingMessage *frag;
2221
2222   while (NULL != (frag = root->head_frag))
2223   {
2224     free_fragment_tree (frag);
2225     GNUNET_CONTAINER_MDLL_remove (frag,
2226                                   root->head_frag,
2227                                   root->tail_frag,
2228                                   frag);
2229     GNUNET_free (frag);
2230   }
2231 }
2232
2233
2234 /**
2235  * Release memory associated with @a pm and remove @a pm from associated
2236  * data structures.  @a pm must be a top-level pending message and not
2237  * a fragment in the tree.  The entire tree is freed (if applicable).
2238  *
2239  * @param pm the pending message to free
2240  */
2241 static void
2242 free_pending_message (struct PendingMessage *pm)
2243 {
2244   struct TransportClient *tc = pm->client;
2245   struct Neighbour *target = pm->target;
2246
2247   if (NULL != tc)
2248   {
2249     GNUNET_CONTAINER_MDLL_remove (client,
2250                                   tc->details.core.pending_msg_head,
2251                                   tc->details.core.pending_msg_tail,
2252                                   pm);
2253   }
2254   GNUNET_CONTAINER_MDLL_remove (neighbour,
2255                                 target->pending_msg_head,
2256                                 target->pending_msg_tail,
2257                                 pm);
2258   free_fragment_tree (pm);
2259   GNUNET_free_non_null (pm->bpm);
2260   GNUNET_free (pm);
2261 }
2262
2263
2264 /**
2265  * Send a response to the @a pm that we have processed a
2266  * "send" request with status @a success. We
2267  * transmitted @a bytes_physical on the actual wire.
2268  * Sends a confirmation to the "core" client responsible
2269  * for the original request and free's @a pm.
2270  *
2271  * @param pm handle to the original pending message
2272  * @param success status code, #GNUNET_OK on success, #GNUNET_SYSERR
2273  *          for transmission failure
2274  * @param bytes_physical amount of bandwidth consumed
2275  */
2276 static void
2277 client_send_response (struct PendingMessage *pm,
2278                       int success,
2279                       uint32_t bytes_physical)
2280 {
2281   struct TransportClient *tc = pm->client;
2282   struct Neighbour *target = pm->target;
2283   struct GNUNET_MQ_Envelope *env;
2284   struct SendOkMessage *som;
2285
2286   if (NULL != tc)
2287   {
2288     env = GNUNET_MQ_msg (som,
2289                          GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
2290     som->success = htonl ((uint32_t) success);
2291     som->bytes_msg = htons (pm->bytes_msg);
2292     som->bytes_physical = htonl (bytes_physical);
2293     som->peer = target->pid;
2294     GNUNET_MQ_send (tc->mq,
2295                     env);
2296   }
2297   free_pending_message (pm);
2298 }
2299
2300
2301 /**
2302  * Checks the message queue for a neighbour for messages that have timed
2303  * out and purges them.
2304  *
2305  * @param cls a `struct Neighbour`
2306  */
2307 static void
2308 check_queue_timeouts (void *cls)
2309 {
2310   struct Neighbour *n = cls;
2311   struct PendingMessage *pm;
2312   struct GNUNET_TIME_Absolute now;
2313   struct GNUNET_TIME_Absolute earliest_timeout;
2314
2315   n->timeout_task = NULL;
2316   earliest_timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
2317   now = GNUNET_TIME_absolute_get ();
2318   for (struct PendingMessage *pos = n->pending_msg_head;
2319        NULL != pos;
2320        pos = pm)
2321   {
2322     pm = pos->next_neighbour;
2323     if (pos->timeout.abs_value_us <= now.abs_value_us)
2324     {
2325       GNUNET_STATISTICS_update (GST_stats,
2326                                 "# messages dropped (timeout before confirmation)",
2327                                 1,
2328                                 GNUNET_NO);
2329       client_send_response (pm,
2330                             GNUNET_NO,
2331                             0);      
2332       continue;
2333     }
2334     earliest_timeout = GNUNET_TIME_absolute_min (earliest_timeout,
2335                                                  pos->timeout);
2336   }
2337   n->earliest_timeout = earliest_timeout;
2338   if (NULL != n->pending_msg_head)
2339     n->timeout_task = GNUNET_SCHEDULER_add_at (earliest_timeout,
2340                                                &check_queue_timeouts,
2341                                                n);
2342 }
2343
2344
2345 /**
2346  * Client asked for transmission to a peer.  Process the request.
2347  *
2348  * @param cls the client
2349  * @param obm the send message that was sent
2350  */
2351 static void
2352 handle_client_send (void *cls,
2353                     const struct OutboundMessage *obm)
2354 {
2355   struct TransportClient *tc = cls;
2356   struct PendingMessage *pm;
2357   const struct GNUNET_MessageHeader *obmm;
2358   struct Neighbour *target;
2359   uint32_t bytes_msg;
2360
2361   GNUNET_assert (CT_CORE == tc->type);
2362   obmm = (const struct GNUNET_MessageHeader *) &obm[1];
2363   bytes_msg = ntohs (obmm->size);
2364   target = lookup_neighbour (&obm->peer);
2365   if (NULL == target)
2366   {
2367     /* Failure: don't have this peer as a neighbour (anymore).
2368        Might have gone down asynchronously, so this is NOT
2369        a protocol violation by CORE. Still count the event,
2370        as this should be rare. */
2371     struct GNUNET_MQ_Envelope *env;
2372     struct SendOkMessage *som;
2373
2374     env = GNUNET_MQ_msg (som,
2375                          GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
2376     som->success = htonl (GNUNET_SYSERR);
2377     som->bytes_msg = htonl (bytes_msg);
2378     som->bytes_physical = htonl (0);
2379     som->peer = obm->peer;
2380     GNUNET_MQ_send (tc->mq,
2381                     env);
2382     GNUNET_SERVICE_client_continue (tc->client);
2383     GNUNET_STATISTICS_update (GST_stats,
2384                               "# messages dropped (neighbour unknown)",
2385                               1,
2386                               GNUNET_NO);
2387     return;
2388   }
2389   pm = GNUNET_malloc (sizeof (struct PendingMessage) + bytes_msg);
2390   pm->client = tc;
2391   pm->target = target;
2392   pm->bytes_msg = bytes_msg;
2393   pm->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (obm->timeout));
2394   memcpy (&pm[1],
2395           &obm[1],
2396           bytes_msg);
2397   GNUNET_CONTAINER_MDLL_insert (neighbour,
2398                                 target->pending_msg_head,
2399                                 target->pending_msg_tail,
2400                                 pm);
2401   GNUNET_CONTAINER_MDLL_insert (client,
2402                                 tc->details.core.pending_msg_head,
2403                                 tc->details.core.pending_msg_tail,
2404                                 pm);
2405   if (target->earliest_timeout.abs_value_us > pm->timeout.abs_value_us)
2406   {
2407     target->earliest_timeout.abs_value_us = pm->timeout.abs_value_us;
2408     if (NULL != target->timeout_task)
2409       GNUNET_SCHEDULER_cancel (target->timeout_task);
2410     target->timeout_task 
2411       = GNUNET_SCHEDULER_add_at (target->earliest_timeout,
2412                                  &check_queue_timeouts,
2413                                  target);
2414   }
2415 }
2416
2417
2418 /**
2419  * Communicator started.  Test message is well-formed.
2420  *
2421  * @param cls the client
2422  * @param cam the send message that was sent
2423  */
2424 static int
2425 check_communicator_available (void *cls,
2426                               const struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam)
2427 {
2428   struct TransportClient *tc = cls;
2429   uint16_t size;
2430
2431   if (CT_NONE != tc->type)
2432   {
2433     GNUNET_break (0);
2434     return GNUNET_SYSERR;
2435   }
2436   tc->type = CT_COMMUNICATOR;
2437   size = ntohs (cam->header.size) - sizeof (*cam);
2438   if (0 == size)
2439     return GNUNET_OK; /* receive-only communicator */
2440   GNUNET_MQ_check_zero_termination (cam);
2441   return GNUNET_OK;
2442 }
2443
2444
2445 /**
2446  * Communicator started.  Process the request.
2447  *
2448  * @param cls the client
2449  * @param cam the send message that was sent
2450  */
2451 static void
2452 handle_communicator_available (void *cls,
2453                                const struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam)
2454 {
2455   struct TransportClient *tc = cls;
2456   uint16_t size;
2457
2458   size = ntohs (cam->header.size) - sizeof (*cam);
2459   if (0 == size)
2460     return; /* receive-only communicator */
2461   tc->details.communicator.address_prefix
2462     = GNUNET_strdup ((const char *) &cam[1]);
2463   tc->details.communicator.cc
2464     = (enum GNUNET_TRANSPORT_CommunicatorCharacteristics) ntohl (cam->cc);
2465   GNUNET_SERVICE_client_continue (tc->client);
2466 }
2467
2468
2469 /**
2470  * Communicator requests backchannel transmission.  Check the request.
2471  *
2472  * @param cls the client
2473  * @param cb the send message that was sent
2474  * @return #GNUNET_OK if message is well-formed
2475  */
2476 static int
2477 check_communicator_backchannel (void *cls,
2478                                 const struct GNUNET_TRANSPORT_CommunicatorBackchannel *cb)
2479 {
2480   const struct GNUNET_MessageHeader *inbox;
2481   const char *is;
2482   uint16_t msize;
2483   uint16_t isize;
2484
2485   msize = ntohs (cb->header.size) - sizeof (*cb);
2486   if (UINT16_MAX - msize >
2487       sizeof (struct TransportBackchannelEncapsulationMessage) +
2488       sizeof (struct TransportBackchannelRequestPayload) )
2489   {
2490     GNUNET_break (0);
2491     return GNUNET_SYSERR;
2492   }
2493   inbox = (const struct GNUNET_MessageHeader *) &cb[1];
2494   isize = ntohs (inbox->size);
2495   if (isize >= msize)
2496   {
2497     GNUNET_break (0);
2498     return GNUNET_SYSERR;
2499   }
2500   is = (const char *) inbox;
2501   is += isize;
2502   msize -= isize;
2503   GNUNET_assert (msize > 0);
2504   if ('\0' != is[msize-1])
2505   {
2506     GNUNET_break (0);
2507     return GNUNET_SYSERR;
2508   }
2509   return GNUNET_OK;
2510 }
2511
2512
2513 /**
2514  * Remove memory used by expired ephemeral keys.
2515  *
2516  * @param cls NULL
2517  */
2518 static void
2519 expire_ephemerals (void *cls)
2520 {
2521   struct EphemeralCacheEntry *ece;
2522
2523   (void) cls;
2524   ephemeral_task = NULL;
2525   while (NULL != (ece = GNUNET_CONTAINER_heap_peek (ephemeral_heap)))
2526   {
2527     if (0 == GNUNET_TIME_absolute_get_remaining (ece->ephemeral_validity).rel_value_us)
2528     {
2529       free_ephemeral (ece);
2530       continue;
2531     }
2532     ephemeral_task = GNUNET_SCHEDULER_add_at (ece->ephemeral_validity,
2533                                               &expire_ephemerals,
2534                                               NULL);
2535     return;
2536   }
2537 }
2538
2539
2540 /**
2541  * Lookup ephemeral key in our #ephemeral_map. If no valid one exists, generate
2542  * one, cache it and return it.
2543  *
2544  * @param pid peer to look up ephemeral for
2545  * @param private_key[out] set to the private key 
2546  * @param ephemeral_key[out] set to the key
2547  * @param ephemeral_sender_sig[out] set to the signature
2548  * @param ephemeral_validity[out] set to the validity expiration time
2549  */
2550 static void
2551 lookup_ephemeral (const struct GNUNET_PeerIdentity *pid,
2552                   struct GNUNET_CRYPTO_EcdhePrivateKey *private_key,
2553                   struct GNUNET_CRYPTO_EcdhePublicKey *ephemeral_key,
2554                   struct GNUNET_CRYPTO_EddsaSignature *ephemeral_sender_sig,
2555                   struct GNUNET_TIME_Absolute *ephemeral_validity)
2556 {
2557   struct EphemeralCacheEntry *ece;
2558   struct EphemeralConfirmation ec;
2559   
2560   ece = GNUNET_CONTAINER_multipeermap_get (ephemeral_map,
2561                                            pid);
2562   if ( (NULL != ece) &&
2563        (0 == GNUNET_TIME_absolute_get_remaining (ece->ephemeral_validity).rel_value_us) )
2564   {
2565     free_ephemeral (ece);
2566     ece = NULL;
2567   }
2568   if (NULL == ece)
2569   {
2570     ece = GNUNET_new (struct EphemeralCacheEntry);
2571     ece->target = *pid;
2572     ece->ephemeral_validity = GNUNET_TIME_absolute_add (GNUNET_TIME_absolute_get_monotonic (GST_cfg),
2573                                                         EPHEMERAL_VALIDITY);
2574     GNUNET_assert (GNUNET_OK ==
2575                    GNUNET_CRYPTO_ecdhe_key_create2 (&ece->private_key));
2576     GNUNET_CRYPTO_ecdhe_key_get_public (&ece->private_key,
2577                                         &ece->ephemeral_key);
2578     ec.purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL);
2579     ec.purpose.size = htonl (sizeof (ec));
2580     ec.target = *pid;
2581     ec.ephemeral_key = ece->ephemeral_key;
2582     GNUNET_assert (GNUNET_OK ==
2583                    GNUNET_CRYPTO_eddsa_sign (GST_my_private_key,
2584                                              &ec.purpose,
2585                                              &ece->sender_sig));
2586     ece->hn = GNUNET_CONTAINER_heap_insert (ephemeral_heap,
2587                                             ece,
2588                                             ece->ephemeral_validity.abs_value_us);
2589     GNUNET_assert (GNUNET_OK ==
2590                    GNUNET_CONTAINER_multipeermap_put (ephemeral_map,
2591                                                       &ece->target,
2592                                                       ece,
2593                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
2594     if (NULL == ephemeral_task)
2595       ephemeral_task = GNUNET_SCHEDULER_add_at (ece->ephemeral_validity,
2596                                                 &expire_ephemerals,
2597                                                 NULL);
2598   }
2599   *private_key = ece->private_key;
2600   *ephemeral_key = ece->ephemeral_key;
2601   *ephemeral_sender_sig = ece->sender_sig;
2602   *ephemeral_validity = ece->ephemeral_validity;
2603 }
2604
2605
2606 /**
2607  * We need to transmit @a hdr to @a target.  If necessary, this may
2608  * involve DV routing or even broadcasting and fragmentation.
2609  *
2610  * @param target peer to receive @a hdr
2611  * @param hdr header of the message to route
2612  */
2613 static void
2614 route_message (const struct GNUNET_PeerIdentity *target,
2615                struct GNUNET_MessageHeader *hdr)
2616 {
2617   // FIXME: send hdr to target, free hdr (possibly using DV, possibly broadcasting)
2618   GNUNET_free (hdr);
2619 }
2620
2621   
2622 /**
2623  * Communicator requests backchannel transmission.  Process the request.
2624  *
2625  * @param cls the client
2626  * @param cb the send message that was sent
2627  */
2628 static void
2629 handle_communicator_backchannel (void *cls,
2630                                  const struct GNUNET_TRANSPORT_CommunicatorBackchannel *cb)
2631 {
2632   struct TransportClient *tc = cls;
2633   struct GNUNET_CRYPTO_EcdhePrivateKey private_key;
2634   struct GNUNET_TIME_Absolute ephemeral_validity;
2635   struct TransportBackchannelEncapsulationMessage *enc;
2636   struct TransportBackchannelRequestPayload ppay;
2637   char *mpos;
2638   uint16_t msize;
2639   
2640   /* encapsulate and encrypt message */
2641   msize = ntohs (cb->header.size) - sizeof (*cb) + sizeof (struct TransportBackchannelRequestPayload);
2642   enc = GNUNET_malloc (sizeof (*enc) + msize);
2643   enc->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION);
2644   enc->header.size = htons (sizeof (*enc) + msize);
2645   enc->target = cb->pid;
2646   lookup_ephemeral (&cb->pid,
2647                     &private_key,
2648                     &enc->ephemeral_key,
2649                     &ppay.sender_sig,
2650                     &ephemeral_validity);
2651   // FIXME: setup 'iv'
2652 #if FIXME
2653   dh_key_derive (&private_key,
2654                  &cb->pid,
2655                  &enc->iv,
2656                  &key);
2657 #endif
2658   ppay.ephemeral_validity = GNUNET_TIME_absolute_hton (ephemeral_validity);
2659   ppay.monotonic_time = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get_monotonic (GST_cfg));
2660   mpos = (char *) &enc[1];
2661 #if FIXME
2662   encrypt (key,
2663            &ppay,
2664            &mpos,
2665            sizeof (ppay));
2666   encrypt (key,
2667            &cb[1],
2668            &mpos,
2669            ntohs (cb->header.size) - sizeof (*cb));
2670   hmac (key,
2671         &enc->hmac);
2672 #endif
2673   route_message (&cb->pid,
2674                  &enc->header);
2675   GNUNET_SERVICE_client_continue (tc->client);
2676 }
2677
2678
2679 /**
2680  * Address of our peer added.  Test message is well-formed.
2681  *
2682  * @param cls the client
2683  * @param aam the send message that was sent
2684  * @return #GNUNET_OK if message is well-formed
2685  */
2686 static int
2687 check_add_address (void *cls,
2688                    const struct GNUNET_TRANSPORT_AddAddressMessage *aam)
2689 {
2690   struct TransportClient *tc = cls;
2691
2692   if (CT_COMMUNICATOR != tc->type)
2693   {
2694     GNUNET_break (0);
2695     return GNUNET_SYSERR;
2696   }
2697   GNUNET_MQ_check_zero_termination (aam);
2698   return GNUNET_OK;
2699 }
2700
2701
2702 /**
2703  * Ask peerstore to store our address.
2704  *
2705  * @param cls an `struct AddressListEntry *`
2706  */
2707 static void
2708 store_pi (void *cls);
2709
2710
2711 /**
2712  * Function called when peerstore is done storing our address.
2713  */
2714 static void
2715 peerstore_store_cb (void *cls,
2716                     int success)
2717 {
2718   struct AddressListEntry *ale = cls;
2719
2720   ale->sc = NULL;
2721   if (GNUNET_YES != success)
2722     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2723                 "Failed to store our own address `%s' in peerstore!\n",
2724                 ale->address);
2725   /* refresh period is 1/4 of expiration time, that should be plenty
2726      without being excessive. */
2727   ale->st = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_divide (ale->expiration,
2728                                                                        4ULL),
2729                                           &store_pi,
2730                                           ale);
2731 }
2732
2733
2734 /**
2735  * Ask peerstore to store our address.
2736  *
2737  * @param cls an `struct AddressListEntry *`
2738  */
2739 static void
2740 store_pi (void *cls)
2741 {
2742   struct AddressListEntry *ale = cls;
2743   void *addr;
2744   size_t addr_len;
2745   struct GNUNET_TIME_Absolute expiration;
2746
2747   ale->st = NULL;
2748   expiration = GNUNET_TIME_relative_to_absolute (ale->expiration);
2749   GNUNET_HELLO_sign_address (ale->address,
2750                              ale->nt,
2751                              expiration,
2752                              GST_my_private_key,
2753                              &addr,
2754                              &addr_len);
2755   ale->sc = GNUNET_PEERSTORE_store (peerstore,
2756                                     "transport",
2757                                     &GST_my_identity,
2758                                     GNUNET_HELLO_PEERSTORE_KEY,
2759                                     addr,
2760                                     addr_len,
2761                                     expiration,
2762                                     GNUNET_PEERSTORE_STOREOPTION_MULTIPLE,
2763                                     &peerstore_store_cb,
2764                                     ale);
2765   GNUNET_free (addr);
2766   if (NULL == ale->sc)
2767   {
2768     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2769                 "Failed to store our address `%s' with peerstore\n",
2770                 ale->address);
2771     ale->st = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
2772                                             &store_pi,
2773                                             ale);
2774   }
2775 }
2776
2777
2778 /**
2779  * Address of our peer added.  Process the request.
2780  *
2781  * @param cls the client
2782  * @param aam the send message that was sent
2783  */
2784 static void
2785 handle_add_address (void *cls,
2786                     const struct GNUNET_TRANSPORT_AddAddressMessage *aam)
2787 {
2788   struct TransportClient *tc = cls;
2789   struct AddressListEntry *ale;
2790   size_t slen;
2791
2792   slen = ntohs (aam->header.size) - sizeof (*aam);
2793   ale = GNUNET_malloc (sizeof (struct AddressListEntry) + slen);
2794   ale->tc = tc;
2795   ale->address = (const char *) &ale[1];
2796   ale->expiration = GNUNET_TIME_relative_ntoh (aam->expiration);
2797   ale->aid = aam->aid;
2798   ale->nt = (enum GNUNET_NetworkType) ntohl (aam->nt);
2799   memcpy (&ale[1],
2800           &aam[1],
2801           slen);
2802   GNUNET_CONTAINER_DLL_insert (tc->details.communicator.addr_head,
2803                                tc->details.communicator.addr_tail,
2804                                ale);
2805   ale->st = GNUNET_SCHEDULER_add_now (&store_pi,
2806                                       ale);
2807   GNUNET_SERVICE_client_continue (tc->client);
2808 }
2809
2810
2811 /**
2812  * Address of our peer deleted.  Process the request.
2813  *
2814  * @param cls the client
2815  * @param dam the send message that was sent
2816  */
2817 static void
2818 handle_del_address (void *cls,
2819                     const struct GNUNET_TRANSPORT_DelAddressMessage *dam)
2820 {
2821   struct TransportClient *tc = cls;
2822
2823   if (CT_COMMUNICATOR != tc->type)
2824   {
2825     GNUNET_break (0);
2826     GNUNET_SERVICE_client_drop (tc->client);
2827     return;
2828   }
2829   for (struct AddressListEntry *ale = tc->details.communicator.addr_head;
2830        NULL != ale;
2831        ale = ale->next)
2832   {
2833     if (dam->aid != ale->aid)
2834       continue;
2835     GNUNET_assert (ale->tc == tc);
2836     free_address_list_entry (ale);
2837     GNUNET_SERVICE_client_continue (tc->client);
2838   }
2839   GNUNET_break (0);
2840   GNUNET_SERVICE_client_drop (tc->client);
2841 }
2842
2843
2844 /**
2845  * Context from #handle_incoming_msg().  Closure for many
2846  * message handlers below.
2847  */
2848 struct CommunicatorMessageContext
2849 {
2850   /**
2851    * Which communicator provided us with the message.
2852    */
2853   struct TransportClient *tc;
2854
2855   /**
2856    * Additional information for flow control and about the sender.
2857    */
2858   struct GNUNET_TRANSPORT_IncomingMessage im;
2859
2860   /**
2861    * Number of hops the message has travelled (if DV-routed).
2862    * FIXME: make use of this in ACK handling!
2863    */
2864   uint16_t total_hops;
2865 };
2866
2867
2868 /**
2869  * Given an inbound message @a msg from a communicator @a cmc,
2870  * demultiplex it based on the type calling the right handler.
2871  *
2872  * @param cmc context for demultiplexing
2873  * @param msg message to demultiplex
2874  */ 
2875 static void
2876 demultiplex_with_cmc (struct CommunicatorMessageContext *cmc,
2877                       const struct GNUNET_MessageHeader *msg);
2878
2879
2880 /**
2881  * Send ACK to communicator (if requested) and free @a cmc.
2882  *
2883  * @param cmc context for which we are done handling the message
2884  */
2885 static void
2886 finish_cmc_handling (struct CommunicatorMessageContext *cmc)
2887 {
2888   if (0 != ntohl (cmc->im.fc_on))
2889   {
2890     /* send ACK when done to communicator for flow control! */
2891     struct GNUNET_MQ_Envelope *env;
2892     struct GNUNET_TRANSPORT_IncomingMessageAck *ack;
2893
2894     env = GNUNET_MQ_msg (ack,
2895                          GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK);
2896     ack->reserved = htonl (0);
2897     ack->fc_id = cmc->im.fc_id;
2898     ack->sender = cmc->im.sender;
2899     GNUNET_MQ_send (cmc->tc->mq,
2900                     env);
2901   }
2902   GNUNET_SERVICE_client_continue (cmc->tc->client);
2903   GNUNET_free (cmc);
2904 }
2905
2906
2907 /**
2908  * Communicator gave us an unencapsulated message to pass as-is to
2909  * CORE.  Process the request.
2910  *
2911  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
2912  * @param mh the message that was received
2913  */
2914 static void
2915 handle_raw_message (void *cls,
2916                     const struct GNUNET_MessageHeader *mh)
2917 {
2918   struct CommunicatorMessageContext *cmc = cls;
2919   uint16_t size = ntohs (mh->size);
2920
2921   if ( (size > UINT16_MAX - sizeof (struct InboundMessage)) ||
2922        (size < sizeof (struct GNUNET_MessageHeader)) )
2923   {
2924     struct GNUNET_SERVICE_Client *client = cmc->tc->client;
2925
2926     GNUNET_break (0);
2927     finish_cmc_handling (cmc);
2928     GNUNET_SERVICE_client_drop (client);
2929     return;
2930   }
2931   /* Forward to all CORE clients */
2932   for (struct TransportClient *tc = clients_head;
2933        NULL != tc;
2934        tc = tc->next)
2935   {
2936     struct GNUNET_MQ_Envelope *env;
2937     struct InboundMessage *im;
2938
2939     if (CT_CORE != tc->type)
2940       continue;
2941     env = GNUNET_MQ_msg_extra (im,
2942                                size,
2943                                GNUNET_MESSAGE_TYPE_TRANSPORT_RECV);
2944     im->peer = cmc->im.sender;
2945     memcpy (&im[1],
2946             mh,
2947             size);
2948     GNUNET_MQ_send (tc->mq,
2949                     env);
2950   }
2951   /* FIXME: consider doing this _only_ once the message
2952      was drained from the CORE MQs to extend flow control to CORE! 
2953      (basically, increment counter in cmc, decrement on MQ send continuation! */
2954   finish_cmc_handling (cmc);
2955 }
2956
2957
2958 /**
2959  * Communicator gave us a fragment box.  Check the message.
2960  *
2961  * @param cls a `struct CommunicatorMessageContext`
2962  * @param fb the send message that was sent
2963  * @return #GNUNET_YES if message is well-formed
2964  */
2965 static int
2966 check_fragment_box (void *cls,
2967                     const struct TransportFragmentBox *fb)
2968 {
2969   uint16_t size = ntohs (fb->header.size);
2970   uint16_t bsize = size - sizeof (*fb);
2971
2972   if (0 == bsize)
2973   {
2974     GNUNET_break_op (0);
2975     return GNUNET_SYSERR;
2976   }
2977   if (bsize + ntohs (fb->frag_off) > ntohs (fb->msg_size)) 
2978   {
2979     GNUNET_break_op (0);
2980     return GNUNET_SYSERR;
2981   }
2982   if (ntohs (fb->frag_off) >= ntohs (fb->msg_size)) 
2983   {
2984     GNUNET_break_op (0);
2985     return GNUNET_SYSERR;
2986   }
2987   return GNUNET_YES;
2988 }
2989
2990
2991 /**
2992  * Generate a fragment acknowledgement for an @a rc.
2993  *
2994  * @param rc context to generate ACK for, @a rc ACK state is reset
2995  */
2996 static void
2997 send_fragment_ack (struct ReassemblyContext *rc)
2998 {
2999   struct TransportFragmentAckMessage *ack;
3000
3001   ack = GNUNET_new (struct TransportFragmentAckMessage);
3002   ack->header.size = htons (sizeof (struct TransportFragmentAckMessage));
3003   ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK);
3004   ack->frag_uuid = htonl (rc->frag_uuid);
3005   ack->extra_acks = GNUNET_htonll (rc->extra_acks);
3006   ack->msg_uuid = rc->msg_uuid;
3007   ack->avg_ack_delay = GNUNET_TIME_relative_hton (rc->avg_ack_delay);
3008   if (0 == rc->msg_missing)
3009     ack->reassembly_timeout
3010       = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_FOREVER_REL); /* signal completion */
3011   else
3012     ack->reassembly_timeout
3013       = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (rc->reassembly_timeout));
3014   route_message (&rc->neighbour->pid,
3015                  &ack->header);
3016   rc->avg_ack_delay = GNUNET_TIME_UNIT_ZERO;
3017   rc->num_acks = 0;
3018   rc->extra_acks = 0LLU;
3019 }
3020
3021
3022 /**
3023  * Communicator gave us a fragment.  Process the request.
3024  *
3025  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
3026  * @param fb the message that was received
3027  */
3028 static void
3029 handle_fragment_box (void *cls,
3030                      const struct TransportFragmentBox *fb)
3031 {
3032   struct CommunicatorMessageContext *cmc = cls;
3033   struct Neighbour *n;
3034   struct ReassemblyContext *rc;
3035   const struct GNUNET_MessageHeader *msg;
3036   uint16_t msize;
3037   uint16_t fsize;
3038   uint16_t frag_off;
3039   uint32_t frag_uuid;
3040   char *target;
3041   struct GNUNET_TIME_Relative cdelay;
3042   int ack_now;
3043
3044   n = GNUNET_CONTAINER_multipeermap_get (neighbours,
3045                                          &cmc->im.sender);
3046   if (NULL == n)
3047   {
3048     struct GNUNET_SERVICE_Client *client = cmc->tc->client;
3049         
3050     GNUNET_break (0);
3051     finish_cmc_handling (cmc);
3052     GNUNET_SERVICE_client_drop (client);
3053     return;
3054   }
3055   if (NULL == n->reassembly_map)
3056   {
3057     n->reassembly_map = GNUNET_CONTAINER_multishortmap_create (8,
3058                                                                GNUNET_YES);
3059     n->reassembly_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
3060     n->reassembly_timeout_task = GNUNET_SCHEDULER_add_delayed (REASSEMBLY_EXPIRATION,
3061                                                                &reassembly_cleanup_task,
3062                                                                n);
3063   }
3064   msize = ntohs (fb->msg_size);
3065   rc = GNUNET_CONTAINER_multishortmap_get (n->reassembly_map,
3066                                            &fb->msg_uuid);
3067   if (NULL == rc)
3068   {
3069     rc = GNUNET_malloc (sizeof (*rc) +
3070                         msize + /* reassembly payload buffer */
3071                         (msize + 7) / 8 * sizeof (uint8_t) /* bitfield */);
3072     rc->msg_uuid = fb->msg_uuid;
3073     rc->neighbour = n;
3074     rc->msg_size = msize;
3075     rc->reassembly_timeout = GNUNET_TIME_relative_to_absolute (REASSEMBLY_EXPIRATION);
3076     rc->last_frag = GNUNET_TIME_absolute_get ();
3077     rc->hn = GNUNET_CONTAINER_heap_insert (n->reassembly_heap,
3078                                            rc,
3079                                            rc->reassembly_timeout.abs_value_us);
3080     GNUNET_assert (GNUNET_OK ==
3081                    GNUNET_CONTAINER_multishortmap_put (n->reassembly_map,
3082                                                        &rc->msg_uuid,
3083                                                        rc,
3084                                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
3085     target = (char *) &rc[1];
3086     rc->bitfield = (uint8_t *) (target + rc->msg_size);
3087     rc->msg_missing = rc->msg_size;
3088   }
3089   else
3090   {
3091     target = (char *) &rc[1];
3092   }
3093   if (msize != rc->msg_size)
3094   {
3095     GNUNET_break (0);
3096     finish_cmc_handling (cmc);
3097     return;
3098   }
3099   
3100   /* reassemble */
3101   fsize = ntohs (fb->header.size) - sizeof (*fb);
3102   frag_off = ntohs (fb->frag_off);
3103   memcpy (&target[frag_off],
3104           &fb[1],
3105           fsize);
3106   /* update bitfield and msg_missing */
3107   for (unsigned int i=frag_off;i<frag_off+fsize;i++)
3108   {
3109     if (0 == (rc->bitfield[i / 8] & (1 << (i % 8))))
3110     {
3111       rc->bitfield[i / 8] |= (1 << (i % 8));
3112       rc->msg_missing--;
3113     }
3114   }
3115                                     
3116   /* Compute cummulative ACK */
3117   frag_uuid = ntohl (fb->frag_uuid);
3118   cdelay = GNUNET_TIME_absolute_get_duration (rc->last_frag);
3119   cdelay = GNUNET_TIME_relative_multiply (cdelay,
3120                                           rc->num_acks);
3121   rc->last_frag = GNUNET_TIME_absolute_get ();
3122   rc->avg_ack_delay = GNUNET_TIME_relative_add (rc->avg_ack_delay,
3123                                                 cdelay);
3124   ack_now = GNUNET_NO;
3125   if (0 == rc->num_acks)
3126   {
3127     /* case one: first ack */
3128     rc->frag_uuid = frag_uuid;
3129     rc->extra_acks = 0LLU;
3130     rc->num_acks = 1;
3131   }
3132   else if ( (frag_uuid >= rc->frag_uuid) &&
3133             (frag_uuid <= rc->frag_uuid + 64) )
3134   {
3135     /* case two: ack fits after existing min UUID */
3136     if ( (frag_uuid == rc->frag_uuid) ||
3137          (0 != (rc->extra_acks & (1LLU << (frag_uuid - rc->frag_uuid - 1)))) )
3138     {
3139       /* duplicate fragment, ack now! */
3140       ack_now = GNUNET_YES;
3141     }
3142     else
3143     {
3144       rc->extra_acks |= (1LLU << (frag_uuid - rc->frag_uuid - 1));
3145       rc->num_acks++;
3146     }
3147   }
3148   else if ( (rc->frag_uuid > frag_uuid) &&
3149             ( ( (rc->frag_uuid == frag_uuid + 64) &&
3150                 (0 == rc->extra_acks) ) ||
3151               ( (rc->frag_uuid < frag_uuid + 64) &&
3152                 (rc->extra_acks == (rc->extra_acks & ~ ((1LLU << (64 - (rc->frag_uuid - frag_uuid))) - 1LLU))) ) ) )
3153   {
3154     /* can fit ack by shifting extra acks and starting at 
3155        frag_uid, test above esured that the bits we will
3156        shift 'extra_acks' by are all zero. */
3157     rc->extra_acks <<= (rc->frag_uuid - frag_uuid);
3158     rc->extra_acks |= (1LLU << (rc->frag_uuid - frag_uuid - 1));
3159     rc->frag_uuid = frag_uuid;
3160     rc->num_acks++;
3161   }
3162   if (65 == rc->num_acks) /* FIXME: maybe use smaller threshold? This is very aggressive. */
3163     ack_now = GNUNET_YES; /* maximum acks received */
3164   // FIXME: possibly also ACK based on RTT (but for that we'd need to
3165   // determine the session used for the ACK first!)  
3166   
3167   /* is reassembly complete? */
3168   if (0 != rc->msg_missing)
3169   {
3170     if (ack_now)
3171       send_fragment_ack (rc);
3172     finish_cmc_handling (cmc);
3173     return;
3174   }
3175   /* reassembly is complete, verify result */
3176   msg = (const struct GNUNET_MessageHeader *) &rc[1];
3177   if (ntohs (msg->size) != rc->msg_size)
3178   {
3179     GNUNET_break (0);
3180     free_reassembly_context (rc);
3181     finish_cmc_handling (cmc);
3182     return;
3183   }
3184   /* successful reassembly */
3185   send_fragment_ack (rc);
3186   demultiplex_with_cmc (cmc,
3187                         msg);
3188   /* FIXME: really free here? Might be bad if fragments are still
3189      en-route and we forget that we finished this reassembly immediately!
3190      -> keep around until timeout? 
3191      -> shorten timeout based on ACK? */
3192   free_reassembly_context (rc);
3193 }
3194
3195
3196 /**
3197  * Communicator gave us a fragment acknowledgement.  Process the request.
3198  *
3199  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
3200  * @param fa the message that was received
3201  */
3202 static void
3203 handle_fragment_ack (void *cls,
3204                      const struct TransportFragmentAckMessage *fa)
3205 {
3206   struct CommunicatorMessageContext *cmc = cls;
3207   
3208   // FIXME: do work: identify original message; then identify fragments being acked;
3209   // remove those from the tree to prevent retransmission;
3210   // compute RTT
3211   // if entire message is ACKed, handle that as well.
3212   finish_cmc_handling (cmc);
3213 }
3214
3215
3216 /**
3217  * Communicator gave us a reliability box.  Check the message.
3218  *
3219  * @param cls a `struct CommunicatorMessageContext`
3220  * @param rb the send message that was sent
3221  * @return #GNUNET_YES if message is well-formed
3222  */
3223 static int
3224 check_reliability_box (void *cls,
3225                        const struct TransportReliabilityBox *rb)
3226 {
3227   GNUNET_MQ_check_boxed_message (rb);
3228   return GNUNET_YES;
3229 }
3230
3231
3232 /**
3233  * Communicator gave us a reliability box.  Process the request.
3234  *
3235  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
3236  * @param rb the message that was received
3237  */
3238 static void
3239 handle_reliability_box (void *cls,
3240                         const struct TransportReliabilityBox *rb)
3241 {
3242   struct CommunicatorMessageContext *cmc = cls;
3243   const struct GNUNET_MessageHeader *inbox = (const struct GNUNET_MessageHeader *) &rb[1];
3244
3245   if (0 == ntohl (rb->ack_countdown))
3246   {
3247     struct TransportReliabilityAckMessage *ack;
3248
3249     /* FIXME: implement cummulative ACKs and ack_countdown,
3250        then setting the avg_ack_delay field below: */
3251     ack = GNUNET_malloc (sizeof (*ack) + 
3252                          sizeof (struct GNUNET_ShortHashCode));
3253     ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK);
3254     ack->header.size = htons (sizeof (*ack) + 
3255                               sizeof (struct GNUNET_ShortHashCode));
3256     memcpy (&ack[1],
3257             &rb->msg_uuid,
3258             sizeof (struct GNUNET_ShortHashCode));
3259     route_message (&cmc->im.sender,
3260                    &ack->header);
3261   }
3262   /* continue with inner message */
3263   demultiplex_with_cmc (cmc,
3264                         inbox);
3265 }
3266
3267
3268 /**
3269  * Communicator gave us a reliability ack.  Process the request.
3270  *
3271  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
3272  * @param ra the message that was received
3273  */
3274 static void
3275 handle_reliability_ack (void *cls,
3276                         const struct TransportReliabilityAckMessage *ra)
3277 {
3278   struct CommunicatorMessageContext *cmc = cls;
3279   
3280   // FIXME: do work: find message that was acknowledged, and
3281   // remove from transmission queue; update RTT.
3282   finish_cmc_handling (cmc);
3283 }
3284
3285
3286 /**
3287  * Communicator gave us a backchannel encapsulation.  Check the message.
3288  *
3289  * @param cls a `struct CommunicatorMessageContext`
3290  * @param be the send message that was sent
3291  * @return #GNUNET_YES if message is well-formed
3292  */
3293 static int
3294 check_backchannel_encapsulation (void *cls,
3295                                  const struct TransportBackchannelEncapsulationMessage *be)
3296 {
3297   uint16_t size = ntohs (be->header.size);
3298
3299   if (size - sizeof (*be) < sizeof (struct GNUNET_MessageHeader))
3300   {
3301     GNUNET_break_op (0);
3302     return GNUNET_SYSERR;
3303   }
3304   return GNUNET_YES;
3305 }
3306
3307
3308 /**
3309  * Communicator gave us a backchannel encapsulation.  Process the request.
3310  *
3311  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
3312  * @param be the message that was received
3313  */
3314 static void
3315 handle_backchannel_encapsulation (void *cls,
3316                                   const struct TransportBackchannelEncapsulationMessage *be)
3317 {
3318   struct CommunicatorMessageContext *cmc = cls;
3319
3320   if (0 != memcmp (&be->target,
3321                    &GST_my_identity,
3322                    sizeof (struct GNUNET_PeerIdentity)))
3323   {
3324     /* not for me, try to route to target */
3325     route_message (&be->target,
3326                    GNUNET_copy_message (&be->header));
3327     finish_cmc_handling (cmc);
3328     return;
3329   }
3330   // FIXME: compute shared secret
3331   // FIXME: check HMAC
3332   // FIXME: decrypt payload
3333   // FIXME: forward to specified communicator!
3334   // (using GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL_INCOMING)  
3335   finish_cmc_handling (cmc);
3336 }
3337
3338
3339 /**
3340  * Communicator gave us a DV learn message.  Check the message.
3341  *
3342  * @param cls a `struct CommunicatorMessageContext`
3343  * @param dvl the send message that was sent
3344  * @return #GNUNET_YES if message is well-formed
3345  */
3346 static int
3347 check_dv_learn (void *cls,
3348                 const struct TransportDVLearn *dvl)
3349 {
3350   uint16_t size = ntohs (dvl->header.size);
3351   uint16_t num_hops = ntohs (dvl->num_hops);
3352   const struct GNUNET_PeerIdentity *hops = (const struct GNUNET_PeerIdentity *) &dvl[1];
3353
3354   if (size != sizeof (*dvl) + num_hops * sizeof (struct GNUNET_PeerIdentity))
3355   {
3356     GNUNET_break_op (0);
3357     return GNUNET_SYSERR;
3358   }
3359   for (unsigned int i=0;i<num_hops;i++)
3360   {
3361     if (0 == memcmp (&dvl->initiator,
3362                      &hops[i],
3363                      sizeof (struct GNUNET_PeerIdentity)))
3364     {
3365       GNUNET_break_op (0);
3366       return GNUNET_SYSERR;
3367     }
3368     if (0 == memcmp (&GST_my_identity,
3369                      &hops[i],
3370                      sizeof (struct GNUNET_PeerIdentity)))
3371     {
3372       GNUNET_break_op (0);
3373       return GNUNET_SYSERR;
3374     }
3375   }
3376   return GNUNET_YES;
3377 }
3378
3379
3380 /**
3381  * Communicator gave us a DV learn message.  Process the request.
3382  *
3383  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
3384  * @param dvl the message that was received
3385  */
3386 static void
3387 handle_dv_learn (void *cls,
3388                  const struct TransportDVLearn *dvl)
3389 {
3390   struct CommunicatorMessageContext *cmc = cls;
3391   
3392   // FIXME: learn path from DV message (if bi-directional flags are set)
3393   // FIXME: expand DV message, forward on (unless path is getting too long)
3394   finish_cmc_handling (cmc);
3395 }
3396
3397
3398 /**
3399  * Communicator gave us a DV box.  Check the message.
3400  *
3401  * @param cls a `struct CommunicatorMessageContext`
3402  * @param dvb the send message that was sent
3403  * @return #GNUNET_YES if message is well-formed
3404  */
3405 static int
3406 check_dv_box (void *cls,
3407               const struct TransportDVBox *dvb)
3408 {
3409   uint16_t size = ntohs (dvb->header.size);
3410   uint16_t num_hops = ntohs (dvb->num_hops);
3411   const struct GNUNET_PeerIdentity *hops = (const struct GNUNET_PeerIdentity *) &dvb[1];
3412   const struct GNUNET_MessageHeader *inbox = (const struct GNUNET_MessageHeader *) &hops[num_hops];
3413   uint16_t isize;
3414   uint16_t itype;
3415
3416   if (size < sizeof (*dvb) + num_hops * sizeof (struct GNUNET_PeerIdentity) + sizeof (struct GNUNET_MessageHeader))
3417   {
3418     GNUNET_break_op (0);
3419     return GNUNET_SYSERR;
3420   }
3421   isize = ntohs (inbox->size);
3422   if (size != sizeof (*dvb) + num_hops * sizeof (struct GNUNET_PeerIdentity) + isize) 
3423   {
3424     GNUNET_break_op (0);
3425     return GNUNET_SYSERR;
3426   }
3427   itype = ntohs (inbox->type);
3428   if ( (GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX == itype) ||
3429        (GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN == itype) )
3430   {
3431     GNUNET_break_op (0);
3432     return GNUNET_SYSERR;
3433   }
3434   return GNUNET_YES;
3435 }
3436
3437
3438 /**
3439  * Communicator gave us a DV box.  Process the request.
3440  *
3441  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
3442  * @param dvb the message that was received
3443  */
3444 static void
3445 handle_dv_box (void *cls,
3446                const struct TransportDVBox *dvb)
3447 {
3448   struct CommunicatorMessageContext *cmc = cls;
3449   uint16_t size = ntohs (dvb->header.size) - sizeof (*dvb);
3450   uint16_t num_hops = ntohs (dvb->num_hops);
3451   const struct GNUNET_PeerIdentity *hops = (const struct GNUNET_PeerIdentity *) &dvb[1];
3452   const struct GNUNET_MessageHeader *inbox = (const struct GNUNET_MessageHeader *) &hops[num_hops];
3453
3454   if (num_hops > 0)
3455   {
3456     // FIXME: if we are not the target, shorten path and forward along.
3457     // Try from the _end_ of hops array if we know the given
3458     // neighbour (shortening the path!). 
3459     // NOTE: increment total_hops!
3460     finish_cmc_handling (cmc);
3461     return;
3462   }
3463   /* We are the target. Unbox and handle message. */
3464   cmc->im.sender = dvb->origin;
3465   cmc->total_hops = ntohs (dvb->total_hops); 
3466   demultiplex_with_cmc (cmc,
3467                         inbox);
3468 }
3469
3470
3471 /**
3472  * Client notified us about transmission from a peer.  Process the request.
3473  *
3474  * @param cls a `struct TransportClient` which sent us the message
3475  * @param obm the send message that was sent
3476  * @return #GNUNET_YES if message is well-formed
3477  */
3478 static int
3479 check_incoming_msg (void *cls,
3480                     const struct GNUNET_TRANSPORT_IncomingMessage *im)
3481 {
3482   struct TransportClient *tc = cls;
3483
3484   if (CT_COMMUNICATOR != tc->type)
3485   {
3486     GNUNET_break (0);
3487     return GNUNET_SYSERR;
3488   }
3489   GNUNET_MQ_check_boxed_message (im);
3490   return GNUNET_OK;
3491 }
3492
3493
3494 /**
3495  * Incoming meessage.  Process the request.
3496  *
3497  * @param im the send message that was received
3498  */
3499 static void
3500 handle_incoming_msg (void *cls,
3501                      const struct GNUNET_TRANSPORT_IncomingMessage *im)
3502 {
3503   struct TransportClient *tc = cls;
3504   struct CommunicatorMessageContext *cmc = GNUNET_new (struct CommunicatorMessageContext);
3505
3506   cmc->tc = tc;
3507   cmc->im = *im;
3508   demultiplex_with_cmc (cmc,
3509                         (const struct GNUNET_MessageHeader *) &im[1]);
3510 }
3511
3512
3513 /**
3514  * Given an inbound message @a msg from a communicator @a cmc,
3515  * demultiplex it based on the type calling the right handler.
3516  *
3517  * @param cmc context for demultiplexing
3518  * @param msg message to demultiplex
3519  */ 
3520 static void
3521 demultiplex_with_cmc (struct CommunicatorMessageContext *cmc,
3522                       const struct GNUNET_MessageHeader *msg)
3523 {
3524   struct GNUNET_MQ_MessageHandler handlers[] = {
3525     GNUNET_MQ_hd_var_size (fragment_box,
3526                            GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT,
3527                            struct TransportFragmentBox,
3528                            &cmc),
3529     GNUNET_MQ_hd_fixed_size (fragment_ack,
3530                              GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK,
3531                              struct TransportFragmentAckMessage,
3532                              &cmc),
3533     GNUNET_MQ_hd_var_size (reliability_box,
3534                            GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX,
3535                            struct TransportReliabilityBox,
3536                            &cmc),
3537     GNUNET_MQ_hd_fixed_size (reliability_ack,
3538                              GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK,
3539                              struct TransportReliabilityAckMessage,
3540                              &cmc),
3541     GNUNET_MQ_hd_var_size (backchannel_encapsulation,
3542                            GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION,
3543                            struct TransportBackchannelEncapsulationMessage,
3544                            &cmc),
3545     GNUNET_MQ_hd_var_size (dv_learn,
3546                            GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN,
3547                            struct TransportDVLearn,
3548                            &cmc),
3549     GNUNET_MQ_hd_var_size (dv_box,
3550                            GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX,
3551                            struct TransportDVBox,
3552                            &cmc),
3553     GNUNET_MQ_handler_end()
3554   };
3555   int ret;
3556
3557   ret = GNUNET_MQ_handle_message (handlers,
3558                                   msg);
3559   if (GNUNET_SYSERR == ret)
3560   {
3561     GNUNET_break (0);
3562     GNUNET_SERVICE_client_drop (cmc->tc->client);
3563     GNUNET_free (cmc);
3564     return;
3565   }
3566   if (GNUNET_NO == ret)
3567   {
3568     /* unencapsulated 'raw' message */
3569     handle_raw_message (&cmc,
3570                         msg);
3571   }
3572 }
3573
3574
3575 /**
3576  * New queue became available.  Check message.
3577  *
3578  * @param cls the client
3579  * @param aqm the send message that was sent
3580  */
3581 static int
3582 check_add_queue_message (void *cls,
3583                          const struct GNUNET_TRANSPORT_AddQueueMessage *aqm)
3584 {
3585   struct TransportClient *tc = cls;
3586
3587   if (CT_COMMUNICATOR != tc->type)
3588   {
3589     GNUNET_break (0);
3590     return GNUNET_SYSERR;
3591   }
3592   GNUNET_MQ_check_zero_termination (aqm);
3593   return GNUNET_OK;
3594 }
3595
3596
3597 /**
3598  * Bandwidth tracker informs us that the delay until we should receive
3599  * more has changed.
3600  *
3601  * @param cls a `struct GNUNET_ATS_Session` for which the delay changed
3602  */
3603 static void
3604 tracker_update_in_cb (void *cls)
3605 {
3606   struct GNUNET_ATS_Session *queue = cls;
3607   struct GNUNET_TIME_Relative in_delay;
3608   unsigned int rsize;
3609   
3610   rsize = (0 == queue->mtu) ? IN_PACKET_SIZE_WITHOUT_MTU : queue->mtu;
3611   in_delay = GNUNET_BANDWIDTH_tracker_get_delay (&queue->tracker_in,
3612                                                  rsize);
3613   // FIXME: how exactly do we do inbound flow control?
3614 }
3615
3616
3617 /**
3618  * If necessary, generates the UUID for a @a pm
3619  *
3620  * @param pm pending message to generate UUID for.
3621  */
3622 static void
3623 set_pending_message_uuid (struct PendingMessage *pm)
3624 {
3625   if (pm->msg_uuid_set)
3626     return;
3627   GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
3628                               &pm->msg_uuid,
3629                               sizeof (pm->msg_uuid));
3630   pm->msg_uuid_set = GNUNET_YES;
3631 }
3632
3633
3634 /**
3635  * Fragment the given @a pm to the given @a mtu.  Adds 
3636  * additional fragments to the neighbour as well. If the
3637  * @a mtu is too small, generates and error for the @a pm
3638  * and returns NULL.
3639  *
3640  * @param pm pending message to fragment for transmission
3641  * @param mtu MTU to apply
3642  * @return new message to transmit
3643  */
3644 static struct PendingMessage *
3645 fragment_message (struct PendingMessage *pm,
3646                   uint16_t mtu)
3647 {
3648   struct PendingMessage *ff;
3649
3650   set_pending_message_uuid (pm);
3651   
3652   /* This invariant is established in #handle_add_queue_message() */
3653   GNUNET_assert (mtu > sizeof (struct TransportFragmentBox));
3654
3655   /* select fragment for transmission, descending the tree if it has
3656      been expanded until we are at a leaf or at a fragment that is small enough */
3657   ff = pm;
3658   while ( ( (ff->bytes_msg > mtu) ||
3659             (pm == ff) ) &&
3660           (ff->frag_off == ff->bytes_msg) &&
3661           (NULL != ff->head_frag) )
3662   {
3663     ff = ff->head_frag; /* descent into fragmented fragments */
3664   }
3665
3666   if ( ( (ff->bytes_msg > mtu) ||
3667          (pm == ff) ) &&
3668        (pm->frag_off < pm->bytes_msg) )
3669   {
3670     /* Did not yet calculate all fragments, calculate next fragment */
3671     struct PendingMessage *frag;
3672     struct TransportFragmentBox tfb;
3673     const char *orig;
3674     char *msg;
3675     uint16_t fragmax;
3676     uint16_t fragsize;
3677     uint16_t msize;
3678     uint16_t xoff = 0;
3679
3680     orig = (const char *) &ff[1];
3681     msize = ff->bytes_msg;
3682     if (pm != ff)
3683     {
3684       const struct TransportFragmentBox *tfbo;
3685
3686       tfbo = (const struct TransportFragmentBox *) orig;
3687       orig += sizeof (struct TransportFragmentBox);
3688       msize -= sizeof (struct TransportFragmentBox);
3689       xoff = ntohs (tfbo->frag_off);
3690     }
3691     fragmax = mtu - sizeof (struct TransportFragmentBox);
3692     fragsize = GNUNET_MIN (msize - ff->frag_off,
3693                            fragmax);
3694     frag = GNUNET_malloc (sizeof (struct PendingMessage) +
3695                           sizeof (struct TransportFragmentBox) +
3696                           fragsize);
3697     frag->target = pm->target;
3698     frag->frag_parent = ff;
3699     frag->timeout = pm->timeout;
3700     frag->bytes_msg = sizeof (struct TransportFragmentBox) + fragsize;
3701     frag->pmt = PMT_FRAGMENT_BOX;
3702     msg = (char *) &frag[1];
3703     tfb.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT);
3704     tfb.header.size = htons (sizeof (struct TransportFragmentBox) +
3705                              fragsize);
3706     tfb.frag_uuid = htonl (pm->frag_uuidgen++);
3707     tfb.msg_uuid = pm->msg_uuid;
3708     tfb.frag_off = htons (ff->frag_off + xoff);
3709     tfb.msg_size = htons (pm->bytes_msg);
3710     memcpy (msg,
3711             &tfb,
3712             sizeof (tfb));
3713     memcpy (&msg[sizeof (tfb)],
3714             &orig[ff->frag_off],
3715             fragsize);
3716     GNUNET_CONTAINER_MDLL_insert (frag,
3717                                   ff->head_frag,
3718                                   ff->tail_frag,
3719                                   frag);
3720     ff->frag_off += fragsize;
3721     ff = frag;
3722   }
3723
3724   /* Move head to the tail and return it */
3725   GNUNET_CONTAINER_MDLL_remove (frag,
3726                                 ff->frag_parent->head_frag,
3727                                 ff->frag_parent->tail_frag,
3728                                 ff);
3729   GNUNET_CONTAINER_MDLL_insert_tail (frag,
3730                                      ff->frag_parent->head_frag,
3731                                      ff->frag_parent->tail_frag,
3732                                      ff);
3733   return ff;
3734 }
3735
3736
3737 /**
3738  * Reliability-box the given @a pm. On error (can there be any), NULL
3739  * may be returned, otherwise the "replacement" for @a pm (which
3740  * should then be added to the respective neighbour's queue instead of
3741  * @a pm).  If the @a pm is already fragmented or reliability boxed,
3742  * or itself an ACK, this function simply returns @a pm.
3743  *
3744  * @param pm pending message to box for transmission over unreliabile queue
3745  * @return new message to transmit
3746  */
3747 static struct PendingMessage *
3748 reliability_box_message (struct PendingMessage *pm)
3749 {
3750   struct TransportReliabilityBox rbox;
3751   struct PendingMessage *bpm;
3752   char *msg;
3753
3754   if (PMT_CORE != pm->pmt)
3755     return pm;  /* already fragmented or reliability boxed, or control message: do nothing */
3756   if (NULL != pm->bpm)
3757     return pm->bpm; /* already computed earlier: do nothing */
3758   GNUNET_assert (NULL == pm->head_frag);
3759   if (pm->bytes_msg + sizeof (rbox) > UINT16_MAX) 
3760   {
3761     /* failed hard */
3762     GNUNET_break (0);
3763     client_send_response (pm,
3764                           GNUNET_NO,
3765                           0);
3766     return NULL;
3767   }
3768   bpm = GNUNET_malloc (sizeof (struct PendingMessage) +
3769                        sizeof (rbox) + 
3770                        pm->bytes_msg);
3771   bpm->target = pm->target;
3772   bpm->frag_parent = pm;
3773   GNUNET_CONTAINER_MDLL_insert (frag,
3774                                 pm->head_frag,
3775                                 pm->tail_frag,
3776                                 bpm);
3777   bpm->timeout = pm->timeout;
3778   bpm->pmt = PMT_RELIABILITY_BOX;
3779   bpm->bytes_msg = pm->bytes_msg + sizeof (rbox);
3780   set_pending_message_uuid (bpm);
3781   rbox.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX);
3782   rbox.header.size = htons (sizeof (rbox) + pm->bytes_msg);
3783   rbox.ack_countdown = htonl (0); // FIXME: implement ACK countdown support
3784   rbox.msg_uuid = pm->msg_uuid;
3785   msg = (char *) &bpm[1];
3786   memcpy (msg,
3787           &rbox,
3788           sizeof (rbox));
3789   memcpy (&msg[sizeof (rbox)],
3790           &pm[1],
3791           pm->bytes_msg);
3792   pm->bpm = bpm;
3793   return bpm;
3794 }
3795
3796
3797 /**
3798  * We believe we are ready to transmit a message on a queue. Double-checks
3799  * with the queue's "tracker_out" and then gives the message to the 
3800  * communicator for transmission (updating the tracker, and re-scheduling
3801  * itself if applicable).  
3802  *
3803  * @param cls the `struct GNUNET_ATS_Session` to process transmissions for
3804  */ 
3805 static void
3806 transmit_on_queue (void *cls)
3807 {
3808   struct GNUNET_ATS_Session *queue = cls;
3809   struct Neighbour *n = queue->neighbour;
3810   struct QueueEntry *qe;
3811   struct PendingMessage *pm;
3812   struct PendingMessage *s;
3813   uint32_t overhead;
3814   struct GNUNET_TRANSPORT_SendMessageTo *smt;
3815   struct GNUNET_MQ_Envelope *env;
3816
3817   queue->transmit_task = NULL;
3818   if (NULL == (pm = n->pending_msg_head))
3819   {
3820     /* no message pending, nothing to do here! */
3821     return; 
3822   }
3823   schedule_transmit_on_queue (queue);
3824   if (NULL != queue->transmit_task)
3825     return; /* do it later */
3826   overhead = 0;
3827   if (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc)
3828     overhead += sizeof (struct TransportReliabilityBox);
3829   s = pm;
3830   if ( ( (0 != queue->mtu) &&
3831          (pm->bytes_msg + overhead > queue->mtu) ) ||
3832        (pm->bytes_msg > UINT16_MAX - sizeof (struct GNUNET_TRANSPORT_SendMessageTo)) ||
3833        (NULL != pm->head_frag /* fragments already exist, should
3834                                  respect that even if MTU is 0 for
3835                                  this queue */) )
3836     s = fragment_message (s,
3837                           (0 == queue->mtu)
3838                           ? UINT16_MAX - sizeof (struct GNUNET_TRANSPORT_SendMessageTo)
3839                           : queue->mtu);
3840   if (NULL == s)
3841   {
3842     /* Fragmentation failed, try next message... */
3843     schedule_transmit_on_queue (queue);
3844     return;
3845   }
3846   if (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc)
3847     s = reliability_box_message (s);
3848   if (NULL == s)
3849   {
3850     /* Reliability boxing failed, try next message... */
3851     schedule_transmit_on_queue (queue);
3852     return;
3853   }
3854
3855   /* Pass 's' for transission to the communicator */
3856   qe = GNUNET_new (struct QueueEntry);
3857   qe->mid = queue->mid_gen++;
3858   qe->session = queue;
3859   // qe->pm = s; // FIXME: not so easy, reference management on 'free(s)'!
3860   GNUNET_CONTAINER_DLL_insert (queue->queue_head,
3861                                queue->queue_tail,
3862                                qe);
3863   env = GNUNET_MQ_msg_extra (smt,
3864                              s->bytes_msg,
3865                              GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG);
3866   smt->qid = queue->qid;
3867   smt->mid = qe->mid;
3868   smt->receiver = n->pid;
3869   memcpy (&smt[1],
3870           &s[1],
3871           s->bytes_msg);
3872   GNUNET_assert (CT_COMMUNICATOR == queue->tc->type);
3873   queue->queue_length++;
3874   queue->tc->details.communicator.total_queue_length++;
3875   GNUNET_MQ_send (queue->tc->mq,
3876                   env);
3877   
3878   // FIXME: do something similar to the logic below
3879   // in defragmentation / reliability ACK handling!
3880
3881   /* Check if this transmission somehow conclusively finished handing 'pm'
3882      even without any explicit ACKs */
3883   if ( (PMT_CORE == s->pmt) &&
3884        (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc) )
3885   {
3886     /* Full message sent, and over reliabile channel */
3887     client_send_response (pm,
3888                           GNUNET_YES,
3889                           pm->bytes_msg);
3890   }
3891   else if ( (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc) &&
3892             (PMT_FRAGMENT_BOX == s->pmt) )
3893   {
3894     struct PendingMessage *pos;
3895     
3896     /* Fragment sent over reliabile channel */
3897     free_fragment_tree (s);
3898     pos = s->frag_parent;
3899     GNUNET_CONTAINER_MDLL_remove (frag,
3900                                   pos->head_frag,
3901                                   pos->tail_frag,
3902                                   s);
3903     GNUNET_free (s);
3904     /* check if subtree is done */
3905     while ( (NULL == pos->head_frag) &&
3906             (pos->frag_off == pos->bytes_msg) &&
3907             (pos != pm) )
3908     {
3909       s = pos;
3910       pos = s->frag_parent;
3911       GNUNET_CONTAINER_MDLL_remove (frag,
3912                                     pos->head_frag,
3913                                     pos->tail_frag,
3914                                     s);
3915       GNUNET_free (s);      
3916     }
3917     
3918     /* Was this the last applicable fragmment? */
3919     if ( (NULL == pm->head_frag) &&
3920          (pm->frag_off == pm->bytes_msg) )
3921       client_send_response (pm,
3922                             GNUNET_YES,
3923                             pm->bytes_msg /* FIXME: calculate and add overheads! */);
3924   }
3925   else if (PMT_CORE != pm->pmt)
3926   {
3927     /* This was an acknowledgement of some type, always free */
3928     free_pending_message (pm);
3929   }
3930   else
3931   {
3932     /* message not finished, waiting for acknowledgement */
3933     struct Neighbour *neighbour = pm->target;
3934     /* Update time by which we might retransmit 's' based on queue
3935        characteristics (i.e. RTT); it takes one RTT for the message to
3936        arrive and the ACK to come back in the best case; but the other
3937        side is allowed to delay ACKs by 2 RTTs, so we use 4 RTT before
3938        retransmitting.  Note that in the future this heuristic should
3939        likely be improved further (measure RTT stability, consider
3940        message urgency and size when delaying ACKs, etc.) */
3941     s->next_attempt = GNUNET_TIME_relative_to_absolute
3942       (GNUNET_TIME_relative_multiply (queue->rtt,
3943                                       4));
3944     if (s == pm)
3945     {
3946       struct PendingMessage *pos;
3947
3948       /* re-insert sort in neighbour list */
3949       GNUNET_CONTAINER_MDLL_remove (neighbour,
3950                                     neighbour->pending_msg_head,
3951                                     neighbour->pending_msg_tail,
3952                                     pm);
3953       pos = neighbour->pending_msg_tail;
3954       while ( (NULL != pos) &&
3955               (pm->next_attempt.abs_value_us > pos->next_attempt.abs_value_us) )
3956         pos = pos->prev_neighbour;
3957       GNUNET_CONTAINER_MDLL_insert_after (neighbour,
3958                                           neighbour->pending_msg_head,
3959                                           neighbour->pending_msg_tail,
3960                                           pos,
3961                                           pm);
3962     }
3963     else
3964     {
3965       /* re-insert sort in fragment list */
3966       struct PendingMessage *fp = s->frag_parent;
3967       struct PendingMessage *pos;
3968
3969       GNUNET_CONTAINER_MDLL_remove (frag,
3970                                     fp->head_frag,
3971                                     fp->tail_frag,
3972                                     s);
3973       pos = fp->tail_frag;
3974       while ( (NULL != pos) &&
3975               (s->next_attempt.abs_value_us > pos->next_attempt.abs_value_us) )
3976         pos = pos->prev_frag;
3977       GNUNET_CONTAINER_MDLL_insert_after (frag,
3978                                           fp->head_frag,
3979                                           fp->tail_frag,
3980                                           pos,
3981                                           s);
3982     }
3983   }
3984   
3985   /* finally, re-schedule queue transmission task itself */
3986   schedule_transmit_on_queue (queue);
3987 }
3988
3989
3990 /**
3991  * Bandwidth tracker informs us that the delay until we
3992  * can transmit again changed.
3993  *
3994  * @param cls a `struct GNUNET_ATS_Session` for which the delay changed
3995  */
3996 static void
3997 tracker_update_out_cb (void *cls)
3998 {
3999   struct GNUNET_ATS_Session *queue = cls;
4000   struct Neighbour *n = queue->neighbour;
4001
4002   if (NULL == n->pending_msg_head)
4003   {
4004     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4005                 "Bandwidth allocation updated for empty transmission queue `%s'\n",
4006                 queue->address);
4007     return; /* no message pending, nothing to do here! */
4008   }
4009   GNUNET_SCHEDULER_cancel (queue->transmit_task);
4010   queue->transmit_task = NULL;
4011   schedule_transmit_on_queue (queue);
4012 }
4013
4014
4015 /**
4016  * Bandwidth tracker informs us that excessive outbound bandwidth was
4017  * allocated which is not being used.
4018  *
4019  * @param cls a `struct GNUNET_ATS_Session` for which the excess was noted
4020  */
4021 static void
4022 tracker_excess_out_cb (void *cls)
4023 {
4024   /* FIXME: trigger excess bandwidth report to core? Right now,
4025      this is done internally within transport_api2_core already,
4026      but we probably want to change the logic and trigger it 
4027      from here via a message instead! */
4028   /* TODO: maybe inform ATS at this point? */
4029   GNUNET_STATISTICS_update (GST_stats,
4030                             "# Excess outbound bandwidth reported",
4031                             1,
4032                             GNUNET_NO);    
4033 }
4034
4035
4036
4037 /**
4038  * Bandwidth tracker informs us that excessive inbound bandwidth was allocated
4039  * which is not being used.
4040  *
4041  * @param cls a `struct GNUNET_ATS_Session` for which the excess was noted
4042  */
4043 static void
4044 tracker_excess_in_cb (void *cls)
4045 {
4046   /* TODO: maybe inform ATS at this point? */
4047   GNUNET_STATISTICS_update (GST_stats,
4048                             "# Excess inbound bandwidth reported",
4049                             1,
4050                             GNUNET_NO);    
4051 }
4052
4053
4054 /**
4055  * New queue became available.  Process the request.
4056  *
4057  * @param cls the client
4058  * @param aqm the send message that was sent
4059  */
4060 static void
4061 handle_add_queue_message (void *cls,
4062                           const struct GNUNET_TRANSPORT_AddQueueMessage *aqm)
4063 {
4064   struct TransportClient *tc = cls;
4065   struct GNUNET_ATS_Session *queue;
4066   struct Neighbour *neighbour;
4067   const char *addr;
4068   uint16_t addr_len;
4069
4070   if (ntohl (aqm->mtu) <= sizeof (struct TransportFragmentBox))
4071   {
4072     /* MTU so small as to be useless for transmissions,
4073        required for #fragment_message()! */
4074     GNUNET_break_op (0);
4075     GNUNET_SERVICE_client_drop (tc->client);
4076     return;
4077   }
4078   neighbour = lookup_neighbour (&aqm->receiver);
4079   if (NULL == neighbour)
4080   {
4081     neighbour = GNUNET_new (struct Neighbour);
4082     neighbour->earliest_timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
4083     neighbour->pid = aqm->receiver;
4084     GNUNET_assert (GNUNET_OK ==
4085                    GNUNET_CONTAINER_multipeermap_put (neighbours,
4086                                                       &neighbour->pid,
4087                                                       neighbour,
4088                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
4089     cores_send_connect_info (&neighbour->pid,
4090                              GNUNET_BANDWIDTH_ZERO);
4091   }
4092   addr_len = ntohs (aqm->header.size) - sizeof (*aqm);
4093   addr = (const char *) &aqm[1];
4094
4095   queue = GNUNET_malloc (sizeof (struct GNUNET_ATS_Session) + addr_len);
4096   queue->tc = tc;
4097   queue->address = (const char *) &queue[1];
4098   queue->rtt = GNUNET_TIME_UNIT_FOREVER_REL;
4099   queue->qid = aqm->qid;
4100   queue->mtu = ntohl (aqm->mtu);
4101   queue->nt = (enum GNUNET_NetworkType) ntohl (aqm->nt);
4102   queue->cs = (enum GNUNET_TRANSPORT_ConnectionStatus) ntohl (aqm->cs);
4103   queue->neighbour = neighbour;
4104   GNUNET_BANDWIDTH_tracker_init2 (&queue->tracker_in,
4105                                   &tracker_update_in_cb,
4106                                   queue,
4107                                   GNUNET_BANDWIDTH_ZERO,
4108                                   GNUNET_CONSTANTS_MAX_BANDWIDTH_CARRY_S,
4109                                   &tracker_excess_in_cb,
4110                                   queue);
4111   GNUNET_BANDWIDTH_tracker_init2 (&queue->tracker_out,
4112                                   &tracker_update_out_cb,
4113                                   queue,
4114                                   GNUNET_BANDWIDTH_ZERO,
4115                                   GNUNET_CONSTANTS_MAX_BANDWIDTH_CARRY_S,
4116                                   &tracker_excess_out_cb,
4117                                   queue);
4118   memcpy (&queue[1],
4119           addr,
4120           addr_len);
4121   /* notify ATS about new queue */
4122   {
4123     struct GNUNET_ATS_Properties prop = {
4124       .delay = GNUNET_TIME_UNIT_FOREVER_REL,
4125       .mtu = queue->mtu,
4126       .nt = queue->nt,
4127       .cc = tc->details.communicator.cc
4128     };
4129     
4130     queue->sr = GNUNET_ATS_session_add (ats,
4131                                         &neighbour->pid,
4132                                         queue->address,
4133                                         queue,
4134                                         &prop);
4135     if  (NULL == queue->sr)
4136     {
4137       /* This can only happen if the 'address' was way too long for ATS
4138          (approaching 64k in strlen()!). In this case, the communicator
4139          must be buggy and we drop it. */
4140       GNUNET_break (0);
4141       GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_in);
4142       GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_out);
4143       GNUNET_free (queue);
4144       if (NULL == neighbour->session_head)
4145       {
4146         cores_send_disconnect_info (&neighbour->pid);
4147         free_neighbour (neighbour);
4148       }
4149       GNUNET_SERVICE_client_drop (tc->client);
4150       return;
4151     }
4152   }
4153   /* notify monitors about new queue */
4154   {
4155     struct MonitorEvent me = {
4156       .rtt = queue->rtt,
4157       .cs = queue->cs
4158     };
4159
4160     notify_monitors (&neighbour->pid,
4161                      queue->address,
4162                      queue->nt,
4163                      &me);
4164   }
4165   GNUNET_CONTAINER_MDLL_insert (neighbour,
4166                                 neighbour->session_head,
4167                                 neighbour->session_tail,
4168                                 queue);
4169   GNUNET_CONTAINER_MDLL_insert (client,
4170                                 tc->details.communicator.session_head,
4171                                 tc->details.communicator.session_tail,
4172                                 queue);
4173   GNUNET_SERVICE_client_continue (tc->client);
4174 }
4175
4176
4177 /**
4178  * Queue to a peer went down.  Process the request.
4179  *
4180  * @param cls the client
4181  * @param dqm the send message that was sent
4182  */
4183 static void
4184 handle_del_queue_message (void *cls,
4185                           const struct GNUNET_TRANSPORT_DelQueueMessage *dqm)
4186 {
4187   struct TransportClient *tc = cls;
4188
4189   if (CT_COMMUNICATOR != tc->type)
4190   {
4191     GNUNET_break (0);
4192     GNUNET_SERVICE_client_drop (tc->client);
4193     return;
4194   }
4195   for (struct GNUNET_ATS_Session *session = tc->details.communicator.session_head;
4196        NULL != session;
4197        session = session->next_client)
4198   {
4199     struct Neighbour *neighbour = session->neighbour;
4200
4201     if ( (dqm->qid != session->qid) ||
4202          (0 != memcmp (&dqm->receiver,
4203                        &neighbour->pid,
4204                        sizeof (struct GNUNET_PeerIdentity))) )
4205       continue;
4206     free_session (session);
4207     GNUNET_SERVICE_client_continue (tc->client);
4208     return;
4209   }
4210   GNUNET_break (0);
4211   GNUNET_SERVICE_client_drop (tc->client);
4212 }
4213
4214
4215 /**
4216  * Message was transmitted.  Process the request.
4217  *
4218  * @param cls the client
4219  * @param sma the send message that was sent
4220  */
4221 static void
4222 handle_send_message_ack (void *cls,
4223                          const struct GNUNET_TRANSPORT_SendMessageToAck *sma)
4224 {
4225   struct TransportClient *tc = cls;
4226   struct QueueEntry *queue;
4227   
4228   if (CT_COMMUNICATOR != tc->type)
4229   {
4230     GNUNET_break (0);
4231     GNUNET_SERVICE_client_drop (tc->client);
4232     return;
4233   }
4234
4235   /* find our queue entry matching the ACK */
4236   queue = NULL;
4237   for (struct GNUNET_ATS_Session *session = tc->details.communicator.session_head;
4238        NULL != session;
4239        session = session->next_client)
4240   {
4241     if (0 != memcmp (&session->neighbour->pid,
4242                      &sma->receiver,
4243                      sizeof (struct GNUNET_PeerIdentity)))
4244       continue;
4245     for (struct QueueEntry *qe = session->queue_head;
4246          NULL != qe;
4247          qe = qe->next)
4248     {
4249       if (qe->mid != sma->mid)
4250         continue;
4251       queue = qe;
4252       break;
4253     }
4254     break;      
4255   }
4256   if (NULL == queue)
4257   {
4258     /* this should never happen */
4259     GNUNET_break (0); 
4260     GNUNET_SERVICE_client_drop (tc->client);
4261     return;
4262   }
4263   GNUNET_CONTAINER_DLL_remove (queue->session->queue_head,
4264                                queue->session->queue_tail,
4265                                queue);
4266   queue->session->queue_length--;
4267   tc->details.communicator.total_queue_length--;
4268   GNUNET_SERVICE_client_continue (tc->client);
4269
4270   /* if applicable, resume transmissions that waited on ACK */
4271   if (COMMUNICATOR_TOTAL_QUEUE_LIMIT - 1 == tc->details.communicator.total_queue_length)
4272   {
4273     /* Communicator dropped below threshold, resume all queues */
4274     GNUNET_STATISTICS_update (GST_stats,
4275                               "# Transmission throttled due to communicator queue limit",
4276                               -1,
4277                               GNUNET_NO);
4278     for (struct GNUNET_ATS_Session *session = tc->details.communicator.session_head;
4279          NULL != session;
4280          session = session->next_client)
4281       schedule_transmit_on_queue (session);
4282   }
4283   else if (SESSION_QUEUE_LIMIT - 1 == queue->session->queue_length)
4284   {
4285     /* queue dropped below threshold; only resume this one queue */
4286     GNUNET_STATISTICS_update (GST_stats,
4287                               "# Transmission throttled due to session queue limit",
4288                               -1,
4289                               GNUNET_NO);    
4290     schedule_transmit_on_queue (queue->session);
4291   }
4292  
4293   /* TODO: we also should react on the status! */
4294   // FIXME: this probably requires queue->pm = s assignment!
4295   // FIXME: react to communicator status about transmission request. We got:
4296   sma->status; // OK success, SYSERR failure
4297
4298   GNUNET_free (queue);
4299 }
4300
4301
4302 /**
4303  * Iterator telling new MONITOR client about all existing
4304  * queues to peers.
4305  *
4306  * @param cls the new `struct TransportClient`
4307  * @param pid a connected peer
4308  * @param value the `struct Neighbour` with more information
4309  * @return #GNUNET_OK (continue to iterate)
4310  */
4311 static int
4312 notify_client_queues (void *cls,
4313                       const struct GNUNET_PeerIdentity *pid,
4314                       void *value)
4315 {
4316   struct TransportClient *tc = cls;
4317   struct Neighbour *neighbour = value;
4318
4319   GNUNET_assert (CT_MONITOR == tc->type);
4320   for (struct GNUNET_ATS_Session *q = neighbour->session_head;
4321        NULL != q;
4322        q = q->next_neighbour)
4323   {
4324     struct MonitorEvent me = {
4325       .rtt = q->rtt,
4326       .cs = q->cs,
4327       .num_msg_pending = q->num_msg_pending,
4328       .num_bytes_pending = q->num_bytes_pending
4329     };
4330
4331     notify_monitor (tc,
4332                     pid,
4333                     q->address,
4334                     q->nt,
4335                     &me);
4336   }
4337   return GNUNET_OK;
4338 }
4339
4340
4341 /**
4342  * Initialize a monitor client.
4343  *
4344  * @param cls the client
4345  * @param start the start message that was sent
4346  */
4347 static void
4348 handle_monitor_start (void *cls,
4349                       const struct GNUNET_TRANSPORT_MonitorStart *start)
4350 {
4351   struct TransportClient *tc = cls;
4352
4353   if (CT_NONE != tc->type)
4354   {
4355     GNUNET_break (0);
4356     GNUNET_SERVICE_client_drop (tc->client);
4357     return;
4358   }
4359   tc->type = CT_MONITOR;
4360   tc->details.monitor.peer = start->peer;
4361   tc->details.monitor.one_shot = ntohl (start->one_shot);
4362   GNUNET_CONTAINER_multipeermap_iterate (neighbours,
4363                                          &notify_client_queues,
4364                                          tc);
4365   GNUNET_SERVICE_client_mark_monitor (tc->client);
4366   GNUNET_SERVICE_client_continue (tc->client);
4367 }
4368
4369
4370 /**
4371  * Signature of a function called by ATS with the current bandwidth
4372  * allocation to be used as determined by ATS.
4373  *
4374  * @param cls closure, NULL
4375  * @param session session this is about
4376  * @param bandwidth_out assigned outbound bandwidth for the connection,
4377  *        0 to signal disconnect
4378  * @param bandwidth_in assigned inbound bandwidth for the connection,
4379  *        0 to signal disconnect
4380  */
4381 static void
4382 ats_allocation_cb (void *cls,
4383                    struct GNUNET_ATS_Session *session,
4384                    struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out,
4385                    struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in)
4386 {
4387   (void) cls;
4388   GNUNET_BANDWIDTH_tracker_update_quota (&session->tracker_out,
4389                                          bandwidth_out);
4390   GNUNET_BANDWIDTH_tracker_update_quota (&session->tracker_in,
4391                                          bandwidth_in);
4392 }
4393
4394
4395 /**
4396  * Find transport client providing communication service
4397  * for the protocol @a prefix.
4398  *
4399  * @param prefix communicator name
4400  * @return NULL if no such transport client is available
4401  */
4402 static struct TransportClient *
4403 lookup_communicator (const char *prefix)
4404 {
4405   for (struct TransportClient *tc = clients_head;
4406        NULL != tc;
4407        tc = tc->next)
4408   {
4409     if (CT_COMMUNICATOR != tc->type)
4410       continue;
4411     if (0 == strcmp (prefix,
4412                      tc->details.communicator.address_prefix))
4413       return tc;
4414   }
4415   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
4416               "ATS suggested use of communicator for `%s', but we do not have such a communicator!\n",
4417               prefix);
4418   return NULL;
4419 }
4420
4421
4422 /**
4423  * Signature of a function called by ATS suggesting transport to
4424  * try connecting with a particular address.
4425  *
4426  * @param cls closure, NULL
4427  * @param pid target peer
4428  * @param address the address to try
4429  */
4430 static void
4431 ats_suggestion_cb (void *cls,
4432                    const struct GNUNET_PeerIdentity *pid,
4433                    const char *address)
4434 {
4435   static uint32_t idgen;
4436   struct TransportClient *tc;
4437   char *prefix;
4438   struct GNUNET_TRANSPORT_CreateQueue *cqm;
4439   struct GNUNET_MQ_Envelope *env;
4440   size_t alen;
4441
4442   (void) cls;
4443   prefix = GNUNET_HELLO_address_to_prefix (address);
4444   if (NULL == prefix)
4445   {
4446     GNUNET_break (0); /* ATS gave invalid address!? */
4447     return;
4448   }
4449   tc = lookup_communicator (prefix);
4450   if (NULL == tc)
4451   {
4452     GNUNET_STATISTICS_update (GST_stats,
4453                               "# ATS suggestions ignored due to missing communicator",
4454                               1,
4455                               GNUNET_NO);    
4456     return;
4457   }
4458   /* forward suggestion for queue creation to communicator */
4459   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4460               "Request #%u for `%s' communicator to create queue to `%s'\n",
4461               (unsigned int) idgen,
4462               prefix,
4463               address);
4464   alen = strlen (address) + 1;
4465   env = GNUNET_MQ_msg_extra (cqm,
4466                              alen,
4467                              GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE);
4468   cqm->request_id = htonl (idgen++);
4469   cqm->receiver = *pid;
4470   memcpy (&cqm[1],
4471           address,
4472           alen);
4473   GNUNET_MQ_send (tc->mq,
4474                   env);
4475 }
4476
4477
4478 /**
4479  * Communicator tells us that our request to create a queue "worked", that
4480  * is setting up the queue is now in process.
4481  *
4482  * @param cls the `struct TransportClient`
4483  * @param cqr confirmation message
4484  */ 
4485 static void
4486 handle_queue_create_ok (void *cls,
4487                         const struct GNUNET_TRANSPORT_CreateQueueResponse *cqr)
4488 {
4489   struct TransportClient *tc = cls;
4490
4491   if (CT_COMMUNICATOR != tc->type)
4492   {
4493     GNUNET_break (0);
4494     GNUNET_SERVICE_client_drop (tc->client);
4495     return;
4496   }
4497   GNUNET_STATISTICS_update (GST_stats,
4498                             "# ATS suggestions succeeded at communicator",
4499                             1,
4500                             GNUNET_NO);
4501   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4502               "Request #%u for communicator to create queue succeeded\n",
4503               (unsigned int) ntohs (cqr->request_id));
4504   GNUNET_SERVICE_client_continue (tc->client);    
4505 }
4506
4507
4508 /**
4509  * Communicator tells us that our request to create a queue failed. This usually
4510  * indicates that the provided address is simply invalid or that the communicator's
4511  * resources are exhausted.
4512  *
4513  * @param cls the `struct TransportClient`
4514  * @param cqr failure message
4515  */ 
4516 static void
4517 handle_queue_create_fail (void *cls,
4518                           const struct GNUNET_TRANSPORT_CreateQueueResponse *cqr)
4519 {
4520   struct TransportClient *tc = cls;
4521
4522   if (CT_COMMUNICATOR != tc->type)
4523   {
4524     GNUNET_break (0);
4525     GNUNET_SERVICE_client_drop (tc->client);
4526     return;
4527   }
4528   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4529               "Request #%u for communicator to create queue failed\n",
4530               (unsigned int) ntohs (cqr->request_id));
4531   GNUNET_STATISTICS_update (GST_stats,
4532                             "# ATS suggestions failed in queue creation at communicator",
4533                             1,
4534                             GNUNET_NO);
4535   GNUNET_SERVICE_client_continue (tc->client);    
4536 }
4537
4538
4539 /**
4540  * Free neighbour entry.
4541  *
4542  * @param cls NULL
4543  * @param pid unused
4544  * @param value a `struct Neighbour`
4545  * @return #GNUNET_OK (always)
4546  */
4547 static int
4548 free_neighbour_cb (void *cls,
4549                    const struct GNUNET_PeerIdentity *pid,
4550                    void *value)
4551 {
4552   struct Neighbour *neighbour = value;
4553
4554   (void) cls;
4555   (void) pid;
4556   GNUNET_break (0); // should this ever happen?
4557   free_neighbour (neighbour);
4558
4559   return GNUNET_OK;
4560 }
4561
4562
4563 /**
4564  * Free DV route entry.
4565  *
4566  * @param cls NULL
4567  * @param pid unused
4568  * @param value a `struct DistanceVector`
4569  * @return #GNUNET_OK (always)
4570  */
4571 static int
4572 free_dv_routes_cb (void *cls,
4573                    const struct GNUNET_PeerIdentity *pid,
4574                    void *value)
4575 {
4576   struct DistanceVector *dv = value;
4577
4578   (void) cls;
4579   (void) pid;
4580   free_dv_route (dv);
4581
4582   return GNUNET_OK;
4583 }
4584
4585
4586 /**
4587  * Free ephemeral entry.
4588  *
4589  * @param cls NULL
4590  * @param pid unused
4591  * @param value a `struct Neighbour`
4592  * @return #GNUNET_OK (always)
4593  */
4594 static int
4595 free_ephemeral_cb (void *cls,
4596                    const struct GNUNET_PeerIdentity *pid,
4597                    void *value)
4598 {
4599   struct EphemeralCacheEntry *ece = value;
4600
4601   (void) cls;
4602   (void) pid;
4603   free_ephemeral (ece);
4604   return GNUNET_OK;
4605 }
4606
4607
4608 /**
4609  * Function called when the service shuts down.  Unloads our plugins
4610  * and cancels pending validations.
4611  *
4612  * @param cls closure, unused
4613  */
4614 static void
4615 do_shutdown (void *cls)
4616 {
4617   (void) cls;
4618
4619   if (NULL != ephemeral_task)
4620   {
4621     GNUNET_SCHEDULER_cancel (ephemeral_task);
4622     ephemeral_task = NULL;
4623   }
4624   GNUNET_CONTAINER_multipeermap_iterate (neighbours,
4625                                          &free_neighbour_cb,
4626                                          NULL);
4627   if (NULL != ats)
4628   {
4629     GNUNET_ATS_transport_done (ats);
4630     ats = NULL;
4631   }
4632   if (NULL != peerstore)
4633   {
4634     GNUNET_PEERSTORE_disconnect (peerstore,
4635                                  GNUNET_NO);
4636     peerstore = NULL;
4637   }
4638   if (NULL != GST_stats)
4639   {
4640     GNUNET_STATISTICS_destroy (GST_stats,
4641                                GNUNET_NO);
4642     GST_stats = NULL;
4643   }
4644   if (NULL != GST_my_private_key)
4645   {
4646     GNUNET_free (GST_my_private_key);
4647     GST_my_private_key = NULL;
4648   }
4649   GNUNET_CONTAINER_multipeermap_destroy (neighbours);
4650   neighbours = NULL;
4651   GNUNET_CONTAINER_multipeermap_iterate (dv_routes,
4652                                          &free_dv_routes_cb,
4653                                          NULL);
4654   GNUNET_CONTAINER_multipeermap_destroy (dv_routes);
4655   dv_routes = NULL;
4656   GNUNET_CONTAINER_multipeermap_iterate (ephemeral_map,
4657                                          &free_ephemeral_cb,
4658                                          NULL);
4659   GNUNET_CONTAINER_multipeermap_destroy (ephemeral_map);
4660   ephemeral_map = NULL;
4661   GNUNET_CONTAINER_heap_destroy (ephemeral_heap);
4662   ephemeral_heap = NULL;
4663 }
4664
4665
4666 /**
4667  * Initiate transport service.
4668  *
4669  * @param cls closure
4670  * @param c configuration to use
4671  * @param service the initialized service
4672  */
4673 static void
4674 run (void *cls,
4675      const struct GNUNET_CONFIGURATION_Handle *c,
4676      struct GNUNET_SERVICE_Handle *service)
4677 {
4678   (void) cls;
4679   /* setup globals */
4680   GST_cfg = c;
4681   neighbours = GNUNET_CONTAINER_multipeermap_create (1024,
4682                                                      GNUNET_YES);
4683   dv_routes = GNUNET_CONTAINER_multipeermap_create (1024,
4684                                                     GNUNET_YES);
4685   ephemeral_map = GNUNET_CONTAINER_multipeermap_create (32,
4686                                                         GNUNET_YES);
4687   ephemeral_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
4688   GST_my_private_key = GNUNET_CRYPTO_eddsa_key_create_from_configuration (GST_cfg);
4689   if (NULL == GST_my_private_key)
4690   {
4691     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
4692                 _("Transport service is lacking key configuration settings. Exiting.\n"));
4693     GNUNET_SCHEDULER_shutdown ();
4694     return;
4695   }
4696   GNUNET_CRYPTO_eddsa_key_get_public (GST_my_private_key,
4697                                       &GST_my_identity.public_key);
4698   GNUNET_log(GNUNET_ERROR_TYPE_INFO,
4699              "My identity is `%s'\n",
4700              GNUNET_i2s_full (&GST_my_identity));
4701   GST_stats = GNUNET_STATISTICS_create ("transport",
4702                                         GST_cfg);
4703   GNUNET_SCHEDULER_add_shutdown (&do_shutdown,
4704                                  NULL);
4705   peerstore = GNUNET_PEERSTORE_connect (GST_cfg);
4706   if (NULL == peerstore)
4707   {
4708     GNUNET_break (0);
4709     GNUNET_SCHEDULER_shutdown ();
4710     return;
4711   }
4712   ats = GNUNET_ATS_transport_init (GST_cfg,
4713                                    &ats_allocation_cb,
4714                                    NULL,
4715                                    &ats_suggestion_cb,
4716                                    NULL);
4717   if (NULL == ats)
4718   {
4719     GNUNET_break (0);
4720     GNUNET_SCHEDULER_shutdown ();
4721     return;
4722   }
4723 }
4724
4725
4726 /**
4727  * Define "main" method using service macro.
4728  */
4729 GNUNET_SERVICE_MAIN
4730 ("transport",
4731  GNUNET_SERVICE_OPTION_SOFT_SHUTDOWN,
4732  &run,
4733  &client_connect_cb,
4734  &client_disconnect_cb,
4735  NULL,
4736  /* communication with core */
4737  GNUNET_MQ_hd_fixed_size (client_start,
4738                           GNUNET_MESSAGE_TYPE_TRANSPORT_START,
4739                           struct StartMessage,
4740                           NULL),
4741  GNUNET_MQ_hd_var_size (client_send,
4742                         GNUNET_MESSAGE_TYPE_TRANSPORT_SEND,
4743                         struct OutboundMessage,
4744                         NULL),
4745  /* communication with communicators */
4746  GNUNET_MQ_hd_var_size (communicator_available,
4747                         GNUNET_MESSAGE_TYPE_TRANSPORT_NEW_COMMUNICATOR,
4748                         struct GNUNET_TRANSPORT_CommunicatorAvailableMessage,
4749                         NULL),
4750  GNUNET_MQ_hd_var_size (communicator_backchannel,
4751                         GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL,
4752                         struct GNUNET_TRANSPORT_CommunicatorBackchannel,
4753                         NULL),
4754  GNUNET_MQ_hd_var_size (add_address,
4755                         GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS,
4756                         struct GNUNET_TRANSPORT_AddAddressMessage,
4757                         NULL),
4758  GNUNET_MQ_hd_fixed_size (del_address,
4759                           GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_ADDRESS,
4760                           struct GNUNET_TRANSPORT_DelAddressMessage,
4761                           NULL),
4762  GNUNET_MQ_hd_var_size (incoming_msg,
4763                         GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG,
4764                         struct GNUNET_TRANSPORT_IncomingMessage,
4765                         NULL),
4766  GNUNET_MQ_hd_fixed_size (queue_create_ok,
4767                           GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_OK,
4768                           struct GNUNET_TRANSPORT_CreateQueueResponse,
4769                           NULL),
4770  GNUNET_MQ_hd_fixed_size (queue_create_fail,
4771                           GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_FAIL,
4772                           struct GNUNET_TRANSPORT_CreateQueueResponse,
4773                           NULL),
4774  GNUNET_MQ_hd_var_size (add_queue_message,
4775                         GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP,
4776                         struct GNUNET_TRANSPORT_AddQueueMessage,
4777                         NULL),
4778  GNUNET_MQ_hd_fixed_size (del_queue_message,
4779                           GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN,
4780                           struct GNUNET_TRANSPORT_DelQueueMessage,
4781                           NULL),
4782  GNUNET_MQ_hd_fixed_size (send_message_ack,
4783                           GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK,
4784                           struct GNUNET_TRANSPORT_SendMessageToAck,
4785                           NULL),
4786  /* communication with monitors */
4787  GNUNET_MQ_hd_fixed_size (monitor_start,
4788                           GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_START,
4789                           struct GNUNET_TRANSPORT_MonitorStart,
4790                           NULL),
4791  GNUNET_MQ_handler_end ());
4792
4793
4794 /* end of file gnunet-service-transport.c */