implement basic reliability ACK handling
[oweals/gnunet.git] / src / transport / gnunet-service-tng.c
1 /*
2  This file is part of GNUnet.
3  Copyright (C) 2010-2016, 2018, 2019 GNUnet e.V.
4
5  GNUnet is free software: you can redistribute it and/or modify it
6  under the terms of the GNU Affero General Public License as published
7  by the Free Software Foundation, either version 3 of the License,
8  or (at your option) any later version.
9
10  GNUnet is distributed in the hope that it will be useful, but
11  WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  Affero General Public License for more details.
14
15  You should have received a copy of the GNU Affero General Public License
16  along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19  */
20 /**
21  * @file transport/gnunet-service-tng.c
22  * @brief main for gnunet-service-tng
23  * @author Christian Grothoff
24  *
25  * TODO:
26  * - figure out how to transmit (selective) ACKs in case of uni-directional
27  *   communicators (with/without core? DV-only?) When do we use ACKs?
28  *   => communicators use selective ACKs for flow control
29  *   => transport uses message-level ACKs for RTT, fragment confirmation
30  *   => integrate DV into transport, use neither core nor communicators
31  *      but rather give communicators transport-encapsulated messages
32  *      (which could be core-data, background-channel traffic, or
33  *       transport-to-transport traffic)
34  *
35  * Implement next:
36  * - backchannel message encryption & decryption
37  * - DV data structures:
38  *   + using DV routes!
39  *     - handling of DV-boxed messages that need to be forwarded
40  *     - route_message implementation, including using DV data structures
41  *       (but not when routing certain message types, like DV learn,
42  *        MUST pay attention to content here -- or pass extra flags?)
43  * - retransmission
44  * - track RTT, distance, loss, etc. => requires extra data structures!
45  *
46  * Later:
47  * - change transport-core API to provide proper flow control in both
48  *   directions, allow multiple messages per peer simultaneously (tag
49  *   confirmations with unique message ID), and replace quota-out with
50  *   proper flow control;
51  * - if messages are below MTU, consider adding ACKs and other stuff
52  *   (requires planning at receiver, and additional MST-style demultiplex
53  *    at receiver!)
54  * - could avoid copying body of message into each fragment and keep
55  *   fragments as just pointers into the original message and only
56  *   fully build fragments just before transmission (optimization, should
57  *   reduce CPU and memory use)
58  *
59  * Optimizations:
60  * - use shorthashmap on msg_uuid's when matching reliability/fragment ACKs
61  *   against our pending message queue (requires additional per neighbour
62  *   hash map to be maintained, avoids possible linear scan on pending msgs)
63  *
64  * Design realizations / discussion:
65  * - communicators do flow control by calling MQ "notify sent"
66  *   when 'ready'. They determine flow implicitly (i.e. TCP blocking)
67  *   or explicitly via backchannel FC ACKs.  As long as the
68  *   channel is not full, they may 'notify sent' even if the other
69  *   peer has not yet confirmed receipt. The other peer confirming
70  *   is _only_ for FC, not for more reliable transmission; reliable
71  *   transmission (i.e. of fragments) is left to _transport_.
72  * - ACKs sent back in uni-directional communicators are done via
73  *   the background channel API; here transport _may_ initially
74  *   broadcast (with bounded # hops) if no path is known;
75  * - transport should _integrate_ DV-routing and build a view of
76  *   the network; then background channel traffic can be
77  *   routed via DV as well as explicit "DV" traffic.
78  * - background channel is also used for ACKs and NAT traversal support
79  * - transport service is responsible for AEAD'ing the background
80  *   channel, timestamps and monotonic time are used against replay
81  *   of old messages -> peerstore needs to be supplied with
82  *   "latest timestamps seen" data
83  * - if transport implements DV, we likely need a 3rd peermap
84  *   in addition to ephemerals and (direct) neighbours
85  *   ==> check if stuff needs to be moved out of "Neighbour"
86  * - transport should encapsualte core-level messages and do its
87  *   own ACKing for RTT/goodput/loss measurements _and_ fragment
88  *   for retransmission
89  */
90 #include "platform.h"
91 #include "gnunet_util_lib.h"
92 #include "gnunet_statistics_service.h"
93 #include "gnunet_transport_monitor_service.h"
94 #include "gnunet_peerstore_service.h"
95 #include "gnunet_hello_lib.h"
96 #include "gnunet_signatures.h"
97 #include "transport.h"
98
99
100 /**
101  * What is the size we assume for a read operation in the
102  * absence of an MTU for the purpose of flow control?
103  */
104 #define IN_PACKET_SIZE_WITHOUT_MTU 128
105
106 /**
107  * Minimum number of hops we should forward DV learn messages
108  * even if they are NOT useful for us in hope of looping
109  * back to the initiator?
110  *
111  * FIXME: allow initiator some control here instead?
112  */
113 #define MIN_DV_PATH_LENGTH_FOR_INITIATOR 3
114
115 /**
116  * Maximum DV distance allowed ever.
117  */
118 #define MAX_DV_HOPS_ALLOWED 16
119
120 /**
121  * Maximum number of DV learning activities we may
122  * have pending at the same time.
123  */
124 #define MAX_DV_LEARN_PENDING 64
125
126 /**
127  * Maximum number of DV paths we keep simultaneously to the same target.
128  */
129 #define MAX_DV_PATHS_TO_TARGET 3
130
131 /**
132  * If a queue delays the next message by more than this number
133  * of seconds we log a warning. Note: this is for testing,
134  * the value chosen here might be too aggressively low!
135  */
136 #define DELAY_WARN_THRESHOLD GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
137
138 /**
139  * We only consider queues as "quality" connections when
140  * suppressing the generation of DV initiation messages if
141  * the latency of the queue is below this threshold.
142  */
143 #define DV_QUALITY_RTT_THRESHOLD GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
144
145 /**
146  * How long do we consider a DV path valid if we see no
147  * further updates on it? Note: the value chosen here might be too low!
148  */
149 #define DV_PATH_VALIDITY_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
150
151 /**
152  * How long before paths expire would we like to (re)discover DV paths? Should
153  * be below #DV_PATH_VALIDITY_TIMEOUT.
154  */
155 #define DV_PATH_DISCOVERY_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 4)
156
157 /**
158  * How long are ephemeral keys valid?
159  */
160 #define EPHEMERAL_VALIDITY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 4)
161
162 /**
163  * How long do we keep partially reassembled messages around before giving up?
164  */
165 #define REASSEMBLY_EXPIRATION GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 4)
166
167 /**
168  * What is the fastest rate at which we send challenges *if* we keep learning
169  * an address (gossip, DHT, etc.)?
170  */
171 #define FAST_VALIDATION_CHALLENGE_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 1)
172
173 /**
174  * What is the slowest rate at which we send challenges?
175  */
176 #define MAX_VALIDATION_CHALLENGE_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_DAYS, 1)
177
178 /**
179  * What is the non-randomized base frequency at which we
180  * would initiate DV learn messages?
181  */
182 #define DV_LEARN_BASE_FREQUENCY GNUNET_TIME_UNIT_MINUTES
183
184 /**
185  * How many good connections (confirmed, bi-directional, not DV)
186  * do we need to have to suppress initiating DV learn messages?
187  */
188 #define DV_LEARN_QUALITY_THRESHOLD 100
189
190 /**
191  * When do we forget an invalid address for sure?
192  */
193 #define MAX_ADDRESS_VALID_UNTIL GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MONTHS, 1)
194 /**
195  * How long do we consider an address valid if we just checked?
196  */
197 #define ADDRESS_VALIDATION_LIFETIME GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 4)
198
199 /**
200  * What is the maximum frequency at which we do address validation?
201  * A random value between 0 and this value is added when scheduling
202  * the #validation_task (both to ensure we do not validate too often,
203  * and to randomize a bit).
204  */
205 #define MIN_DELAY_ADDRESS_VALIDATION GNUNET_TIME_UNIT_MILLISECONDS
206
207 /**
208  * How many network RTTs before an address validation expires should we begin
209  * trying to revalidate? (Note that the RTT used here is the one that we
210  * experienced during the last validation, not necessarily the latest RTT
211  * observed).
212  */
213 #define VALIDATION_RTT_BUFFER_FACTOR 3
214
215 /**
216  * How many messages can we have pending for a given communicator
217  * process before we start to throttle that communicator?
218  *
219  * Used if a communicator might be CPU-bound and cannot handle the traffic.
220  */
221 #define COMMUNICATOR_TOTAL_QUEUE_LIMIT 512
222
223 /**
224  * How many messages can we have pending for a given queue (queue to
225  * a particular peer via a communicator) process before we start to
226  * throttle that queue?
227  */
228 #define QUEUE_LENGTH_LIMIT 32
229
230
231 GNUNET_NETWORK_STRUCT_BEGIN
232
233 /**
234  * Outer layer of an encapsulated backchannel message.
235  */
236 struct TransportBackchannelEncapsulationMessage
237 {
238   /**
239    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION.
240    */
241   struct GNUNET_MessageHeader header;
242
243   /**
244    * Distance the backchannel message has traveled, to be updated at
245    * each hop.  Used to bound the number of hops in case a backchannel
246    * message is broadcast and thus travels without routing
247    * information (during initial backchannel discovery).
248    */
249   uint32_t distance;
250
251   /**
252    * Target's peer identity (as backchannels may be transmitted
253    * indirectly, or even be broadcast).
254    */
255   struct GNUNET_PeerIdentity target;
256
257   /**
258    * Ephemeral key setup by the sender for @e target, used
259    * to encrypt the payload.
260    */
261   struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;
262
263   // FIXME: probably should add random IV here as well,
264   // especially if we re-use ephemeral keys!
265
266   /**
267    * HMAC over the ciphertext of the encrypted, variable-size
268    * body that follows.  Verified via DH of @e target and
269    * @e ephemeral_key
270    */
271   struct GNUNET_HashCode hmac;
272
273   /* Followed by encrypted, variable-size payload */
274 };
275
276
277 /**
278  * Body by which a peer confirms that it is using an ephemeral key.
279  */
280 struct EphemeralConfirmation
281 {
282
283   /**
284    * Purpose is #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL
285    */
286   struct GNUNET_CRYPTO_EccSignaturePurpose purpose;
287
288   /**
289    * How long is this signature over the ephemeral key valid?
290    * Note that the receiver MUST IGNORE the absolute time, and
291    * only interpret the value as a mononic time and reject
292    * "older" values than the last one observed.  Even with this,
293    * there is no real guarantee against replay achieved here,
294    * as the latest timestamp is not persisted.  This is
295    * necessary as we do not want to require synchronized
296    * clocks and may not have a bidirectional communication
297    * channel.  Communicators must protect against replay
298    * attacks when using backchannel communication!
299    */
300   struct GNUNET_TIME_AbsoluteNBO ephemeral_validity;
301
302   /**
303    * Target's peer identity.
304    */
305   struct GNUNET_PeerIdentity target;
306
307   /**
308    * Ephemeral key setup by the sender for @e target, used
309    * to encrypt the payload.
310    */
311   struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;
312
313 };
314
315
316 /**
317  * Plaintext of the variable-size payload that is encrypted
318  * within a `struct TransportBackchannelEncapsulationMessage`
319  */
320 struct TransportBackchannelRequestPayload
321 {
322
323   /**
324    * Sender's peer identity.
325    */
326   struct GNUNET_PeerIdentity sender;
327
328   /**
329    * Signature of the sender over an
330    * #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL.
331    */
332   struct GNUNET_CRYPTO_EddsaSignature sender_sig;
333
334   /**
335    * How long is this signature over the ephemeral key
336    * valid?
337    */
338   struct GNUNET_TIME_AbsoluteNBO ephemeral_validity;
339
340   /**
341    * Current monotonic time of the sending transport service.  Used to
342    * detect replayed messages.  Note that the receiver should remember
343    * a list of the recently seen timestamps and only reject messages
344    * if the timestamp is in the list, or the list is "full" and the
345    * timestamp is smaller than the lowest in the list.  This list of
346    * timestamps per peer should be persisted to guard against replays
347    * after restarts.
348    */
349   struct GNUNET_TIME_AbsoluteNBO monotonic_time;
350
351   /* Followed by a `struct GNUNET_MessageHeader` with a message
352      for a communicator */
353
354   /* Followed by a 0-termianted string specifying the name of
355      the communicator which is to receive the message */
356
357 };
358
359
360 /**
361  * Outer layer of an encapsulated unfragmented application message sent
362  * over an unreliable channel.
363  */
364 struct TransportReliabilityBox
365 {
366   /**
367    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX
368    */
369   struct GNUNET_MessageHeader header;
370
371   /**
372    * Number of messages still to be sent before a commulative
373    * ACK is requested.  Zero if an ACK is requested immediately.
374    * In NBO.  Note that the receiver may send the ACK faster
375    * if it believes that is reasonable.
376    */
377   uint32_t ack_countdown GNUNET_PACKED;
378
379   /**
380    * Unique ID of the message used for signalling receipt of
381    * messages sent over possibly unreliable channels.  Should
382    * be a random.
383    */
384   struct GNUNET_ShortHashCode msg_uuid;
385 };
386
387
388 /**
389  * Confirmation that the receiver got a
390  * #GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX. Note that the
391  * confirmation may be transmitted over a completely different queue,
392  * so ACKs are identified by a combination of PID of sender and
393  * message UUID, without the queue playing any role!
394  */
395 struct TransportReliabilityAckMessage
396 {
397   /**
398    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK
399    */
400   struct GNUNET_MessageHeader header;
401
402   /**
403    * Reserved. Zero.
404    */
405   uint32_t reserved GNUNET_PACKED;
406
407   /**
408    * How long was the ACK delayed relative to the average time of
409    * receipt of the messages being acknowledged?  Used to calculate
410    * the average RTT by taking the receipt time of the ack minus the
411    * average transmission time of the sender minus this value.
412    */
413   struct GNUNET_TIME_RelativeNBO avg_ack_delay;
414
415   /* followed by any number of `struct GNUNET_ShortHashCode`
416      messages providing ACKs */
417 };
418
419
420 /**
421  * Outer layer of an encapsulated fragmented application message.
422  */
423 struct TransportFragmentBox
424 {
425   /**
426    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT
427    */
428   struct GNUNET_MessageHeader header;
429
430   /**
431    * Unique ID of this fragment (and fragment transmission!). Will
432    * change even if a fragement is retransmitted to make each
433    * transmission attempt unique! Should be incremented by one for
434    * each fragment transmission. If a client receives a duplicate
435    * fragment (same @e frag_off), it must send
436    * #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK immediately.
437    */
438   uint32_t frag_uuid GNUNET_PACKED;
439
440   /**
441    * Original message ID for of the message that all the1
442    * fragments belong to.  Must be the same for all fragments.
443    */
444   struct GNUNET_ShortHashCode msg_uuid;
445
446   /**
447    * Offset of this fragment in the overall message.
448    */
449   uint16_t frag_off GNUNET_PACKED;
450
451   /**
452    * Total size of the message that is being fragmented.
453    */
454   uint16_t msg_size GNUNET_PACKED;
455
456 };
457
458
459 /**
460  * Outer layer of an fragmented application message sent over a queue
461  * with finite MTU.  When a #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT is
462  * received, the receiver has two RTTs or 64 further fragments with
463  * the same basic message time to send an acknowledgement, possibly
464  * acknowledging up to 65 fragments in one ACK.  ACKs must also be
465  * sent immediately once all fragments were sent.
466  */
467 struct TransportFragmentAckMessage
468 {
469   /**
470    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK
471    */
472   struct GNUNET_MessageHeader header;
473
474   /**
475    * Unique ID of the lowest fragment UUID being acknowledged.
476    */
477   uint32_t frag_uuid GNUNET_PACKED;
478
479   /**
480    * Bitfield of up to 64 additional fragments following the
481    * @e msg_uuid being acknowledged by this message.
482    */
483   uint64_t extra_acks GNUNET_PACKED;
484
485   /**
486    * Original message ID for of the message that all the
487    * fragments belong to.
488    */
489   struct GNUNET_ShortHashCode msg_uuid;
490
491   /**
492    * How long was the ACK delayed relative to the average time of
493    * receipt of the fragments being acknowledged?  Used to calculate
494    * the average RTT by taking the receipt time of the ack minus the
495    * average transmission time of the sender minus this value.
496    */
497   struct GNUNET_TIME_RelativeNBO avg_ack_delay;
498
499   /**
500    * How long until the receiver will stop trying reassembly
501    * of this message?
502    */
503   struct GNUNET_TIME_RelativeNBO reassembly_timeout;
504 };
505
506
507 /**
508  * Content signed by each peer during DV learning.
509  */
510 struct DvInitPS
511 {
512   /**
513    * Purpose is #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_INITIATOR
514    */
515   struct GNUNET_CRYPTO_EccSignaturePurpose purpose;
516
517   /**
518    * Challenge value used by the initiator to re-identify the path.
519    */
520   struct GNUNET_ShortHashCode challenge;
521
522 };
523
524
525 /**
526  * Content signed by each peer during DV learning.
527  */
528 struct DvHopPS
529 {
530   /**
531    * Purpose is #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_HOP
532    */
533   struct GNUNET_CRYPTO_EccSignaturePurpose purpose;
534
535   /**
536    * Identity of the previous peer on the path.
537    */
538   struct GNUNET_PeerIdentity pred;
539
540   /**
541    * Identity of the next peer on the path.
542    */
543   struct GNUNET_PeerIdentity succ;
544
545   /**
546    * Challenge value used by the initiator to re-identify the path.
547    */
548   struct GNUNET_ShortHashCode challenge;
549
550 };
551
552
553 /**
554  * An entry describing a peer on a path in a
555  * `struct TransportDVLearn` message.
556  */
557 struct DVPathEntryP
558 {
559   /**
560    * Identity of a peer on the path.
561    */
562   struct GNUNET_PeerIdentity hop;
563
564   /**
565    * Signature of this hop over the path, of purpose
566    * #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_HOP
567    */
568   struct GNUNET_CRYPTO_EddsaSignature hop_sig;
569
570 };
571
572
573 /**
574  * Internal message used by transport for distance vector learning.
575  * If @e num_hops does not exceed the threshold, peers should append
576  * themselves to the peer list and flood the message (possibly only
577  * to a subset of their neighbours to limit discoverability of the
578  * network topology).  To the extend that the @e bidirectional bits
579  * are set, peers may learn the inverse paths even if they did not
580  * initiate.
581  *
582  * Unless received on a bidirectional queue and @e num_hops just
583  * zero, peers that can forward to the initator should always try to
584  * forward to the initiator.
585  */
586 struct TransportDVLearn
587 {
588   /**
589    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN
590    */
591   struct GNUNET_MessageHeader header;
592
593   /**
594    * Number of hops this messages has travelled, in NBO. Zero if
595    * sent by initiator.
596    */
597   uint16_t num_hops GNUNET_PACKED;
598
599   /**
600    * Bitmask of the last 16 hops indicating whether they are confirmed
601    * available (without DV) in both directions or not, in NBO.  Used
602    * to possibly instantly learn a path in both directions.  Each peer
603    * should shift this value by one to the left, and then set the
604    * lowest bit IF the current sender can be reached from it (without
605    * DV routing).
606    */
607   uint16_t bidirectional GNUNET_PACKED;
608
609   /**
610    * Peers receiving this message and delaying forwarding to other
611    * peers for any reason should increment this value by the non-network
612    * delay created by the peer.
613    */
614   struct GNUNET_TIME_RelativeNBO non_network_delay;
615
616   /**
617    * Signature of this hop over the path, of purpose
618    * #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_INITIATOR
619    */
620   struct GNUNET_CRYPTO_EddsaSignature init_sig;
621
622   /**
623    * Identity of the peer that started this learning activity.
624    */
625   struct GNUNET_PeerIdentity initiator;
626
627   /**
628    * Challenge value used by the initiator to re-identify the path.
629    */
630   struct GNUNET_ShortHashCode challenge;
631
632   /* Followed by @e num_hops `struct DVPathEntryP` values,
633      excluding the initiator of the DV trace; the last entry is the
634      current sender; the current peer must not be included. */
635
636 };
637
638
639 /**
640  * Outer layer of an encapsulated message send over multiple hops.
641  * The path given only includes the identities of the subsequent
642  * peers, i.e. it will be empty if we are the receiver. Each
643  * forwarding peer should scan the list from the end, and if it can,
644  * forward to the respective peer. The list should then be shortened
645  * by all the entries up to and including that peer.  Each hop should
646  * also increment @e total_hops to allow the receiver to get a precise
647  * estimate on the number of hops the message travelled.  Senders must
648  * provide a learned path that thus should work, but intermediaries
649  * know of a shortcut, they are allowed to send the message via that
650  * shortcut.
651  *
652  * If a peer finds itself still on the list, it must drop the message.
653  */
654 struct TransportDVBox
655 {
656   /**
657    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX
658    */
659   struct GNUNET_MessageHeader header;
660
661   /**
662    * Number of total hops this messages travelled. In NBO.
663    * @e origin sets this to zero, to be incremented at
664    * each hop.
665    */
666   uint16_t total_hops GNUNET_PACKED;
667
668   /**
669    * Number of hops this messages includes. In NBO.
670    */
671   uint16_t num_hops GNUNET_PACKED;
672
673   /**
674    * Identity of the peer that originated the message.
675    */
676   struct GNUNET_PeerIdentity origin;
677
678   /* Followed by @e num_hops `struct GNUNET_PeerIdentity` values;
679      excluding the @e origin and the current peer, the last must be
680      the ultimate target; if @e num_hops is zero, the receiver of this
681      message is the ultimate target. */
682
683   /* Followed by the actual message, which itself may be
684      another box, but not a DV_LEARN or DV_BOX message! */
685 };
686
687
688 /**
689  * Message send to another peer to validate that it can indeed
690  * receive messages at a particular address.
691  */
692 struct TransportValidationChallenge
693 {
694
695   /**
696    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_CHALLENGE
697    */
698   struct GNUNET_MessageHeader header;
699
700   /**
701    * Zero.
702    */
703   uint32_t reserved GNUNET_PACKED;
704
705   /**
706    * Challenge to be signed by the receiving peer.
707    */
708   struct GNUNET_ShortHashCode challenge;
709
710   /**
711    * Timestamp of the sender, to be copied into the reply
712    * to allow sender to calculate RTT.
713    */
714   struct GNUNET_TIME_AbsoluteNBO sender_time;
715 };
716
717
718 /**
719  * Message signed by a peer to confirm that it can indeed
720  * receive messages at a particular address.
721  */
722 struct TransportValidationPS
723 {
724
725   /**
726    * Purpose is #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_CHALLENGE
727    */
728   struct GNUNET_CRYPTO_EccSignaturePurpose purpose;
729
730   /**
731    * How long does the sender believe the address on
732    * which the challenge was received to remain valid?
733    */
734   struct GNUNET_TIME_RelativeNBO validity_duration;
735
736   /**
737    * Challenge signed by the receiving peer.
738    */
739   struct GNUNET_ShortHashCode challenge;
740
741 };
742
743
744 /**
745  * Message send to a peer to respond to a
746  * #GNUNET_MESSAGE_TYPE_ADDRESS_VALIDATION_CHALLENGE
747  */
748 struct TransportValidationResponse
749 {
750
751   /**
752    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_RESPONSE
753    */
754   struct GNUNET_MessageHeader header;
755
756   /**
757    * Zero.
758    */
759   uint32_t reserved GNUNET_PACKED;
760
761   /**
762    * The peer's signature matching the
763    * #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_CHALLENGE purpose.
764    */
765   struct GNUNET_CRYPTO_EddsaSignature signature;
766
767   /**
768    * The challenge that was signed by the receiving peer.
769    */
770   struct GNUNET_ShortHashCode challenge;
771
772   /**
773    * Original timestamp of the sender (was @code{sender_time}),
774    * copied into the reply to allow sender to calculate RTT.
775    */
776   struct GNUNET_TIME_AbsoluteNBO origin_time;
777
778   /**
779    * How long does the sender believe this address to remain
780    * valid?
781    */
782   struct GNUNET_TIME_RelativeNBO validity_duration;
783 };
784
785
786
787 GNUNET_NETWORK_STRUCT_END
788
789
790 /**
791  * What type of client is the `struct TransportClient` about?
792  */
793 enum ClientType
794 {
795   /**
796    * We do not know yet (client is fresh).
797    */
798   CT_NONE = 0,
799
800   /**
801    * Is the CORE service, we need to forward traffic to it.
802    */
803   CT_CORE = 1,
804
805   /**
806    * It is a monitor, forward monitor data.
807    */
808   CT_MONITOR = 2,
809
810   /**
811    * It is a communicator, use for communication.
812    */
813   CT_COMMUNICATOR = 3,
814
815   /**
816    * "Application" telling us where to connect (i.e. TOPOLOGY, DHT or CADET).
817    */
818   CT_APPLICATION = 4
819 };
820
821
822 /**
823  * When did we launch this DV learning activity?
824  */
825 struct LearnLaunchEntry
826 {
827
828   /**
829    * Kept (also) in a DLL sorted by launch time.
830    */
831   struct LearnLaunchEntry *prev;
832
833   /**
834    * Kept (also) in a DLL sorted by launch time.
835    */
836   struct LearnLaunchEntry *next;
837
838   /**
839    * Challenge that uniquely identifies this activity.
840    */
841   struct GNUNET_ShortHashCode challenge;
842
843   /**
844    * When did we transmit the DV learn message (used to
845    * calculate RTT).
846    */
847   struct GNUNET_TIME_Absolute launch_time;
848
849 };
850
851
852 /**
853  * Entry in our cache of ephemeral keys we currently use.
854  * This way, we only sign an ephemeral once per @e target,
855  * and then can re-use it over multiple
856  * #GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION
857  * messages (as signing is expensive).
858  */
859 struct EphemeralCacheEntry
860 {
861
862   /**
863    * Target's peer identity (we don't re-use ephemerals
864    * to limit linkability of messages).
865    */
866   struct GNUNET_PeerIdentity target;
867
868   /**
869    * Signature affirming @e ephemeral_key of type
870    * #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL
871    */
872   struct GNUNET_CRYPTO_EddsaSignature sender_sig;
873
874   /**
875    * How long is @e sender_sig valid
876    */
877   struct GNUNET_TIME_Absolute ephemeral_validity;
878
879   /**
880    * Our ephemeral key.
881    */
882   struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;
883
884   /**
885    * Our private ephemeral key.
886    */
887   struct GNUNET_CRYPTO_EcdhePrivateKey private_key;
888
889   /**
890    * Node in the ephemeral cache for this entry.
891    * Used for expiration.
892    */
893   struct GNUNET_CONTAINER_HeapNode *hn;
894 };
895
896
897 /**
898  * Client connected to the transport service.
899  */
900 struct TransportClient;
901
902
903 /**
904  * A neighbour that at least one communicator is connected to.
905  */
906 struct Neighbour;
907
908
909 /**
910  * Entry in our #dv_routes table, representing a (set of) distance
911  * vector routes to a particular peer.
912  */
913 struct DistanceVector;
914
915 /**
916  * One possible hop towards a DV target.
917  */
918 struct DistanceVectorHop
919 {
920
921   /**
922    * Kept in a MDLL, sorted by @e timeout.
923    */
924   struct DistanceVectorHop *next_dv;
925
926   /**
927    * Kept in a MDLL, sorted by @e timeout.
928    */
929   struct DistanceVectorHop *prev_dv;
930
931   /**
932    * Kept in a MDLL.
933    */
934   struct DistanceVectorHop *next_neighbour;
935
936   /**
937    * Kept in a MDLL.
938    */
939   struct DistanceVectorHop *prev_neighbour;
940
941   /**
942    * What would be the next hop to @e target?
943    */
944   struct Neighbour *next_hop;
945
946   /**
947    * Distance vector entry this hop belongs with.
948    */
949   struct DistanceVector *dv;
950
951   /**
952    * Array of @e distance hops to the target, excluding @e next_hop.
953    * NULL if the entire path is us to @e next_hop to `target`. Allocated
954    * at the end of this struct.
955    */
956   const struct GNUNET_PeerIdentity *path;
957
958   /**
959    * At what time do we forget about this path unless we see it again
960    * while learning?
961    */
962   struct GNUNET_TIME_Absolute timeout;
963
964   /**
965    * How many hops in total to the `target` (excluding @e next_hop and `target` itself),
966    * thus 0 still means a distance of 2 hops (to @e next_hop and then to `target`)?
967    */
968   unsigned int distance;
969 };
970
971
972 /**
973  * Entry in our #dv_routes table, representing a (set of) distance
974  * vector routes to a particular peer.
975  */
976 struct DistanceVector
977 {
978
979   /**
980    * To which peer is this a route?
981    */
982   struct GNUNET_PeerIdentity target;
983
984   /**
985    * Known paths to @e target.
986    */
987   struct DistanceVectorHop *dv_head;
988
989   /**
990    * Known paths to @e target.
991    */
992   struct DistanceVectorHop *dv_tail;
993
994   /**
995    * Task scheduled to purge expired paths from @e dv_head MDLL.
996    */
997   struct GNUNET_SCHEDULER_Task *timeout_task;
998 };
999
1000
1001 /**
1002  * A queue is a message queue provided by a communicator
1003  * via which we can reach a particular neighbour.
1004  */
1005 struct Queue;
1006
1007
1008 /**
1009  * Entry identifying transmission in one of our `struct
1010  * Queue` which still awaits an ACK.  This is used to
1011  * ensure we do not overwhelm a communicator and limit the number of
1012  * messages outstanding per communicator (say in case communicator is
1013  * CPU bound) and per queue (in case bandwidth allocation exceeds
1014  * what the communicator can actually provide towards a particular
1015  * peer/target).
1016  */
1017 struct QueueEntry
1018 {
1019
1020   /**
1021    * Kept as a DLL.
1022    */
1023   struct QueueEntry *next;
1024
1025   /**
1026    * Kept as a DLL.
1027    */
1028   struct QueueEntry *prev;
1029
1030   /**
1031    * Queue this entry is queued with.
1032    */
1033   struct Queue *queue;
1034
1035   /**
1036    * Message ID used for this message with the queue used for transmission.
1037    */
1038   uint64_t mid;
1039 };
1040
1041
1042 /**
1043  * A queue is a message queue provided by a communicator
1044  * via which we can reach a particular neighbour.
1045  */
1046 struct Queue
1047 {
1048   /**
1049    * Kept in a MDLL.
1050    */
1051   struct Queue *next_neighbour;
1052
1053   /**
1054    * Kept in a MDLL.
1055    */
1056   struct Queue *prev_neighbour;
1057
1058   /**
1059    * Kept in a MDLL.
1060    */
1061   struct Queue *prev_client;
1062
1063   /**
1064    * Kept in a MDLL.
1065    */
1066   struct Queue *next_client;
1067
1068   /**
1069    * Head of DLL of unacked transmission requests.
1070    */
1071   struct QueueEntry *queue_head;
1072
1073   /**
1074    * End of DLL of unacked transmission requests.
1075    */
1076   struct QueueEntry *queue_tail;
1077
1078   /**
1079    * Which neighbour is this queue for?
1080    */
1081   struct Neighbour *neighbour;
1082
1083   /**
1084    * Which communicator offers this queue?
1085    */
1086   struct TransportClient *tc;
1087
1088   /**
1089    * Address served by the queue.
1090    */
1091   const char *address;
1092
1093   /**
1094    * Task scheduled for the time when this queue can (likely) transmit the
1095    * next message. Still needs to check with the @e tracker_out to be sure.
1096    */
1097   struct GNUNET_SCHEDULER_Task *transmit_task;
1098
1099   /**
1100    * Our current RTT estimate for this queue.
1101    */
1102   struct GNUNET_TIME_Relative rtt;
1103
1104   /**
1105    * Message ID generator for transmissions on this queue.
1106    */
1107   uint64_t mid_gen;
1108
1109   /**
1110    * Unique identifier of this queue with the communicator.
1111    */
1112   uint32_t qid;
1113
1114   /**
1115    * Maximum transmission unit supported by this queue.
1116    */
1117   uint32_t mtu;
1118
1119   /**
1120    * Distance to the target of this queue.
1121    * FIXME: needed? DV is done differently these days...
1122    */
1123   uint32_t distance;
1124
1125   /**
1126    * Messages pending.
1127    */
1128   uint32_t num_msg_pending;
1129
1130   /**
1131    * Bytes pending.
1132    */
1133   uint32_t num_bytes_pending;
1134
1135   /**
1136    * Length of the DLL starting at @e queue_head.
1137    */
1138   unsigned int queue_length;
1139
1140   /**
1141    * Network type offered by this queue.
1142    */
1143   enum GNUNET_NetworkType nt;
1144
1145   /**
1146    * Connection status for this queue.
1147    */
1148   enum GNUNET_TRANSPORT_ConnectionStatus cs;
1149
1150   /**
1151    * How much outbound bandwidth do we have available for this queue?
1152    */
1153   struct GNUNET_BANDWIDTH_Tracker tracker_out;
1154
1155   /**
1156    * How much inbound bandwidth do we have available for this queue?
1157    */
1158   struct GNUNET_BANDWIDTH_Tracker tracker_in;
1159 };
1160
1161
1162 /**
1163  * Information we keep for a message that we are reassembling.
1164  */
1165 struct ReassemblyContext
1166 {
1167
1168   /**
1169    * Original message ID for of the message that all the
1170    * fragments belong to.
1171    */
1172   struct GNUNET_ShortHashCode msg_uuid;
1173
1174   /**
1175    * Which neighbour is this context for?
1176    */
1177   struct Neighbour *neighbour;
1178
1179   /**
1180    * Entry in the reassembly heap (sorted by expiration).
1181    */
1182   struct GNUNET_CONTAINER_HeapNode *hn;
1183
1184   /**
1185    * Bitfield with @e msg_size bits representing the positions
1186    * where we have received fragments.  When we receive a fragment,
1187    * we check the bits in @e bitfield before incrementing @e msg_missing.
1188    *
1189    * Allocated after the reassembled message.
1190    */
1191   uint8_t *bitfield;
1192
1193   /**
1194    * Task for sending ACK. We may send ACKs either because of hitting
1195    * the @e extra_acks limit, or based on time and @e num_acks.  This
1196    * task is for the latter case.
1197    */
1198   struct GNUNET_SCHEDULER_Task *ack_task;
1199
1200   /**
1201    * At what time will we give up reassembly of this message?
1202    */
1203   struct GNUNET_TIME_Absolute reassembly_timeout;
1204
1205   /**
1206    * Average delay of all acks in @e extra_acks and @e frag_uuid.
1207    * Should be reset to zero when @e num_acks is set to 0.
1208    */
1209   struct GNUNET_TIME_Relative avg_ack_delay;
1210
1211   /**
1212    * Time we received the last fragment.  @e avg_ack_delay must be
1213    * incremented by now - @e last_frag multiplied by @e num_acks.
1214    */
1215   struct GNUNET_TIME_Absolute last_frag;
1216
1217   /**
1218    * Bitfield of up to 64 additional fragments following @e frag_uuid
1219    * to be acknowledged in the next cummulative ACK.
1220    */
1221   uint64_t extra_acks;
1222
1223   /**
1224    * Unique ID of the lowest fragment UUID to be acknowledged in the
1225    * next cummulative ACK.  Only valid if @e num_acks > 0.
1226    */
1227   uint32_t frag_uuid;
1228
1229   /**
1230    * Number of ACKs we have accumulated so far.  Reset to 0
1231    * whenever we send a #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK.
1232    */
1233   unsigned int num_acks;
1234
1235   /**
1236    * How big is the message we are reassembling in total?
1237    */
1238   uint16_t msg_size;
1239
1240   /**
1241    * How many bytes of the message are still missing?  Defragmentation
1242    * is complete when @e msg_missing == 0.
1243    */
1244   uint16_t msg_missing;
1245
1246   /* Followed by @e msg_size bytes of the (partially) defragmented original message */
1247
1248   /* Followed by @e bitfield data */
1249 };
1250
1251
1252 /**
1253  * A neighbour that at least one communicator is connected to.
1254  */
1255 struct Neighbour
1256 {
1257
1258   /**
1259    * Which peer is this about?
1260    */
1261   struct GNUNET_PeerIdentity pid;
1262
1263   /**
1264    * Map with `struct ReassemblyContext` structs for fragments under
1265    * reassembly. May be NULL if we currently have no fragments from
1266    * this @e pid (lazy initialization).
1267    */
1268   struct GNUNET_CONTAINER_MultiShortmap *reassembly_map;
1269
1270   /**
1271    * Heap with `struct ReassemblyContext` structs for fragments under
1272    * reassembly. May be NULL if we currently have no fragments from
1273    * this @e pid (lazy initialization).
1274    */
1275   struct GNUNET_CONTAINER_Heap *reassembly_heap;
1276
1277   /**
1278    * Task to free old entries from the @e reassembly_heap and @e reassembly_map.
1279    */
1280   struct GNUNET_SCHEDULER_Task *reassembly_timeout_task;
1281
1282   /**
1283    * Head of list of messages pending for this neighbour.
1284    */
1285   struct PendingMessage *pending_msg_head;
1286
1287   /**
1288    * Tail of list of messages pending for this neighbour.
1289    */
1290   struct PendingMessage *pending_msg_tail;
1291
1292   /**
1293    * Head of MDLL of DV hops that have this neighbour as next hop. Must be
1294    * purged if this neighbour goes down.
1295    */
1296   struct DistanceVectorHop *dv_head;
1297
1298   /**
1299    * Tail of MDLL of DV hops that have this neighbour as next hop. Must be
1300    * purged if this neighbour goes down.
1301    */
1302   struct DistanceVectorHop *dv_tail;
1303
1304   /**
1305    * Head of DLL of queues to this peer.
1306    */
1307   struct Queue *queue_head;
1308
1309   /**
1310    * Tail of DLL of queues to this peer.
1311    */
1312   struct Queue *queue_tail;
1313
1314   /**
1315    * Task run to cleanup pending messages that have exceeded their timeout.
1316    */
1317   struct GNUNET_SCHEDULER_Task *timeout_task;
1318
1319   /**
1320    * Quota at which CORE is allowed to transmit to this peer.
1321    *
1322    * FIXME: not yet used, tricky to get right given multiple queues!
1323    *        (=> Idea: measure???)
1324    * FIXME: how do we set this value initially when we tell CORE?
1325    *    Options: start at a minimum value or at literally zero?
1326    *         (=> Current thought: clean would be zero!)
1327    */
1328   struct GNUNET_BANDWIDTH_Value32NBO quota_out;
1329
1330   /**
1331    * What is the earliest timeout of any message in @e pending_msg_tail?
1332    */
1333   struct GNUNET_TIME_Absolute earliest_timeout;
1334
1335 };
1336
1337
1338 /**
1339  * A peer that an application (client) would like us to talk to directly.
1340  */
1341 struct PeerRequest
1342 {
1343
1344   /**
1345    * Which peer is this about?
1346    */
1347   struct GNUNET_PeerIdentity pid;
1348
1349   /**
1350    * Client responsible for the request.
1351    */
1352   struct TransportClient *tc;
1353
1354   /**
1355    * Handle for watching the peerstore for HELLOs for this peer.
1356    */
1357   struct GNUNET_PEERSTORE_WatchContext *wc;
1358
1359   /**
1360    * What kind of performance preference does this @e tc have?
1361    */
1362   enum GNUNET_MQ_PreferenceKind pk;
1363
1364   /**
1365    * How much bandwidth would this @e tc like to see?
1366    */
1367   struct GNUNET_BANDWIDTH_Value32NBO bw;
1368
1369 };
1370
1371
1372 /**
1373  * Types of different pending messages.
1374  */
1375 enum PendingMessageType
1376 {
1377
1378   /**
1379    * Ordinary message received from the CORE service.
1380    */
1381   PMT_CORE = 0,
1382
1383   /**
1384    * Fragment box.
1385    */
1386   PMT_FRAGMENT_BOX = 1,
1387
1388   /**
1389    * Reliability box.
1390    */
1391   PMT_RELIABILITY_BOX = 2,
1392
1393   /**
1394    * Any type of acknowledgement.
1395    */
1396   PMT_ACKNOWLEDGEMENT = 3
1397
1398 };
1399
1400
1401 /**
1402  * Transmission request that is awaiting delivery.  The original
1403  * transmission requests from CORE may be too big for some queues.
1404  * In this case, a *tree* of fragments is created.  At each
1405  * level of the tree, fragments are kept in a DLL ordered by which
1406  * fragment should be sent next (at the head).  The tree is searched
1407  * top-down, with the original message at the root.
1408  *
1409  * To select a node for transmission, first it is checked if the
1410  * current node's message fits with the MTU.  If it does not, we
1411  * either calculate the next fragment (based on @e frag_off) from the
1412  * current node, or, if all fragments have already been created,
1413  * descend to the @e head_frag.  Even though the node was already
1414  * fragmented, the fragment may be too big if the fragment was
1415  * generated for a queue with a larger MTU. In this case, the node
1416  * may be fragmented again, thus creating a tree.
1417  *
1418  * When acknowledgements for fragments are received, the tree
1419  * must be pruned, removing those parts that were already
1420  * acknowledged.  When fragments are sent over a reliable
1421  * channel, they can be immediately removed.
1422  *
1423  * If a message is ever fragmented, then the original "full" message
1424  * is never again transmitted (even if it fits below the MTU), and
1425  * only (remaining) fragments are sent.
1426  */
1427 struct PendingMessage
1428 {
1429   /**
1430    * Kept in a MDLL of messages for this @a target.
1431    */
1432   struct PendingMessage *next_neighbour;
1433
1434   /**
1435    * Kept in a MDLL of messages for this @a target.
1436    */
1437   struct PendingMessage *prev_neighbour;
1438
1439   /**
1440    * Kept in a MDLL of messages from this @a client (if @e pmt is #PMT_CORE)
1441    */
1442   struct PendingMessage *next_client;
1443
1444   /**
1445    * Kept in a MDLL of messages from this @a client  (if @e pmt is #PMT_CORE)
1446    */
1447   struct PendingMessage *prev_client;
1448
1449   /**
1450    * Kept in a MDLL of messages from this @a cpm (if @e pmt is #PMT_FRAGMENT_BOx)
1451    */
1452   struct PendingMessage *next_frag;
1453
1454   /**
1455    * Kept in a MDLL of messages from this @a cpm  (if @e pmt is #PMT_FRAGMENT_BOX)
1456    */
1457   struct PendingMessage *prev_frag;
1458
1459   /**
1460    * This message, reliability boxed. Only possibly available if @e pmt is #PMT_CORE.
1461    */
1462   struct PendingMessage *bpm;
1463
1464   /**
1465    * Target of the request.
1466    */
1467   struct Neighbour *target;
1468
1469   /**
1470    * Client that issued the transmission request, if @e pmt is #PMT_CORE.
1471    */
1472   struct TransportClient *client;
1473
1474   /**
1475    * Head of a MDLL of fragments created for this core message.
1476    */
1477   struct PendingMessage *head_frag;
1478
1479   /**
1480    * Tail of a MDLL of fragments created for this core message.
1481    */
1482   struct PendingMessage *tail_frag;
1483
1484   /**
1485    * Our parent in the fragmentation tree.
1486    */
1487   struct PendingMessage *frag_parent;
1488
1489   /**
1490    * At what time should we give up on the transmission (and no longer retry)?
1491    */
1492   struct GNUNET_TIME_Absolute timeout;
1493
1494   /**
1495    * What is the earliest time for us to retry transmission of this message?
1496    */
1497   struct GNUNET_TIME_Absolute next_attempt;
1498
1499   /**
1500    * UUID to use for this message (used for reassembly of fragments, only
1501    * initialized if @e msg_uuid_set is #GNUNET_YES).
1502    */
1503   struct GNUNET_ShortHashCode msg_uuid;
1504
1505   /**
1506    * Counter incremented per generated fragment.
1507    */
1508   uint32_t frag_uuidgen;
1509
1510   /**
1511    * Type of the pending message.
1512    */
1513   enum PendingMessageType pmt;
1514
1515   /**
1516    * Size of the original message.
1517    */
1518   uint16_t bytes_msg;
1519
1520   /**
1521    * Offset at which we should generate the next fragment.
1522    */
1523   uint16_t frag_off;
1524
1525   /**
1526    * #GNUNET_YES once @e msg_uuid was initialized
1527    */
1528   int16_t msg_uuid_set;
1529
1530   /* Followed by @e bytes_msg to transmit */
1531 };
1532
1533
1534 /**
1535  * One of the addresses of this peer.
1536  */
1537 struct AddressListEntry
1538 {
1539
1540   /**
1541    * Kept in a DLL.
1542    */
1543   struct AddressListEntry *next;
1544
1545   /**
1546    * Kept in a DLL.
1547    */
1548   struct AddressListEntry *prev;
1549
1550   /**
1551    * Which communicator provides this address?
1552    */
1553   struct TransportClient *tc;
1554
1555   /**
1556    * The actual address.
1557    */
1558   const char *address;
1559
1560   /**
1561    * Current context for storing this address in the peerstore.
1562    */
1563   struct GNUNET_PEERSTORE_StoreContext *sc;
1564
1565   /**
1566    * Task to periodically do @e st operation.
1567    */
1568   struct GNUNET_SCHEDULER_Task *st;
1569
1570   /**
1571    * What is a typical lifetime the communicator expects this
1572    * address to have? (Always from now.)
1573    */
1574   struct GNUNET_TIME_Relative expiration;
1575
1576   /**
1577    * Address identifier used by the communicator.
1578    */
1579   uint32_t aid;
1580
1581   /**
1582    * Network type offered by this address.
1583    */
1584   enum GNUNET_NetworkType nt;
1585
1586 };
1587
1588
1589 /**
1590  * Client connected to the transport service.
1591  */
1592 struct TransportClient
1593 {
1594
1595   /**
1596    * Kept in a DLL.
1597    */
1598   struct TransportClient *next;
1599
1600   /**
1601    * Kept in a DLL.
1602    */
1603   struct TransportClient *prev;
1604
1605   /**
1606    * Handle to the client.
1607    */
1608   struct GNUNET_SERVICE_Client *client;
1609
1610   /**
1611    * Message queue to the client.
1612    */
1613   struct GNUNET_MQ_Handle *mq;
1614
1615   /**
1616    * What type of client is this?
1617    */
1618   enum ClientType type;
1619
1620   union
1621   {
1622
1623     /**
1624      * Information for @e type #CT_CORE.
1625      */
1626     struct {
1627
1628       /**
1629        * Head of list of messages pending for this client, sorted by
1630        * transmission time ("next_attempt" + possibly internal prioritization).
1631        */
1632       struct PendingMessage *pending_msg_head;
1633
1634       /**
1635        * Tail of list of messages pending for this client.
1636        */
1637       struct PendingMessage *pending_msg_tail;
1638
1639     } core;
1640
1641     /**
1642      * Information for @e type #CT_MONITOR.
1643      */
1644     struct {
1645
1646       /**
1647        * Peer identity to monitor the addresses of.
1648        * Zero to monitor all neighbours.  Valid if
1649        * @e type is #CT_MONITOR.
1650        */
1651       struct GNUNET_PeerIdentity peer;
1652
1653       /**
1654        * Is this a one-shot monitor?
1655        */
1656       int one_shot;
1657
1658     } monitor;
1659
1660
1661     /**
1662      * Information for @e type #CT_COMMUNICATOR.
1663      */
1664     struct {
1665       /**
1666        * If @e type is #CT_COMMUNICATOR, this communicator
1667        * supports communicating using these addresses.
1668        */
1669       char *address_prefix;
1670
1671       /**
1672        * Head of DLL of queues offered by this communicator.
1673        */
1674       struct Queue *queue_head;
1675
1676       /**
1677        * Tail of DLL of queues offered by this communicator.
1678        */
1679       struct Queue *queue_tail;
1680
1681       /**
1682        * Head of list of the addresses of this peer offered by this communicator.
1683        */
1684       struct AddressListEntry *addr_head;
1685
1686       /**
1687        * Tail of list of the addresses of this peer offered by this communicator.
1688        */
1689       struct AddressListEntry *addr_tail;
1690
1691       /**
1692        * Number of queue entries in all queues to this communicator. Used
1693        * throttle sending to a communicator if we see that the communicator
1694        * is globally unable to keep up.
1695        */
1696       unsigned int total_queue_length;
1697
1698       /**
1699        * Characteristics of this communicator.
1700        */
1701       enum GNUNET_TRANSPORT_CommunicatorCharacteristics cc;
1702
1703     } communicator;
1704
1705     /**
1706      * Information for @e type #CT_APPLICATION
1707      */
1708     struct {
1709
1710       /**
1711        * Map of requests for peers the given client application would like to
1712        * see connections for.  Maps from PIDs to `struct PeerRequest`.
1713        */
1714       struct GNUNET_CONTAINER_MultiPeerMap *requests;
1715
1716     } application;
1717
1718   } details;
1719
1720 };
1721
1722
1723 /**
1724  * State we keep for validation activities.  Each of these
1725  * is both in the #validation_heap and the #validation_map.
1726  */
1727 struct ValidationState
1728 {
1729
1730   /**
1731    * For which peer is @a address to be validated (or possibly valid)?
1732    * Serves as key in the #validation_map.
1733    */
1734   struct GNUNET_PeerIdentity pid;
1735
1736   /**
1737    * How long did the peer claim this @e address to be valid? Capped at
1738    * minimum of #MAX_ADDRESS_VALID_UNTIL relative to the time where we last
1739    * were told about the address and the value claimed by the other peer at
1740    * that time.  May be updated similarly when validation succeeds.
1741    */
1742   struct GNUNET_TIME_Absolute valid_until;
1743
1744   /**
1745    * How long do *we* consider this @e address to be valid?
1746    * In the past or zero if we have not yet validated it.
1747    */
1748   struct GNUNET_TIME_Absolute validated_until;
1749
1750   /**
1751    * When did we FIRST use the current @e challenge in a message?
1752    * Used to sanity-check @code{origin_time} in the response when
1753    * calculating the RTT. If the @code{origin_time} is not in
1754    * the expected range, the response is discarded as malicious.
1755    */
1756   struct GNUNET_TIME_Absolute first_challenge_use;
1757
1758   /**
1759    * When did we LAST use the current @e challenge in a message?
1760    * Used to sanity-check @code{origin_time} in the response when
1761    * calculating the RTT.  If the @code{origin_time} is not in
1762    * the expected range, the response is discarded as malicious.
1763    */
1764   struct GNUNET_TIME_Absolute last_challenge_use;
1765
1766   /**
1767    * Next time we will send the @e challenge to the peer, if this time is past
1768    * @e valid_until, this validation state is released at this time.  If the
1769    * address is valid, @e next_challenge is set to @e validated_until MINUS @e
1770    * validation_delay * #VALIDATION_RTT_BUFFER_FACTOR, such that we will try
1771    * to re-validate before the validity actually expires.
1772    */
1773   struct GNUNET_TIME_Absolute next_challenge;
1774
1775   /**
1776    * Current backoff factor we're applying for sending the @a challenge.
1777    * Reset to 0 if the @a challenge is confirmed upon validation.
1778    * Reduced to minimum of #FAST_VALIDATION_CHALLENGE_FREQ and half of the
1779    * existing value if we receive an unvalidated address again over
1780    * another channel (and thus should consider the information "fresh").
1781    * Maximum is #MAX_VALIDATION_CHALLENGE_FREQ.
1782    */
1783   struct GNUNET_TIME_Relative challenge_backoff;
1784
1785   /**
1786    * Initially set to "forever". Once @e validated_until is set, this value is
1787    * set to the RTT that tells us how long it took to receive the validation.
1788    */
1789   struct GNUNET_TIME_Relative validation_rtt;
1790
1791   /**
1792    * The challenge we sent to the peer to get it to validate the address. Note
1793    * that we rotate the challenge whenever we update @e validated_until to
1794    * avoid attacks where a peer simply replays an old challenge in the future.
1795    * (We must not rotate more often as otherwise we may discard valid answers
1796    * due to packet losses, latency and reorderings on the network).
1797    */
1798   struct GNUNET_ShortHashCode challenge;
1799
1800   /**
1801    * Claimed address of the peer.
1802    */
1803   char *address;
1804
1805   /**
1806    * Entry in the #validation_heap, which is sorted by @e next_challenge. The
1807    * heap is used to figure out when the next validation activity should be
1808    * run.
1809    */
1810   struct GNUNET_CONTAINER_HeapNode *hn;
1811
1812   /**
1813    * Handle to a PEERSTORE store operation for this @e address.  NULL if
1814    * no PEERSTORE operation is pending.
1815    */
1816   struct GNUNET_PEERSTORE_StoreContext *sc;
1817
1818   /**
1819    * We are technically ready to send the challenge, but we are waiting for
1820    * the respective queue to become available for transmission.
1821    */
1822   int awaiting_queue;
1823
1824 };
1825
1826
1827 /**
1828  * Head of linked list of all clients to this service.
1829  */
1830 static struct TransportClient *clients_head;
1831
1832 /**
1833  * Tail of linked list of all clients to this service.
1834  */
1835 static struct TransportClient *clients_tail;
1836
1837 /**
1838  * Statistics handle.
1839  */
1840 static struct GNUNET_STATISTICS_Handle *GST_stats;
1841
1842 /**
1843  * Configuration handle.
1844  */
1845 static const struct GNUNET_CONFIGURATION_Handle *GST_cfg;
1846
1847 /**
1848  * Our public key.
1849  */
1850 static struct GNUNET_PeerIdentity GST_my_identity;
1851
1852 /**
1853  * Our private key.
1854  */
1855 static struct GNUNET_CRYPTO_EddsaPrivateKey *GST_my_private_key;
1856
1857 /**
1858  * Map from PIDs to `struct Neighbour` entries.  A peer is
1859  * a neighbour if we have an MQ to it from some communicator.
1860  */
1861 static struct GNUNET_CONTAINER_MultiPeerMap *neighbours;
1862
1863 /**
1864  * Map from PIDs to `struct DistanceVector` entries describing
1865  * known paths to the peer.
1866  */
1867 static struct GNUNET_CONTAINER_MultiPeerMap *dv_routes;
1868
1869 /**
1870  * Map from PIDs to `struct ValidationState` entries describing
1871  * addresses we are aware of and their validity state.
1872  */
1873 static struct GNUNET_CONTAINER_MultiPeerMap *validation_map;
1874
1875 /**
1876  * Map from challenges to `struct LearnLaunchEntry` values.
1877  */
1878 static struct GNUNET_CONTAINER_MultiShortmap *dvlearn_map;
1879
1880 /**
1881  * Head of a DLL sorted by launch time.
1882  */
1883 static struct LearnLaunchEntry *lle_head;
1884
1885 /**
1886  * Tail of a DLL sorted by launch time.
1887  */
1888 static struct LearnLaunchEntry *lle_tail;
1889
1890 /**
1891  * MIN Heap sorted by "next_challenge" to `struct ValidationState` entries
1892  * sorting addresses we are aware of by when we should next try to (re)validate
1893  * (or expire) them.
1894  */
1895 static struct GNUNET_CONTAINER_Heap *validation_heap;
1896
1897 /**
1898  * Database for peer's HELLOs.
1899  */
1900 static struct GNUNET_PEERSTORE_Handle *peerstore;
1901
1902 /**
1903  * Heap sorting `struct EphemeralCacheEntry` by their
1904  * key/signature validity.
1905  */
1906 static struct GNUNET_CONTAINER_Heap *ephemeral_heap;
1907
1908 /**
1909  * Hash map for looking up `struct EphemeralCacheEntry`s
1910  * by peer identity. (We may have ephemerals in our
1911  * cache for which we do not have a neighbour entry,
1912  * and similar many neighbours may not need ephemerals,
1913  * so we use a second map.)
1914  */
1915 static struct GNUNET_CONTAINER_MultiPeerMap *ephemeral_map;
1916
1917 /**
1918  * Task to free expired ephemerals.
1919  */
1920 static struct GNUNET_SCHEDULER_Task *ephemeral_task;
1921
1922 /**
1923  * Task run to initiate DV learning.
1924  */
1925 static struct GNUNET_SCHEDULER_Task *dvlearn_task;
1926
1927 /**
1928  * Task to run address validation.
1929  */
1930 static struct GNUNET_SCHEDULER_Task *validation_task;
1931
1932
1933 /**
1934  * Free cached ephemeral key.
1935  *
1936  * @param ece cached signature to free
1937  */
1938 static void
1939 free_ephemeral (struct EphemeralCacheEntry *ece)
1940 {
1941   GNUNET_CONTAINER_multipeermap_remove (ephemeral_map,
1942                                         &ece->target,
1943                                         ece);
1944   GNUNET_CONTAINER_heap_remove_node (ece->hn);
1945   GNUNET_free (ece);
1946 }
1947
1948
1949 /**
1950  * Free validation state.
1951  *
1952  * @param vs validation state to free
1953  */
1954 static void
1955 free_validation_state (struct ValidationState *vs)
1956 {
1957   GNUNET_CONTAINER_multipeermap_remove (validation_map,
1958                                         &vs->pid,
1959                                         vs);
1960   GNUNET_CONTAINER_heap_remove_node (vs->hn);
1961   vs->hn = NULL;
1962   if (NULL != vs->sc)
1963   {
1964     GNUNET_PEERSTORE_store_cancel (vs->sc);
1965     vs->sc = NULL;
1966   }
1967   GNUNET_free (vs->address);
1968   GNUNET_free (vs);
1969 }
1970
1971
1972 /**
1973  * Lookup neighbour record for peer @a pid.
1974  *
1975  * @param pid neighbour to look for
1976  * @return NULL if we do not have this peer as a neighbour
1977  */
1978 static struct Neighbour *
1979 lookup_neighbour (const struct GNUNET_PeerIdentity *pid)
1980 {
1981   return GNUNET_CONTAINER_multipeermap_get (neighbours,
1982                                             pid);
1983 }
1984
1985
1986 /**
1987  * Details about what to notify monitors about.
1988  */
1989 struct MonitorEvent
1990 {
1991   /**
1992    * @deprecated To be discussed if we keep these...
1993    */
1994   struct GNUNET_TIME_Absolute last_validation;
1995   struct GNUNET_TIME_Absolute valid_until;
1996   struct GNUNET_TIME_Absolute next_validation;
1997
1998   /**
1999    * Current round-trip time estimate.
2000    */
2001   struct GNUNET_TIME_Relative rtt;
2002
2003   /**
2004    * Connection status.
2005    */
2006   enum GNUNET_TRANSPORT_ConnectionStatus cs;
2007
2008   /**
2009    * Messages pending.
2010    */
2011   uint32_t num_msg_pending;
2012
2013   /**
2014    * Bytes pending.
2015    */
2016   uint32_t num_bytes_pending;
2017
2018
2019 };
2020
2021
2022 /**
2023  * Free a @dvh. Callers MAY want to check if this was the last path to the
2024  * `target`, and if so call #free_dv_route to also free the associated DV
2025  * entry in #dv_routes (if not, the associated scheduler job should eventually
2026  * take care of it).
2027  *
2028  * @param dvh hop to free
2029  */
2030 static void
2031 free_distance_vector_hop (struct DistanceVectorHop *dvh)
2032 {
2033   struct Neighbour *n = dvh->next_hop;
2034   struct DistanceVector *dv = dvh->dv;
2035
2036   GNUNET_CONTAINER_MDLL_remove (neighbour,
2037                                 n->dv_head,
2038                                 n->dv_tail,
2039                                 dvh);
2040   GNUNET_CONTAINER_MDLL_remove (dv,
2041                                 dv->dv_head,
2042                                 dv->dv_tail,
2043                                 dvh);
2044   GNUNET_free (dvh);
2045 }
2046
2047
2048 /**
2049  * Free entry in #dv_routes.  First frees all hops to the target, and
2050  * if there are no entries left, frees @a dv as well.
2051  *
2052  * @param dv route to free
2053  */
2054 static void
2055 free_dv_route (struct DistanceVector *dv)
2056 {
2057   struct DistanceVectorHop *dvh;
2058
2059   while (NULL != (dvh = dv->dv_head))
2060     free_distance_vector_hop (dvh);
2061   if (NULL == dv->dv_head)
2062   {
2063     GNUNET_assert (GNUNET_YES ==
2064                    GNUNET_CONTAINER_multipeermap_remove (dv_routes,
2065                                                          &dv->target,
2066                                                          dv));
2067     if (NULL != dv->timeout_task)
2068       GNUNET_SCHEDULER_cancel (dv->timeout_task);
2069     GNUNET_free (dv);
2070   }
2071 }
2072
2073
2074 /**
2075  * Notify monitor @a tc about an event.  That @a tc
2076  * cares about the event has already been checked.
2077  *
2078  * Send @a tc information in @a me about a @a peer's status with
2079  * respect to some @a address to all monitors that care.
2080  *
2081  * @param tc monitor to inform
2082  * @param peer peer the information is about
2083  * @param address address the information is about
2084  * @param nt network type associated with @a address
2085  * @param me detailed information to transmit
2086  */
2087 static void
2088 notify_monitor (struct TransportClient *tc,
2089                 const struct GNUNET_PeerIdentity *peer,
2090                 const char *address,
2091                 enum GNUNET_NetworkType nt,
2092                 const struct MonitorEvent *me)
2093 {
2094   struct GNUNET_MQ_Envelope *env;
2095   struct GNUNET_TRANSPORT_MonitorData *md;
2096   size_t addr_len = strlen (address) + 1;
2097
2098   env = GNUNET_MQ_msg_extra (md,
2099                              addr_len,
2100                              GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_DATA);
2101   md->nt = htonl ((uint32_t) nt);
2102   md->peer = *peer;
2103   md->last_validation = GNUNET_TIME_absolute_hton (me->last_validation);
2104   md->valid_until = GNUNET_TIME_absolute_hton (me->valid_until);
2105   md->next_validation = GNUNET_TIME_absolute_hton (me->next_validation);
2106   md->rtt = GNUNET_TIME_relative_hton (me->rtt);
2107   md->cs = htonl ((uint32_t) me->cs);
2108   md->num_msg_pending = htonl (me->num_msg_pending);
2109   md->num_bytes_pending = htonl (me->num_bytes_pending);
2110   memcpy (&md[1],
2111           address,
2112           addr_len);
2113   GNUNET_MQ_send (tc->mq,
2114                   env);
2115 }
2116
2117
2118 /**
2119  * Send information in @a me about a @a peer's status with respect
2120  * to some @a address to all monitors that care.
2121  *
2122  * @param peer peer the information is about
2123  * @param address address the information is about
2124  * @param nt network type associated with @a address
2125  * @param me detailed information to transmit
2126  */
2127 static void
2128 notify_monitors (const struct GNUNET_PeerIdentity *peer,
2129                  const char *address,
2130                  enum GNUNET_NetworkType nt,
2131                  const struct MonitorEvent *me)
2132 {
2133   for (struct TransportClient *tc = clients_head;
2134        NULL != tc;
2135        tc = tc->next)
2136   {
2137     if (CT_MONITOR != tc->type)
2138       continue;
2139     if (tc->details.monitor.one_shot)
2140       continue;
2141     if ( (0 != GNUNET_is_zero (&tc->details.monitor.peer)) &&
2142          (0 != GNUNET_memcmp (&tc->details.monitor.peer,
2143                               peer)) )
2144       continue;
2145     notify_monitor (tc,
2146                     peer,
2147                     address,
2148                     nt,
2149                     me);
2150   }
2151 }
2152
2153
2154 /**
2155  * Called whenever a client connects.  Allocates our
2156  * data structures associated with that client.
2157  *
2158  * @param cls closure, NULL
2159  * @param client identification of the client
2160  * @param mq message queue for the client
2161  * @return our `struct TransportClient`
2162  */
2163 static void *
2164 client_connect_cb (void *cls,
2165                    struct GNUNET_SERVICE_Client *client,
2166                    struct GNUNET_MQ_Handle *mq)
2167 {
2168   struct TransportClient *tc;
2169
2170   (void) cls;
2171   tc = GNUNET_new (struct TransportClient);
2172   tc->client = client;
2173   tc->mq = mq;
2174   GNUNET_CONTAINER_DLL_insert (clients_head,
2175                                clients_tail,
2176                                tc);
2177   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2178               "Client %p connected\n",
2179               tc);
2180   return tc;
2181 }
2182
2183
2184 /**
2185  * Free @a rc
2186  *
2187  * @param rc data structure to free
2188  */
2189 static void
2190 free_reassembly_context (struct ReassemblyContext *rc)
2191 {
2192   struct Neighbour *n = rc->neighbour;
2193
2194   GNUNET_assert (rc ==
2195                  GNUNET_CONTAINER_heap_remove_node (rc->hn));
2196   GNUNET_assert (GNUNET_OK ==
2197                  GNUNET_CONTAINER_multishortmap_remove (n->reassembly_map,
2198                                                         &rc->msg_uuid,
2199                                                         rc));
2200   GNUNET_free (rc);
2201 }
2202
2203
2204 /**
2205  * Task run to clean up reassembly context of a neighbour that have expired.
2206  *
2207  * @param cls a `struct Neighbour`
2208  */
2209 static void
2210 reassembly_cleanup_task (void *cls)
2211 {
2212   struct Neighbour *n = cls;
2213   struct ReassemblyContext *rc;
2214
2215   n->reassembly_timeout_task = NULL;
2216   while (NULL != (rc = GNUNET_CONTAINER_heap_peek (n->reassembly_heap)))
2217   {
2218     if (0 == GNUNET_TIME_absolute_get_remaining (rc->reassembly_timeout).rel_value_us)
2219     {
2220       free_reassembly_context (rc);
2221       continue;
2222     }
2223     GNUNET_assert (NULL == n->reassembly_timeout_task);
2224     n->reassembly_timeout_task = GNUNET_SCHEDULER_add_at (rc->reassembly_timeout,
2225                                                           &reassembly_cleanup_task,
2226                                                           n);
2227     return;
2228   }
2229 }
2230
2231
2232 /**
2233  * function called to #free_reassembly_context().
2234  *
2235  * @param cls NULL
2236  * @param key unused
2237  * @param value a `struct ReassemblyContext` to free
2238  * @return #GNUNET_OK (continue iteration)
2239  */
2240 static int
2241 free_reassembly_cb (void *cls,
2242                     const struct GNUNET_ShortHashCode *key,
2243                     void *value)
2244 {
2245   struct ReassemblyContext *rc = value;
2246   (void) cls;
2247   (void) key;
2248
2249   free_reassembly_context (rc);
2250   return GNUNET_OK;
2251 }
2252
2253
2254 /**
2255  * Release memory used by @a neighbour.
2256  *
2257  * @param neighbour neighbour entry to free
2258  */
2259 static void
2260 free_neighbour (struct Neighbour *neighbour)
2261 {
2262   struct DistanceVectorHop *dvh;
2263
2264   GNUNET_assert (NULL == neighbour->queue_head);
2265   GNUNET_assert (GNUNET_YES ==
2266                  GNUNET_CONTAINER_multipeermap_remove (neighbours,
2267                                                        &neighbour->pid,
2268                                                        neighbour));
2269   if (NULL != neighbour->timeout_task)
2270     GNUNET_SCHEDULER_cancel (neighbour->timeout_task);
2271   if (NULL != neighbour->reassembly_map)
2272   {
2273     GNUNET_CONTAINER_multishortmap_iterate (neighbour->reassembly_map,
2274                                             &free_reassembly_cb,
2275                                             NULL);
2276     GNUNET_CONTAINER_multishortmap_destroy (neighbour->reassembly_map);
2277     neighbour->reassembly_map = NULL;
2278     GNUNET_CONTAINER_heap_destroy (neighbour->reassembly_heap);
2279     neighbour->reassembly_heap = NULL;
2280   }
2281   while (NULL != (dvh = neighbour->dv_head))
2282   {
2283     struct DistanceVector *dv = dvh->dv;
2284
2285     free_distance_vector_hop (dvh);
2286     if (NULL == dv->dv_head)
2287       free_dv_route (dv);
2288   }
2289   if (NULL != neighbour->reassembly_timeout_task)
2290     GNUNET_SCHEDULER_cancel (neighbour->reassembly_timeout_task);
2291   GNUNET_free (neighbour);
2292 }
2293
2294
2295 /**
2296  * Send message to CORE clients that we lost a connection.
2297  *
2298  * @param tc client to inform (must be CORE client)
2299  * @param pid peer the connection is for
2300  * @param quota_out current quota for the peer
2301  */
2302 static void
2303 core_send_connect_info (struct TransportClient *tc,
2304                         const struct GNUNET_PeerIdentity *pid,
2305                         struct GNUNET_BANDWIDTH_Value32NBO quota_out)
2306 {
2307   struct GNUNET_MQ_Envelope *env;
2308   struct ConnectInfoMessage *cim;
2309
2310   GNUNET_assert (CT_CORE == tc->type);
2311   env = GNUNET_MQ_msg (cim,
2312                        GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
2313   cim->quota_out = quota_out;
2314   cim->id = *pid;
2315   GNUNET_MQ_send (tc->mq,
2316                   env);
2317 }
2318
2319
2320 /**
2321  * Send message to CORE clients that we gained a connection
2322  *
2323  * @param pid peer the queue was for
2324  * @param quota_out current quota for the peer
2325  */
2326 static void
2327 cores_send_connect_info (const struct GNUNET_PeerIdentity *pid,
2328                          struct GNUNET_BANDWIDTH_Value32NBO quota_out)
2329 {
2330   for (struct TransportClient *tc = clients_head;
2331        NULL != tc;
2332        tc = tc->next)
2333   {
2334     if (CT_CORE != tc->type)
2335       continue;
2336     core_send_connect_info (tc,
2337                             pid,
2338                             quota_out);
2339   }
2340 }
2341
2342
2343 /**
2344  * Send message to CORE clients that we lost a connection.
2345  *
2346  * @param pid peer the connection was for
2347  */
2348 static void
2349 cores_send_disconnect_info (const struct GNUNET_PeerIdentity *pid)
2350 {
2351   for (struct TransportClient *tc = clients_head;
2352        NULL != tc;
2353        tc = tc->next)
2354   {
2355     struct GNUNET_MQ_Envelope *env;
2356     struct DisconnectInfoMessage *dim;
2357
2358     if (CT_CORE != tc->type)
2359       continue;
2360     env = GNUNET_MQ_msg (dim,
2361                          GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT);
2362     dim->peer = *pid;
2363     GNUNET_MQ_send (tc->mq,
2364                     env);
2365   }
2366 }
2367
2368
2369 /**
2370  * We believe we are ready to transmit a message on a queue. Double-checks
2371  * with the queue's "tracker_out" and then gives the message to the
2372  * communicator for transmission (updating the tracker, and re-scheduling
2373  * itself if applicable).
2374  *
2375  * @param cls the `struct Queue` to process transmissions for
2376  */
2377 static void
2378 transmit_on_queue (void *cls);
2379
2380
2381 /**
2382  * Schedule next run of #transmit_on_queue().  Does NOTHING if
2383  * we should run immediately or if the message queue is empty.
2384  * Test for no task being added AND queue not being empty to
2385  * transmit immediately afterwards!  This function must only
2386  * be called if the message queue is non-empty!
2387  *
2388  * @param queue the queue to do scheduling for
2389  */
2390 static void
2391 schedule_transmit_on_queue (struct Queue *queue)
2392 {
2393   struct Neighbour *n = queue->neighbour;
2394   struct PendingMessage *pm = n->pending_msg_head;
2395   struct GNUNET_TIME_Relative out_delay;
2396   unsigned int wsize;
2397
2398   GNUNET_assert (NULL != pm);
2399   if (queue->tc->details.communicator.total_queue_length >=
2400       COMMUNICATOR_TOTAL_QUEUE_LIMIT)
2401   {
2402     GNUNET_STATISTICS_update (GST_stats,
2403                               "# Transmission throttled due to communicator queue limit",
2404                               1,
2405                               GNUNET_NO);
2406     return;
2407   }
2408   if (queue->queue_length >= QUEUE_LENGTH_LIMIT)
2409   {
2410     GNUNET_STATISTICS_update (GST_stats,
2411                               "# Transmission throttled due to queue queue limit",
2412                               1,
2413                               GNUNET_NO);
2414     return;
2415   }
2416
2417   wsize = (0 == queue->mtu)
2418     ? pm->bytes_msg /* FIXME: add overheads? */
2419     : queue->mtu;
2420   out_delay = GNUNET_BANDWIDTH_tracker_get_delay (&queue->tracker_out,
2421                                                   wsize);
2422   out_delay = GNUNET_TIME_relative_max (GNUNET_TIME_absolute_get_remaining (pm->next_attempt),
2423                                         out_delay);
2424   if (0 == out_delay.rel_value_us)
2425     return; /* we should run immediately! */
2426   /* queue has changed since we were scheduled, reschedule again */
2427   queue->transmit_task
2428     = GNUNET_SCHEDULER_add_delayed (out_delay,
2429                                     &transmit_on_queue,
2430                                     queue);
2431   if (out_delay.rel_value_us > DELAY_WARN_THRESHOLD.rel_value_us)
2432     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2433                 "Next transmission on queue `%s' in %s (high delay)\n",
2434                 queue->address,
2435                 GNUNET_STRINGS_relative_time_to_string (out_delay,
2436                                                         GNUNET_YES));
2437   else
2438     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2439                 "Next transmission on queue `%s' in %s\n",
2440                 queue->address,
2441                 GNUNET_STRINGS_relative_time_to_string (out_delay,
2442                                                         GNUNET_YES));
2443 }
2444
2445
2446 /**
2447  * Free @a queue.
2448  *
2449  * @param queue the queue to free
2450  */
2451 static void
2452 free_queue (struct Queue *queue)
2453 {
2454   struct Neighbour *neighbour = queue->neighbour;
2455   struct TransportClient *tc = queue->tc;
2456   struct MonitorEvent me = {
2457     .cs = GNUNET_TRANSPORT_CS_DOWN,
2458     .rtt = GNUNET_TIME_UNIT_FOREVER_REL
2459   };
2460   struct QueueEntry *qe;
2461   int maxxed;
2462
2463   if (NULL != queue->transmit_task)
2464   {
2465     GNUNET_SCHEDULER_cancel (queue->transmit_task);
2466     queue->transmit_task = NULL;
2467   }
2468   GNUNET_CONTAINER_MDLL_remove (neighbour,
2469                                 neighbour->queue_head,
2470                                 neighbour->queue_tail,
2471                                 queue);
2472   GNUNET_CONTAINER_MDLL_remove (client,
2473                                 tc->details.communicator.queue_head,
2474                                 tc->details.communicator.queue_tail,
2475                                 queue);
2476   maxxed = (COMMUNICATOR_TOTAL_QUEUE_LIMIT >= tc->details.communicator.total_queue_length);
2477   while (NULL != (qe = queue->queue_head))
2478   {
2479     GNUNET_CONTAINER_DLL_remove (queue->queue_head,
2480                                  queue->queue_tail,
2481                                  qe);
2482     queue->queue_length--;
2483     tc->details.communicator.total_queue_length--;
2484     GNUNET_free (qe);
2485   }
2486   GNUNET_assert (0 == queue->queue_length);
2487   if ( (maxxed) &&
2488        (COMMUNICATOR_TOTAL_QUEUE_LIMIT < tc->details.communicator.total_queue_length) )
2489   {
2490     /* Communicator dropped below threshold, resume all queues */
2491     GNUNET_STATISTICS_update (GST_stats,
2492                               "# Transmission throttled due to communicator queue limit",
2493                               -1,
2494                               GNUNET_NO);
2495     for (struct Queue *s = tc->details.communicator.queue_head;
2496          NULL != s;
2497          s = s->next_client)
2498       schedule_transmit_on_queue (s);
2499   }
2500   notify_monitors (&neighbour->pid,
2501                    queue->address,
2502                    queue->nt,
2503                    &me);
2504   GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_in);
2505   GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_out);
2506   GNUNET_free (queue);
2507   if (NULL == neighbour->queue_head)
2508   {
2509     cores_send_disconnect_info (&neighbour->pid);
2510     free_neighbour (neighbour);
2511   }
2512 }
2513
2514
2515 /**
2516  * Free @a ale
2517  *
2518  * @param ale address list entry to free
2519  */
2520 static void
2521 free_address_list_entry (struct AddressListEntry *ale)
2522 {
2523   struct TransportClient *tc = ale->tc;
2524
2525   GNUNET_CONTAINER_DLL_remove (tc->details.communicator.addr_head,
2526                                tc->details.communicator.addr_tail,
2527                                ale);
2528   if (NULL != ale->sc)
2529   {
2530     GNUNET_PEERSTORE_store_cancel (ale->sc);
2531     ale->sc = NULL;
2532   }
2533   if (NULL != ale->st)
2534   {
2535     GNUNET_SCHEDULER_cancel (ale->st);
2536     ale->st = NULL;
2537   }
2538   GNUNET_free (ale);
2539 }
2540
2541
2542 /**
2543  * Stop the peer request in @a value.
2544  *
2545  * @param cls a `struct TransportClient` that no longer makes the request
2546  * @param pid the peer's identity
2547  * @param value a `struct PeerRequest`
2548  * @return #GNUNET_YES (always)
2549  */
2550 static int
2551 stop_peer_request (void *cls,
2552                    const struct GNUNET_PeerIdentity *pid,
2553                    void *value)
2554 {
2555   struct TransportClient *tc = cls;
2556   struct PeerRequest *pr = value;
2557
2558   GNUNET_PEERSTORE_watch_cancel (pr->wc);
2559   GNUNET_assert (GNUNET_YES ==
2560                  GNUNET_CONTAINER_multipeermap_remove (tc->details.application.requests,
2561                                                        pid,
2562                                                        pr));
2563   GNUNET_free (pr);
2564
2565   return GNUNET_OK;
2566 }
2567
2568
2569 /**
2570  * Called whenever a client is disconnected.  Frees our
2571  * resources associated with that client.
2572  *
2573  * @param cls closure, NULL
2574  * @param client identification of the client
2575  * @param app_ctx our `struct TransportClient`
2576  */
2577 static void
2578 client_disconnect_cb (void *cls,
2579                       struct GNUNET_SERVICE_Client *client,
2580                       void *app_ctx)
2581 {
2582   struct TransportClient *tc = app_ctx;
2583
2584   (void) cls;
2585   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2586               "Client %p disconnected, cleaning up.\n",
2587               tc);
2588   GNUNET_CONTAINER_DLL_remove (clients_head,
2589                                clients_tail,
2590                                tc);
2591   switch (tc->type)
2592   {
2593   case CT_NONE:
2594     break;
2595   case CT_CORE:
2596     {
2597       struct PendingMessage *pm;
2598
2599       while (NULL != (pm = tc->details.core.pending_msg_head))
2600       {
2601         GNUNET_CONTAINER_MDLL_remove (client,
2602                                       tc->details.core.pending_msg_head,
2603                                       tc->details.core.pending_msg_tail,
2604                                       pm);
2605         pm->client = NULL;
2606       }
2607     }
2608     break;
2609   case CT_MONITOR:
2610     break;
2611   case CT_COMMUNICATOR:
2612     {
2613       struct Queue *q;
2614       struct AddressListEntry *ale;
2615
2616       while (NULL != (q = tc->details.communicator.queue_head))
2617         free_queue (q);
2618       while (NULL != (ale = tc->details.communicator.addr_head))
2619         free_address_list_entry (ale);
2620       GNUNET_free (tc->details.communicator.address_prefix);
2621     }
2622     break;
2623   case CT_APPLICATION:
2624     GNUNET_CONTAINER_multipeermap_iterate (tc->details.application.requests,
2625                                            &stop_peer_request,
2626                                            tc);
2627     GNUNET_CONTAINER_multipeermap_destroy (tc->details.application.requests);
2628     break;
2629   }
2630   GNUNET_free (tc);
2631 }
2632
2633
2634 /**
2635  * Iterator telling new CORE client about all existing
2636  * connections to peers.
2637  *
2638  * @param cls the new `struct TransportClient`
2639  * @param pid a connected peer
2640  * @param value the `struct Neighbour` with more information
2641  * @return #GNUNET_OK (continue to iterate)
2642  */
2643 static int
2644 notify_client_connect_info (void *cls,
2645                             const struct GNUNET_PeerIdentity *pid,
2646                             void *value)
2647 {
2648   struct TransportClient *tc = cls;
2649   struct Neighbour *neighbour = value;
2650
2651   core_send_connect_info (tc,
2652                           pid,
2653                           neighbour->quota_out);
2654   return GNUNET_OK;
2655 }
2656
2657
2658 /**
2659  * Initialize a "CORE" client.  We got a start message from this
2660  * client, so add it to the list of clients for broadcasting of
2661  * inbound messages.
2662  *
2663  * @param cls the client
2664  * @param start the start message that was sent
2665  */
2666 static void
2667 handle_client_start (void *cls,
2668                      const struct StartMessage *start)
2669 {
2670   struct TransportClient *tc = cls;
2671   uint32_t options;
2672
2673   options = ntohl (start->options);
2674   if ( (0 != (1 & options)) &&
2675        (0 !=
2676         GNUNET_memcmp (&start->self,
2677                        &GST_my_identity)) )
2678   {
2679     /* client thinks this is a different peer, reject */
2680     GNUNET_break (0);
2681     GNUNET_SERVICE_client_drop (tc->client);
2682     return;
2683   }
2684   if (CT_NONE != tc->type)
2685   {
2686     GNUNET_break (0);
2687     GNUNET_SERVICE_client_drop (tc->client);
2688     return;
2689   }
2690   tc->type = CT_CORE;
2691   GNUNET_CONTAINER_multipeermap_iterate (neighbours,
2692                                          &notify_client_connect_info,
2693                                          tc);
2694   GNUNET_SERVICE_client_continue (tc->client);
2695 }
2696
2697
2698 /**
2699  * Client asked for transmission to a peer.  Process the request.
2700  *
2701  * @param cls the client
2702  * @param obm the send message that was sent
2703  */
2704 static int
2705 check_client_send (void *cls,
2706                    const struct OutboundMessage *obm)
2707 {
2708   struct TransportClient *tc = cls;
2709   uint16_t size;
2710   const struct GNUNET_MessageHeader *obmm;
2711
2712   if (CT_CORE != tc->type)
2713   {
2714     GNUNET_break (0);
2715     return GNUNET_SYSERR;
2716   }
2717   size = ntohs (obm->header.size) - sizeof (struct OutboundMessage);
2718   if (size < sizeof (struct GNUNET_MessageHeader))
2719   {
2720     GNUNET_break (0);
2721     return GNUNET_SYSERR;
2722   }
2723   obmm = (const struct GNUNET_MessageHeader *) &obm[1];
2724   if (size != ntohs (obmm->size))
2725   {
2726     GNUNET_break (0);
2727     return GNUNET_SYSERR;
2728   }
2729   return GNUNET_OK;
2730 }
2731
2732
2733 /**
2734  * Free fragment tree below @e root, excluding @e root itself.
2735  *
2736  * @param root root of the tree to free
2737  */
2738 static void
2739 free_fragment_tree (struct PendingMessage *root)
2740 {
2741   struct PendingMessage *frag;
2742
2743   while (NULL != (frag = root->head_frag))
2744   {
2745     free_fragment_tree (frag);
2746     GNUNET_CONTAINER_MDLL_remove (frag,
2747                                   root->head_frag,
2748                                   root->tail_frag,
2749                                   frag);
2750     GNUNET_free (frag);
2751   }
2752 }
2753
2754
2755 /**
2756  * Release memory associated with @a pm and remove @a pm from associated
2757  * data structures.  @a pm must be a top-level pending message and not
2758  * a fragment in the tree.  The entire tree is freed (if applicable).
2759  *
2760  * @param pm the pending message to free
2761  */
2762 static void
2763 free_pending_message (struct PendingMessage *pm)
2764 {
2765   struct TransportClient *tc = pm->client;
2766   struct Neighbour *target = pm->target;
2767
2768   if (NULL != tc)
2769   {
2770     GNUNET_CONTAINER_MDLL_remove (client,
2771                                   tc->details.core.pending_msg_head,
2772                                   tc->details.core.pending_msg_tail,
2773                                   pm);
2774   }
2775   GNUNET_CONTAINER_MDLL_remove (neighbour,
2776                                 target->pending_msg_head,
2777                                 target->pending_msg_tail,
2778                                 pm);
2779   free_fragment_tree (pm);
2780   GNUNET_free_non_null (pm->bpm);
2781   GNUNET_free (pm);
2782 }
2783
2784
2785 /**
2786  * Send a response to the @a pm that we have processed a
2787  * "send" request with status @a success. We
2788  * transmitted @a bytes_physical on the actual wire.
2789  * Sends a confirmation to the "core" client responsible
2790  * for the original request and free's @a pm.
2791  *
2792  * @param pm handle to the original pending message
2793  * @param success status code, #GNUNET_OK on success, #GNUNET_SYSERR
2794  *          for transmission failure
2795  * @param bytes_physical amount of bandwidth consumed
2796  */
2797 static void
2798 client_send_response (struct PendingMessage *pm,
2799                       int success,
2800                       uint32_t bytes_physical)
2801 {
2802   struct TransportClient *tc = pm->client;
2803   struct Neighbour *target = pm->target;
2804   struct GNUNET_MQ_Envelope *env;
2805   struct SendOkMessage *som;
2806
2807   if (NULL != tc)
2808   {
2809     env = GNUNET_MQ_msg (som,
2810                          GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
2811     som->success = htonl ((uint32_t) success);
2812     som->bytes_msg = htons (pm->bytes_msg);
2813     som->bytes_physical = htonl (bytes_physical);
2814     som->peer = target->pid;
2815     GNUNET_MQ_send (tc->mq,
2816                     env);
2817   }
2818   free_pending_message (pm);
2819 }
2820
2821
2822 /**
2823  * Checks the message queue for a neighbour for messages that have timed
2824  * out and purges them.
2825  *
2826  * @param cls a `struct Neighbour`
2827  */
2828 static void
2829 check_queue_timeouts (void *cls)
2830 {
2831   struct Neighbour *n = cls;
2832   struct PendingMessage *pm;
2833   struct GNUNET_TIME_Absolute now;
2834   struct GNUNET_TIME_Absolute earliest_timeout;
2835
2836   n->timeout_task = NULL;
2837   earliest_timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
2838   now = GNUNET_TIME_absolute_get ();
2839   for (struct PendingMessage *pos = n->pending_msg_head;
2840        NULL != pos;
2841        pos = pm)
2842   {
2843     pm = pos->next_neighbour;
2844     if (pos->timeout.abs_value_us <= now.abs_value_us)
2845     {
2846       GNUNET_STATISTICS_update (GST_stats,
2847                                 "# messages dropped (timeout before confirmation)",
2848                                 1,
2849                                 GNUNET_NO);
2850       client_send_response (pm,
2851                             GNUNET_NO,
2852                             0);
2853       continue;
2854     }
2855     earliest_timeout = GNUNET_TIME_absolute_min (earliest_timeout,
2856                                                  pos->timeout);
2857   }
2858   n->earliest_timeout = earliest_timeout;
2859   if (NULL != n->pending_msg_head)
2860     n->timeout_task = GNUNET_SCHEDULER_add_at (earliest_timeout,
2861                                                &check_queue_timeouts,
2862                                                n);
2863 }
2864
2865
2866 /**
2867  * Client asked for transmission to a peer.  Process the request.
2868  *
2869  * @param cls the client
2870  * @param obm the send message that was sent
2871  */
2872 static void
2873 handle_client_send (void *cls,
2874                     const struct OutboundMessage *obm)
2875 {
2876   struct TransportClient *tc = cls;
2877   struct PendingMessage *pm;
2878   const struct GNUNET_MessageHeader *obmm;
2879   struct Neighbour *target;
2880   uint32_t bytes_msg;
2881   int was_empty;
2882
2883   GNUNET_assert (CT_CORE == tc->type);
2884   obmm = (const struct GNUNET_MessageHeader *) &obm[1];
2885   bytes_msg = ntohs (obmm->size);
2886   target = lookup_neighbour (&obm->peer);
2887   if (NULL == target)
2888   {
2889     /* Failure: don't have this peer as a neighbour (anymore).
2890        Might have gone down asynchronously, so this is NOT
2891        a protocol violation by CORE. Still count the event,
2892        as this should be rare. */
2893     struct GNUNET_MQ_Envelope *env;
2894     struct SendOkMessage *som;
2895
2896     env = GNUNET_MQ_msg (som,
2897                          GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
2898     som->success = htonl (GNUNET_SYSERR);
2899     som->bytes_msg = htonl (bytes_msg);
2900     som->bytes_physical = htonl (0);
2901     som->peer = obm->peer;
2902     GNUNET_MQ_send (tc->mq,
2903                     env);
2904     GNUNET_SERVICE_client_continue (tc->client);
2905     GNUNET_STATISTICS_update (GST_stats,
2906                               "# messages dropped (neighbour unknown)",
2907                               1,
2908                               GNUNET_NO);
2909     return;
2910   }
2911   was_empty = (NULL == target->pending_msg_head);
2912   pm = GNUNET_malloc (sizeof (struct PendingMessage) + bytes_msg);
2913   pm->client = tc;
2914   pm->target = target;
2915   pm->bytes_msg = bytes_msg;
2916   pm->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (obm->timeout));
2917   memcpy (&pm[1],
2918           &obm[1],
2919           bytes_msg);
2920   GNUNET_CONTAINER_MDLL_insert (neighbour,
2921                                 target->pending_msg_head,
2922                                 target->pending_msg_tail,
2923                                 pm);
2924   GNUNET_CONTAINER_MDLL_insert (client,
2925                                 tc->details.core.pending_msg_head,
2926                                 tc->details.core.pending_msg_tail,
2927                                 pm);
2928   if (target->earliest_timeout.abs_value_us > pm->timeout.abs_value_us)
2929   {
2930     target->earliest_timeout.abs_value_us = pm->timeout.abs_value_us;
2931     if (NULL != target->timeout_task)
2932       GNUNET_SCHEDULER_cancel (target->timeout_task);
2933     target->timeout_task
2934       = GNUNET_SCHEDULER_add_at (target->earliest_timeout,
2935                                  &check_queue_timeouts,
2936                                  target);
2937   }
2938   if (! was_empty)
2939     return; /* all queues must already be busy */
2940   for (struct Queue *queue = target->queue_head;
2941        NULL != queue;
2942        queue = queue->next_neighbour)
2943   {
2944     /* try transmission on any queue that is idle */
2945     if (NULL == queue->transmit_task)
2946       queue->transmit_task = GNUNET_SCHEDULER_add_now (&transmit_on_queue,
2947                                                        queue);
2948   }
2949 }
2950
2951
2952 /**
2953  * Communicator started.  Test message is well-formed.
2954  *
2955  * @param cls the client
2956  * @param cam the send message that was sent
2957  */
2958 static int
2959 check_communicator_available (void *cls,
2960                               const struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam)
2961 {
2962   struct TransportClient *tc = cls;
2963   uint16_t size;
2964
2965   if (CT_NONE != tc->type)
2966   {
2967     GNUNET_break (0);
2968     return GNUNET_SYSERR;
2969   }
2970   tc->type = CT_COMMUNICATOR;
2971   size = ntohs (cam->header.size) - sizeof (*cam);
2972   if (0 == size)
2973     return GNUNET_OK; /* receive-only communicator */
2974   GNUNET_MQ_check_zero_termination (cam);
2975   return GNUNET_OK;
2976 }
2977
2978
2979 /**
2980  * Communicator started.  Process the request.
2981  *
2982  * @param cls the client
2983  * @param cam the send message that was sent
2984  */
2985 static void
2986 handle_communicator_available (void *cls,
2987                                const struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam)
2988 {
2989   struct TransportClient *tc = cls;
2990   uint16_t size;
2991
2992   size = ntohs (cam->header.size) - sizeof (*cam);
2993   if (0 == size)
2994     return; /* receive-only communicator */
2995   tc->details.communicator.address_prefix
2996     = GNUNET_strdup ((const char *) &cam[1]);
2997   tc->details.communicator.cc
2998     = (enum GNUNET_TRANSPORT_CommunicatorCharacteristics) ntohl (cam->cc);
2999   GNUNET_SERVICE_client_continue (tc->client);
3000 }
3001
3002
3003 /**
3004  * Communicator requests backchannel transmission.  Check the request.
3005  *
3006  * @param cls the client
3007  * @param cb the send message that was sent
3008  * @return #GNUNET_OK if message is well-formed
3009  */
3010 static int
3011 check_communicator_backchannel (void *cls,
3012                                 const struct GNUNET_TRANSPORT_CommunicatorBackchannel *cb)
3013 {
3014   const struct GNUNET_MessageHeader *inbox;
3015   const char *is;
3016   uint16_t msize;
3017   uint16_t isize;
3018
3019   (void) cls;
3020   msize = ntohs (cb->header.size) - sizeof (*cb);
3021   if (UINT16_MAX - msize >
3022       sizeof (struct TransportBackchannelEncapsulationMessage) +
3023       sizeof (struct TransportBackchannelRequestPayload) )
3024   {
3025     GNUNET_break (0);
3026     return GNUNET_SYSERR;
3027   }
3028   inbox = (const struct GNUNET_MessageHeader *) &cb[1];
3029   isize = ntohs (inbox->size);
3030   if (isize >= msize)
3031   {
3032     GNUNET_break (0);
3033     return GNUNET_SYSERR;
3034   }
3035   is = (const char *) inbox;
3036   is += isize;
3037   msize -= isize;
3038   GNUNET_assert (msize > 0);
3039   if ('\0' != is[msize-1])
3040   {
3041     GNUNET_break (0);
3042     return GNUNET_SYSERR;
3043   }
3044   return GNUNET_OK;
3045 }
3046
3047
3048 /**
3049  * Remove memory used by expired ephemeral keys.
3050  *
3051  * @param cls NULL
3052  */
3053 static void
3054 expire_ephemerals (void *cls)
3055 {
3056   struct EphemeralCacheEntry *ece;
3057
3058   (void) cls;
3059   ephemeral_task = NULL;
3060   while (NULL != (ece = GNUNET_CONTAINER_heap_peek (ephemeral_heap)))
3061   {
3062     if (0 == GNUNET_TIME_absolute_get_remaining (ece->ephemeral_validity).rel_value_us)
3063     {
3064       free_ephemeral (ece);
3065       continue;
3066     }
3067     ephemeral_task = GNUNET_SCHEDULER_add_at (ece->ephemeral_validity,
3068                                               &expire_ephemerals,
3069                                               NULL);
3070     return;
3071   }
3072 }
3073
3074
3075 /**
3076  * Lookup ephemeral key in our #ephemeral_map. If no valid one exists, generate
3077  * one, cache it and return it.
3078  *
3079  * @param pid peer to look up ephemeral for
3080  * @param private_key[out] set to the private key
3081  * @param ephemeral_key[out] set to the key
3082  * @param ephemeral_sender_sig[out] set to the signature
3083  * @param ephemeral_validity[out] set to the validity expiration time
3084  */
3085 static void
3086 lookup_ephemeral (const struct GNUNET_PeerIdentity *pid,
3087                   struct GNUNET_CRYPTO_EcdhePrivateKey *private_key,
3088                   struct GNUNET_CRYPTO_EcdhePublicKey *ephemeral_key,
3089                   struct GNUNET_CRYPTO_EddsaSignature *ephemeral_sender_sig,
3090                   struct GNUNET_TIME_Absolute *ephemeral_validity)
3091 {
3092   struct EphemeralCacheEntry *ece;
3093   struct EphemeralConfirmation ec;
3094
3095   ece = GNUNET_CONTAINER_multipeermap_get (ephemeral_map,
3096                                            pid);
3097   if ( (NULL != ece) &&
3098        (0 == GNUNET_TIME_absolute_get_remaining (ece->ephemeral_validity).rel_value_us) )
3099   {
3100     free_ephemeral (ece);
3101     ece = NULL;
3102   }
3103   if (NULL == ece)
3104   {
3105     ece = GNUNET_new (struct EphemeralCacheEntry);
3106     ece->target = *pid;
3107     ece->ephemeral_validity = GNUNET_TIME_absolute_add (GNUNET_TIME_absolute_get_monotonic (GST_cfg),
3108                                                         EPHEMERAL_VALIDITY);
3109     GNUNET_assert (GNUNET_OK ==
3110                    GNUNET_CRYPTO_ecdhe_key_create2 (&ece->private_key));
3111     GNUNET_CRYPTO_ecdhe_key_get_public (&ece->private_key,
3112                                         &ece->ephemeral_key);
3113     ec.purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL);
3114     ec.purpose.size = htonl (sizeof (ec));
3115     ec.target = *pid;
3116     ec.ephemeral_key = ece->ephemeral_key;
3117     GNUNET_assert (GNUNET_OK ==
3118                    GNUNET_CRYPTO_eddsa_sign (GST_my_private_key,
3119                                              &ec.purpose,
3120                                              &ece->sender_sig));
3121     ece->hn = GNUNET_CONTAINER_heap_insert (ephemeral_heap,
3122                                             ece,
3123                                             ece->ephemeral_validity.abs_value_us);
3124     GNUNET_assert (GNUNET_OK ==
3125                    GNUNET_CONTAINER_multipeermap_put (ephemeral_map,
3126                                                       &ece->target,
3127                                                       ece,
3128                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
3129     if (NULL == ephemeral_task)
3130       ephemeral_task = GNUNET_SCHEDULER_add_at (ece->ephemeral_validity,
3131                                                 &expire_ephemerals,
3132                                                 NULL);
3133   }
3134   *private_key = ece->private_key;
3135   *ephemeral_key = ece->ephemeral_key;
3136   *ephemeral_sender_sig = ece->sender_sig;
3137   *ephemeral_validity = ece->ephemeral_validity;
3138 }
3139
3140
3141 /**
3142  * We need to transmit @a hdr to @a target.  If necessary, this may
3143  * involve DV routing or even broadcasting and fragmentation.
3144  *
3145  * @param target peer to receive @a hdr
3146  * @param hdr header of the message to route
3147  */
3148 static void
3149 route_message (const struct GNUNET_PeerIdentity *target,
3150                struct GNUNET_MessageHeader *hdr)
3151 {
3152   // FIXME: this one is tricky:
3153   // - we could try a direct, reliable channel
3154   // - if that is unavailable / for load balancing, we may try:
3155   //   * multiple (?) direct unreliable channels - depending on loss rate?
3156   //   * some (?) DV channels - if above unavailable / too lossy?
3157   //   * _random_ other peers ("broadcasting") in hope of *discovering*
3158   //      a path back! - if all else fails
3159   // => need more on DV first!
3160
3161   // FIXME: send hdr to target, free hdr (possibly using DV, possibly broadcasting)
3162   GNUNET_free (hdr);
3163 }
3164
3165
3166 /**
3167  * Communicator requests backchannel transmission.  Process the request.
3168  *
3169  * @param cls the client
3170  * @param cb the send message that was sent
3171  */
3172 static void
3173 handle_communicator_backchannel (void *cls,
3174                                  const struct GNUNET_TRANSPORT_CommunicatorBackchannel *cb)
3175 {
3176   struct TransportClient *tc = cls;
3177   struct GNUNET_CRYPTO_EcdhePrivateKey private_key;
3178   struct GNUNET_TIME_Absolute ephemeral_validity;
3179   struct TransportBackchannelEncapsulationMessage *enc;
3180   struct TransportBackchannelRequestPayload ppay;
3181   char *mpos;
3182   uint16_t msize;
3183
3184   /* encapsulate and encrypt message */
3185   msize = ntohs (cb->header.size) - sizeof (*cb) + sizeof (struct TransportBackchannelRequestPayload);
3186   enc = GNUNET_malloc (sizeof (*enc) + msize);
3187   enc->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION);
3188   enc->header.size = htons (sizeof (*enc) + msize);
3189   enc->target = cb->pid;
3190   lookup_ephemeral (&cb->pid,
3191                     &private_key,
3192                     &enc->ephemeral_key,
3193                     &ppay.sender_sig,
3194                     &ephemeral_validity);
3195   // FIXME: setup 'iv'
3196 #if FIXME
3197   dh_key_derive (&private_key,
3198                  &cb->pid,
3199                  &enc->iv,
3200                  &key);
3201 #endif
3202   ppay.ephemeral_validity = GNUNET_TIME_absolute_hton (ephemeral_validity);
3203   ppay.monotonic_time = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get_monotonic (GST_cfg));
3204   mpos = (char *) &enc[1];
3205 #if FIXME
3206   encrypt (key,
3207            &ppay,
3208            &mpos,
3209            sizeof (ppay));
3210   encrypt (key,
3211            &cb[1],
3212            &mpos,
3213            ntohs (cb->header.size) - sizeof (*cb));
3214   hmac (key,
3215         &enc->hmac);
3216 #endif
3217   route_message (&cb->pid,
3218                  &enc->header);
3219   GNUNET_SERVICE_client_continue (tc->client);
3220 }
3221
3222
3223 /**
3224  * Address of our peer added.  Test message is well-formed.
3225  *
3226  * @param cls the client
3227  * @param aam the send message that was sent
3228  * @return #GNUNET_OK if message is well-formed
3229  */
3230 static int
3231 check_add_address (void *cls,
3232                    const struct GNUNET_TRANSPORT_AddAddressMessage *aam)
3233 {
3234   struct TransportClient *tc = cls;
3235
3236   if (CT_COMMUNICATOR != tc->type)
3237   {
3238     GNUNET_break (0);
3239     return GNUNET_SYSERR;
3240   }
3241   GNUNET_MQ_check_zero_termination (aam);
3242   return GNUNET_OK;
3243 }
3244
3245
3246 /**
3247  * Ask peerstore to store our address.
3248  *
3249  * @param cls an `struct AddressListEntry *`
3250  */
3251 static void
3252 store_pi (void *cls);
3253
3254
3255 /**
3256  * Function called when peerstore is done storing our address.
3257  *
3258  * @param cls a `struct AddressListEntry`
3259  * @param success #GNUNET_YES if peerstore was successful
3260  */
3261 static void
3262 peerstore_store_own_cb (void *cls,
3263                         int success)
3264 {
3265   struct AddressListEntry *ale = cls;
3266
3267   ale->sc = NULL;
3268   if (GNUNET_YES != success)
3269     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3270                 "Failed to store our own address `%s' in peerstore!\n",
3271                 ale->address);
3272   /* refresh period is 1/4 of expiration time, that should be plenty
3273      without being excessive. */
3274   ale->st = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_divide (ale->expiration,
3275                                                                        4ULL),
3276                                           &store_pi,
3277                                           ale);
3278 }
3279
3280
3281 /**
3282  * Ask peerstore to store our address.
3283  *
3284  * @param cls an `struct AddressListEntry *`
3285  */
3286 static void
3287 store_pi (void *cls)
3288 {
3289   struct AddressListEntry *ale = cls;
3290   void *addr;
3291   size_t addr_len;
3292   struct GNUNET_TIME_Absolute expiration;
3293
3294   ale->st = NULL;
3295   expiration = GNUNET_TIME_relative_to_absolute (ale->expiration);
3296   GNUNET_HELLO_sign_address (ale->address,
3297                              ale->nt,
3298                              expiration,
3299                              GST_my_private_key,
3300                              &addr,
3301                              &addr_len);
3302   ale->sc = GNUNET_PEERSTORE_store (peerstore,
3303                                     "transport",
3304                                     &GST_my_identity,
3305                                     GNUNET_PEERSTORE_TRANSPORT_HELLO_KEY,
3306                                     addr,
3307                                     addr_len,
3308                                     expiration,
3309                                     GNUNET_PEERSTORE_STOREOPTION_MULTIPLE,
3310                                     &peerstore_store_own_cb,
3311                                     ale);
3312   GNUNET_free (addr);
3313   if (NULL == ale->sc)
3314   {
3315     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
3316                 "Failed to store our address `%s' with peerstore\n",
3317                 ale->address);
3318     ale->st = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
3319                                             &store_pi,
3320                                             ale);
3321   }
3322 }
3323
3324
3325 /**
3326  * Address of our peer added.  Process the request.
3327  *
3328  * @param cls the client
3329  * @param aam the send message that was sent
3330  */
3331 static void
3332 handle_add_address (void *cls,
3333                     const struct GNUNET_TRANSPORT_AddAddressMessage *aam)
3334 {
3335   struct TransportClient *tc = cls;
3336   struct AddressListEntry *ale;
3337   size_t slen;
3338
3339   slen = ntohs (aam->header.size) - sizeof (*aam);
3340   ale = GNUNET_malloc (sizeof (struct AddressListEntry) + slen);
3341   ale->tc = tc;
3342   ale->address = (const char *) &ale[1];
3343   ale->expiration = GNUNET_TIME_relative_ntoh (aam->expiration);
3344   ale->aid = aam->aid;
3345   ale->nt = (enum GNUNET_NetworkType) ntohl (aam->nt);
3346   memcpy (&ale[1],
3347           &aam[1],
3348           slen);
3349   GNUNET_CONTAINER_DLL_insert (tc->details.communicator.addr_head,
3350                                tc->details.communicator.addr_tail,
3351                                ale);
3352   ale->st = GNUNET_SCHEDULER_add_now (&store_pi,
3353                                       ale);
3354   GNUNET_SERVICE_client_continue (tc->client);
3355 }
3356
3357
3358 /**
3359  * Address of our peer deleted.  Process the request.
3360  *
3361  * @param cls the client
3362  * @param dam the send message that was sent
3363  */
3364 static void
3365 handle_del_address (void *cls,
3366                     const struct GNUNET_TRANSPORT_DelAddressMessage *dam)
3367 {
3368   struct TransportClient *tc = cls;
3369
3370   if (CT_COMMUNICATOR != tc->type)
3371   {
3372     GNUNET_break (0);
3373     GNUNET_SERVICE_client_drop (tc->client);
3374     return;
3375   }
3376   for (struct AddressListEntry *ale = tc->details.communicator.addr_head;
3377        NULL != ale;
3378        ale = ale->next)
3379   {
3380     if (dam->aid != ale->aid)
3381       continue;
3382     GNUNET_assert (ale->tc == tc);
3383     free_address_list_entry (ale);
3384     GNUNET_SERVICE_client_continue (tc->client);
3385   }
3386   GNUNET_break (0);
3387   GNUNET_SERVICE_client_drop (tc->client);
3388 }
3389
3390
3391 /**
3392  * Context from #handle_incoming_msg().  Closure for many
3393  * message handlers below.
3394  */
3395 struct CommunicatorMessageContext
3396 {
3397   /**
3398    * Which communicator provided us with the message.
3399    */
3400   struct TransportClient *tc;
3401
3402   /**
3403    * Additional information for flow control and about the sender.
3404    */
3405   struct GNUNET_TRANSPORT_IncomingMessage im;
3406
3407   /**
3408    * Number of hops the message has travelled (if DV-routed).
3409    * FIXME: make use of this in ACK handling!
3410    */
3411   uint16_t total_hops;
3412 };
3413
3414
3415 /**
3416  * Given an inbound message @a msg from a communicator @a cmc,
3417  * demultiplex it based on the type calling the right handler.
3418  *
3419  * @param cmc context for demultiplexing
3420  * @param msg message to demultiplex
3421  */
3422 static void
3423 demultiplex_with_cmc (struct CommunicatorMessageContext *cmc,
3424                       const struct GNUNET_MessageHeader *msg);
3425
3426
3427 /**
3428  * Send ACK to communicator (if requested) and free @a cmc.
3429  *
3430  * @param cmc context for which we are done handling the message
3431  */
3432 static void
3433 finish_cmc_handling (struct CommunicatorMessageContext *cmc)
3434 {
3435   if (0 != ntohl (cmc->im.fc_on))
3436   {
3437     /* send ACK when done to communicator for flow control! */
3438     struct GNUNET_MQ_Envelope *env;
3439     struct GNUNET_TRANSPORT_IncomingMessageAck *ack;
3440
3441     env = GNUNET_MQ_msg (ack,
3442                          GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK);
3443     ack->reserved = htonl (0);
3444     ack->fc_id = cmc->im.fc_id;
3445     ack->sender = cmc->im.sender;
3446     GNUNET_MQ_send (cmc->tc->mq,
3447                     env);
3448   }
3449   GNUNET_SERVICE_client_continue (cmc->tc->client);
3450   GNUNET_free (cmc);
3451 }
3452
3453
3454 /**
3455  * Communicator gave us an unencapsulated message to pass as-is to
3456  * CORE.  Process the request.
3457  *
3458  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
3459  * @param mh the message that was received
3460  */
3461 static void
3462 handle_raw_message (void *cls,
3463                     const struct GNUNET_MessageHeader *mh)
3464 {
3465   struct CommunicatorMessageContext *cmc = cls;
3466   uint16_t size = ntohs (mh->size);
3467
3468   if ( (size > UINT16_MAX - sizeof (struct InboundMessage)) ||
3469        (size < sizeof (struct GNUNET_MessageHeader)) )
3470   {
3471     struct GNUNET_SERVICE_Client *client = cmc->tc->client;
3472
3473     GNUNET_break (0);
3474     finish_cmc_handling (cmc);
3475     GNUNET_SERVICE_client_drop (client);
3476     return;
3477   }
3478   /* Forward to all CORE clients */
3479   for (struct TransportClient *tc = clients_head;
3480        NULL != tc;
3481        tc = tc->next)
3482   {
3483     struct GNUNET_MQ_Envelope *env;
3484     struct InboundMessage *im;
3485
3486     if (CT_CORE != tc->type)
3487       continue;
3488     env = GNUNET_MQ_msg_extra (im,
3489                                size,
3490                                GNUNET_MESSAGE_TYPE_TRANSPORT_RECV);
3491     im->peer = cmc->im.sender;
3492     memcpy (&im[1],
3493             mh,
3494             size);
3495     GNUNET_MQ_send (tc->mq,
3496                     env);
3497   }
3498   /* FIXME: consider doing this _only_ once the message
3499      was drained from the CORE MQs to extend flow control to CORE!
3500      (basically, increment counter in cmc, decrement on MQ send continuation! */
3501   finish_cmc_handling (cmc);
3502 }
3503
3504
3505 /**
3506  * Communicator gave us a fragment box.  Check the message.
3507  *
3508  * @param cls a `struct CommunicatorMessageContext`
3509  * @param fb the send message that was sent
3510  * @return #GNUNET_YES if message is well-formed
3511  */
3512 static int
3513 check_fragment_box (void *cls,
3514                     const struct TransportFragmentBox *fb)
3515 {
3516   uint16_t size = ntohs (fb->header.size);
3517   uint16_t bsize = size - sizeof (*fb);
3518
3519   if (0 == bsize)
3520   {
3521     GNUNET_break_op (0);
3522     return GNUNET_SYSERR;
3523   }
3524   if (bsize + ntohs (fb->frag_off) > ntohs (fb->msg_size))
3525   {
3526     GNUNET_break_op (0);
3527     return GNUNET_SYSERR;
3528   }
3529   if (ntohs (fb->frag_off) >= ntohs (fb->msg_size))
3530   {
3531     GNUNET_break_op (0);
3532     return GNUNET_SYSERR;
3533   }
3534   return GNUNET_YES;
3535 }
3536
3537
3538 /**
3539  * Generate a fragment acknowledgement for an @a rc.
3540  *
3541  * @param rc context to generate ACK for, @a rc ACK state is reset
3542  */
3543 static void
3544 send_fragment_ack (struct ReassemblyContext *rc)
3545 {
3546   struct TransportFragmentAckMessage *ack;
3547
3548   ack = GNUNET_new (struct TransportFragmentAckMessage);
3549   ack->header.size = htons (sizeof (struct TransportFragmentAckMessage));
3550   ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK);
3551   ack->frag_uuid = htonl (rc->frag_uuid);
3552   ack->extra_acks = GNUNET_htonll (rc->extra_acks);
3553   ack->msg_uuid = rc->msg_uuid;
3554   ack->avg_ack_delay = GNUNET_TIME_relative_hton (rc->avg_ack_delay);
3555   if (0 == rc->msg_missing)
3556     ack->reassembly_timeout
3557       = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_FOREVER_REL); /* signal completion */
3558   else
3559     ack->reassembly_timeout
3560       = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (rc->reassembly_timeout));
3561   route_message (&rc->neighbour->pid,
3562                  &ack->header);
3563   rc->avg_ack_delay = GNUNET_TIME_UNIT_ZERO;
3564   rc->num_acks = 0;
3565   rc->extra_acks = 0LLU;
3566 }
3567
3568
3569 /**
3570  * Communicator gave us a fragment.  Process the request.
3571  *
3572  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
3573  * @param fb the message that was received
3574  */
3575 static void
3576 handle_fragment_box (void *cls,
3577                      const struct TransportFragmentBox *fb)
3578 {
3579   struct CommunicatorMessageContext *cmc = cls;
3580   struct Neighbour *n;
3581   struct ReassemblyContext *rc;
3582   const struct GNUNET_MessageHeader *msg;
3583   uint16_t msize;
3584   uint16_t fsize;
3585   uint16_t frag_off;
3586   uint32_t frag_uuid;
3587   char *target;
3588   struct GNUNET_TIME_Relative cdelay;
3589   int ack_now;
3590
3591   n = GNUNET_CONTAINER_multipeermap_get (neighbours,
3592                                          &cmc->im.sender);
3593   if (NULL == n)
3594   {
3595     struct GNUNET_SERVICE_Client *client = cmc->tc->client;
3596
3597     GNUNET_break (0);
3598     finish_cmc_handling (cmc);
3599     GNUNET_SERVICE_client_drop (client);
3600     return;
3601   }
3602   if (NULL == n->reassembly_map)
3603   {
3604     n->reassembly_map = GNUNET_CONTAINER_multishortmap_create (8,
3605                                                                GNUNET_YES);
3606     n->reassembly_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
3607     n->reassembly_timeout_task = GNUNET_SCHEDULER_add_delayed (REASSEMBLY_EXPIRATION,
3608                                                                &reassembly_cleanup_task,
3609                                                                n);
3610   }
3611   msize = ntohs (fb->msg_size);
3612   rc = GNUNET_CONTAINER_multishortmap_get (n->reassembly_map,
3613                                            &fb->msg_uuid);
3614   if (NULL == rc)
3615   {
3616     rc = GNUNET_malloc (sizeof (*rc) +
3617                         msize + /* reassembly payload buffer */
3618                         (msize + 7) / 8 * sizeof (uint8_t) /* bitfield */);
3619     rc->msg_uuid = fb->msg_uuid;
3620     rc->neighbour = n;
3621     rc->msg_size = msize;
3622     rc->reassembly_timeout = GNUNET_TIME_relative_to_absolute (REASSEMBLY_EXPIRATION);
3623     rc->last_frag = GNUNET_TIME_absolute_get ();
3624     rc->hn = GNUNET_CONTAINER_heap_insert (n->reassembly_heap,
3625                                            rc,
3626                                            rc->reassembly_timeout.abs_value_us);
3627     GNUNET_assert (GNUNET_OK ==
3628                    GNUNET_CONTAINER_multishortmap_put (n->reassembly_map,
3629                                                        &rc->msg_uuid,
3630                                                        rc,
3631                                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
3632     target = (char *) &rc[1];
3633     rc->bitfield = (uint8_t *) (target + rc->msg_size);
3634     rc->msg_missing = rc->msg_size;
3635   }
3636   else
3637   {
3638     target = (char *) &rc[1];
3639   }
3640   if (msize != rc->msg_size)
3641   {
3642     GNUNET_break (0);
3643     finish_cmc_handling (cmc);
3644     return;
3645   }
3646
3647   /* reassemble */
3648   fsize = ntohs (fb->header.size) - sizeof (*fb);
3649   frag_off = ntohs (fb->frag_off);
3650   memcpy (&target[frag_off],
3651           &fb[1],
3652           fsize);
3653   /* update bitfield and msg_missing */
3654   for (unsigned int i=frag_off;i<frag_off+fsize;i++)
3655   {
3656     if (0 == (rc->bitfield[i / 8] & (1 << (i % 8))))
3657     {
3658       rc->bitfield[i / 8] |= (1 << (i % 8));
3659       rc->msg_missing--;
3660     }
3661   }
3662
3663   /* Compute cummulative ACK */
3664   frag_uuid = ntohl (fb->frag_uuid);
3665   cdelay = GNUNET_TIME_absolute_get_duration (rc->last_frag);
3666   cdelay = GNUNET_TIME_relative_multiply (cdelay,
3667                                           rc->num_acks);
3668   rc->last_frag = GNUNET_TIME_absolute_get ();
3669   rc->avg_ack_delay = GNUNET_TIME_relative_add (rc->avg_ack_delay,
3670                                                 cdelay);
3671   ack_now = GNUNET_NO;
3672   if (0 == rc->num_acks)
3673   {
3674     /* case one: first ack */
3675     rc->frag_uuid = frag_uuid;
3676     rc->extra_acks = 0LLU;
3677     rc->num_acks = 1;
3678   }
3679   else if ( (frag_uuid >= rc->frag_uuid) &&
3680             (frag_uuid <= rc->frag_uuid + 64) )
3681   {
3682     /* case two: ack fits after existing min UUID */
3683     if ( (frag_uuid == rc->frag_uuid) ||
3684          (0 != (rc->extra_acks & (1LLU << (frag_uuid - rc->frag_uuid - 1)))) )
3685     {
3686       /* duplicate fragment, ack now! */
3687       ack_now = GNUNET_YES;
3688     }
3689     else
3690     {
3691       rc->extra_acks |= (1LLU << (frag_uuid - rc->frag_uuid - 1));
3692       rc->num_acks++;
3693     }
3694   }
3695   else if ( (rc->frag_uuid > frag_uuid) &&
3696             ( ( (rc->frag_uuid == frag_uuid + 64) &&
3697                 (0 == rc->extra_acks) ) ||
3698               ( (rc->frag_uuid < frag_uuid + 64) &&
3699                 (rc->extra_acks == (rc->extra_acks & ~ ((1LLU << (64 - (rc->frag_uuid - frag_uuid))) - 1LLU))) ) ) )
3700   {
3701     /* can fit ack by shifting extra acks and starting at
3702        frag_uid, test above esured that the bits we will
3703        shift 'extra_acks' by are all zero. */
3704     rc->extra_acks <<= (rc->frag_uuid - frag_uuid);
3705     rc->extra_acks |= (1LLU << (rc->frag_uuid - frag_uuid - 1));
3706     rc->frag_uuid = frag_uuid;
3707     rc->num_acks++;
3708   }
3709   if (65 == rc->num_acks) /* FIXME: maybe use smaller threshold? This is very aggressive. */
3710     ack_now = GNUNET_YES; /* maximum acks received */
3711   // FIXME: possibly also ACK based on RTT (but for that we'd need to
3712   // determine the queue used for the ACK first!)
3713
3714   /* is reassembly complete? */
3715   if (0 != rc->msg_missing)
3716   {
3717     if (ack_now)
3718       send_fragment_ack (rc);
3719     finish_cmc_handling (cmc);
3720     return;
3721   }
3722   /* reassembly is complete, verify result */
3723   msg = (const struct GNUNET_MessageHeader *) &rc[1];
3724   if (ntohs (msg->size) != rc->msg_size)
3725   {
3726     GNUNET_break (0);
3727     free_reassembly_context (rc);
3728     finish_cmc_handling (cmc);
3729     return;
3730   }
3731   /* successful reassembly */
3732   send_fragment_ack (rc);
3733   demultiplex_with_cmc (cmc,
3734                         msg);
3735   /* FIXME: really free here? Might be bad if fragments are still
3736      en-route and we forget that we finished this reassembly immediately!
3737      -> keep around until timeout?
3738      -> shorten timeout based on ACK? */
3739   free_reassembly_context (rc);
3740 }
3741
3742
3743 /**
3744  * Check the @a fa against the fragments associated with @a pm.
3745  * If it matches, remove the matching fragments from the transmission
3746  * list.
3747  *
3748  * @param pm pending message to check against the ack
3749  * @param fa the ack that was received
3750  * @return #GNUNET_YES if @a fa matched, #GNUNET_NO if not
3751  */
3752 static int
3753 check_ack_against_pm (struct PendingMessage *pm,
3754                       const struct TransportFragmentAckMessage *fa)
3755 {
3756   int match;
3757   struct PendingMessage *nxt;
3758   uint32_t fs = ntohl (fa->frag_uuid);
3759   uint64_t xtra = GNUNET_ntohll (fa->extra_acks);
3760
3761   match = GNUNET_NO;
3762   for (struct PendingMessage *frag = pm->head_frag;
3763        NULL != frag;
3764        frag = nxt)
3765   {
3766     const struct TransportFragmentBox *tfb
3767       = (const struct TransportFragmentBox *) &pm[1];
3768     uint32_t fu = ntohl (tfb->frag_uuid);
3769
3770     GNUNET_assert (PMT_FRAGMENT_BOX == frag->pmt);
3771     nxt = frag->next_frag;
3772     /* Check for exact match or match in the 'xtra' bitmask */
3773     if ( (fu == fs) ||
3774          ( (fu > fs) &&
3775            (fu <= fs + 64) &&
3776            (0 != (1LLU << (fu - fs - 1) & xtra)) ) )
3777     {
3778       match = GNUNET_YES;
3779       free_fragment_tree (frag);
3780     }
3781   }
3782   return match;
3783 }
3784
3785
3786 /**
3787  * Communicator gave us a fragment acknowledgement.  Process the request.
3788  *
3789  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
3790  * @param fa the message that was received
3791  */
3792 static void
3793 handle_fragment_ack (void *cls,
3794                      const struct TransportFragmentAckMessage *fa)
3795 {
3796   struct CommunicatorMessageContext *cmc = cls;
3797   struct Neighbour *n;
3798   int matched;
3799
3800   n = GNUNET_CONTAINER_multipeermap_get (neighbours,
3801                                          &cmc->im.sender);
3802   if (NULL == n)
3803   {
3804     struct GNUNET_SERVICE_Client *client = cmc->tc->client;
3805
3806     GNUNET_break (0);
3807     finish_cmc_handling (cmc);
3808     GNUNET_SERVICE_client_drop (client);
3809     return;
3810   }
3811   /* FIXME-OPTIMIZE: maybe use another hash map here? */
3812   matched = GNUNET_NO;
3813   for (struct PendingMessage *pm = n->pending_msg_head;
3814        NULL != pm;
3815        pm = pm->prev_neighbour)
3816   {
3817     if (0 !=
3818         GNUNET_memcmp (&fa->msg_uuid,
3819                        &pm->msg_uuid))
3820       continue;
3821     matched = GNUNET_YES;
3822     if (GNUNET_YES ==
3823         check_ack_against_pm (pm,
3824                               fa))
3825     {
3826       struct GNUNET_TIME_Relative avg_ack_delay
3827         = GNUNET_TIME_relative_ntoh (fa->avg_ack_delay);
3828       // FIXME: update RTT and other reliability data!
3829       // ISSUE: we don't know which of n's queues the message(s)
3830       // took (and in fact the different messages might have gone
3831       // over different queues and possibly over multiple).
3832       // => track queues with PendingMessages, and update RTT only if
3833       //    the queue used is unique?
3834       //    -> how can we get loss rates?
3835       //    -> or, add extra state to Box and ACK to identify queue?
3836       // IDEA: generate MULTIPLE frag-uuids per fragment and track
3837       //    the queue with the fragment! (-> this logic must
3838       //    be moved into check_ack_against_pm!)
3839       (void) avg_ack_delay;
3840     }
3841     else
3842     {
3843       GNUNET_STATISTICS_update (GST_stats,
3844                                 "# FRAGMENT_ACKS dropped, no matching fragment",
3845                                 1,
3846                                 GNUNET_NO);
3847     }
3848     if (NULL == pm->head_frag)
3849     {
3850       // if entire message is ACKed, handle that as well.
3851       // => clean up PM, any post actions?
3852       free_pending_message (pm);
3853     }
3854     else
3855     {
3856       struct GNUNET_TIME_Relative reassembly_timeout
3857         = GNUNET_TIME_relative_ntoh (fa->reassembly_timeout);
3858       // OPTIMIZE-FIXME: adjust retransmission strategy based on reassembly_timeout!
3859       (void) reassembly_timeout;
3860     }
3861     break;
3862   }
3863   if (GNUNET_NO == matched)
3864   {
3865     GNUNET_STATISTICS_update (GST_stats,
3866                               "# FRAGMENT_ACKS dropped, no matching pending message",
3867                               1,
3868                               GNUNET_NO);
3869   }
3870   finish_cmc_handling (cmc);
3871 }
3872
3873
3874 /**
3875  * Communicator gave us a reliability box.  Check the message.
3876  *
3877  * @param cls a `struct CommunicatorMessageContext`
3878  * @param rb the send message that was sent
3879  * @return #GNUNET_YES if message is well-formed
3880  */
3881 static int
3882 check_reliability_box (void *cls,
3883                        const struct TransportReliabilityBox *rb)
3884 {
3885   GNUNET_MQ_check_boxed_message (rb);
3886   return GNUNET_YES;
3887 }
3888
3889
3890 /**
3891  * Communicator gave us a reliability box.  Process the request.
3892  *
3893  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
3894  * @param rb the message that was received
3895  */
3896 static void
3897 handle_reliability_box (void *cls,
3898                         const struct TransportReliabilityBox *rb)
3899 {
3900   struct CommunicatorMessageContext *cmc = cls;
3901   const struct GNUNET_MessageHeader *inbox = (const struct GNUNET_MessageHeader *) &rb[1];
3902
3903   if (0 == ntohl (rb->ack_countdown))
3904   {
3905     struct TransportReliabilityAckMessage *ack;
3906
3907     /* FIXME: implement cummulative ACKs and ack_countdown,
3908        then setting the avg_ack_delay field below: */
3909     ack = GNUNET_malloc (sizeof (*ack) +
3910                          sizeof (struct GNUNET_ShortHashCode));
3911     ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK);
3912     ack->header.size = htons (sizeof (*ack) +
3913                               sizeof (struct GNUNET_ShortHashCode));
3914     memcpy (&ack[1],
3915             &rb->msg_uuid,
3916             sizeof (struct GNUNET_ShortHashCode));
3917     route_message (&cmc->im.sender,
3918                    &ack->header);
3919   }
3920   /* continue with inner message */
3921   demultiplex_with_cmc (cmc,
3922                         inbox);
3923 }
3924
3925
3926 /**
3927  * Communicator gave us a reliability ack.  Process the request.
3928  *
3929  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
3930  * @param ra the message that was received
3931  */
3932 static void
3933 handle_reliability_ack (void *cls,
3934                         const struct TransportReliabilityAckMessage *ra)
3935 {
3936   struct CommunicatorMessageContext *cmc = cls;
3937   struct Neighbour *n;
3938   unsigned int n_acks;
3939   const struct GNUNET_ShortHashCode *msg_uuids;
3940   struct PendingMessage *nxt;
3941   int matched;
3942
3943   n = GNUNET_CONTAINER_multipeermap_get (neighbours,
3944                                          &cmc->im.sender);
3945   if (NULL == n)
3946   {
3947     struct GNUNET_SERVICE_Client *client = cmc->tc->client;
3948
3949     GNUNET_break (0);
3950     finish_cmc_handling (cmc);
3951     GNUNET_SERVICE_client_drop (client);
3952     return;
3953   }
3954   n_acks = (ntohs (ra->header.size) - sizeof (*ra))
3955     / sizeof (struct GNUNET_ShortHashCode);
3956   msg_uuids = (const struct GNUNET_ShortHashCode *) &ra[1];
3957
3958   /* FIXME-OPTIMIZE: maybe use another hash map here? */
3959   matched = GNUNET_NO;
3960   for (struct PendingMessage *pm = n->pending_msg_head;
3961        NULL != pm;
3962        pm = nxt)
3963   {
3964     int in_list;
3965
3966     nxt = pm->next_neighbour;
3967     in_list = GNUNET_NO;
3968     for (unsigned int i=0;i<n_acks;i++)
3969     {
3970       if (0 !=
3971           GNUNET_memcmp (&msg_uuids[i],
3972                          &pm->msg_uuid))
3973         continue;
3974       in_list = GNUNET_YES;
3975       break;
3976     }
3977     if (GNUNET_NO == in_list)
3978       continue;
3979
3980     /* this pm was acked! */
3981     matched = GNUNET_YES;
3982     free_pending_message (pm);
3983
3984     {
3985       struct GNUNET_TIME_Relative avg_ack_delay
3986         = GNUNET_TIME_relative_ntoh (ra->avg_ack_delay);
3987       // FIXME: update RTT and other reliability data!
3988       // ISSUE: we don't know which of n's queues the message(s)
3989       // took (and in fact the different messages might have gone
3990       // over different queues and possibly over multiple).
3991       // => track queues with PendingMessages, and update RTT only if
3992       //    the queue used is unique?
3993       //    -> how can we get loss rates?
3994       //    -> or, add extra state to MSG and ACKs to identify queue?
3995       //    -> if we do this, might just do the same for the avg_ack_delay!
3996       (void) avg_ack_delay;
3997     }
3998   }
3999   if (GNUNET_NO == matched)
4000   {
4001     GNUNET_STATISTICS_update (GST_stats,
4002                               "# FRAGMENT_ACKS dropped, no matching pending message",
4003                               1,
4004                               GNUNET_NO);
4005   }
4006   finish_cmc_handling (cmc);
4007 }
4008
4009
4010 /**
4011  * Communicator gave us a backchannel encapsulation.  Check the message.
4012  *
4013  * @param cls a `struct CommunicatorMessageContext`
4014  * @param be the send message that was sent
4015  * @return #GNUNET_YES if message is well-formed
4016  */
4017 static int
4018 check_backchannel_encapsulation (void *cls,
4019                                  const struct TransportBackchannelEncapsulationMessage *be)
4020 {
4021   uint16_t size = ntohs (be->header.size);
4022
4023   (void) cls;
4024   if (size - sizeof (*be) < sizeof (struct GNUNET_MessageHeader))
4025   {
4026     GNUNET_break_op (0);
4027     return GNUNET_SYSERR;
4028   }
4029   return GNUNET_YES;
4030 }
4031
4032
4033 /**
4034  * Communicator gave us a backchannel encapsulation.  Process the request.
4035  *
4036  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
4037  * @param be the message that was received
4038  */
4039 static void
4040 handle_backchannel_encapsulation (void *cls,
4041                                   const struct TransportBackchannelEncapsulationMessage *be)
4042 {
4043   struct CommunicatorMessageContext *cmc = cls;
4044
4045   if (0 != GNUNET_memcmp (&be->target,
4046                           &GST_my_identity))
4047   {
4048     /* not for me, try to route to target */
4049     route_message (&be->target,
4050                    GNUNET_copy_message (&be->header));
4051     finish_cmc_handling (cmc);
4052     return;
4053   }
4054   // FIXME: compute shared secret
4055   // FIXME: check HMAC
4056   // FIXME: decrypt payload
4057   // FIXME: forward to specified communicator!
4058   // (using GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL_INCOMING)
4059   finish_cmc_handling (cmc);
4060 }
4061
4062
4063 /**
4064  * Task called when we should check if any of the DV paths
4065  * we have learned to a target are due for garbage collection.
4066  *
4067  * Collects stale paths, and possibly frees the entire DV
4068  * entry if no paths are left. Otherwise re-schedules itself.
4069  *
4070  * @param cls a `struct DistanceVector`
4071  */
4072 static void
4073 path_cleanup_cb (void *cls)
4074 {
4075   struct DistanceVector *dv = cls;
4076   struct DistanceVectorHop *pos;
4077
4078   dv->timeout_task = NULL;
4079   while (NULL != (pos = dv->dv_head))
4080   {
4081     GNUNET_assert (dv == pos->dv);
4082     if (GNUNET_TIME_absolute_get_remaining (pos->timeout).rel_value_us > 0)
4083       break;
4084     free_distance_vector_hop (pos);
4085   }
4086   if (NULL == pos)
4087   {
4088     free_dv_route (dv);
4089     return;
4090   }
4091   dv->timeout_task = GNUNET_SCHEDULER_add_at (pos->timeout,
4092                                               &path_cleanup_cb,
4093                                               dv);
4094 }
4095
4096
4097 /**
4098  * We have learned a @a path through the network to some other peer, add it to
4099  * our DV data structure (returning #GNUNET_YES on success).
4100  *
4101  * We do not add paths if we have a sufficient number of shorter
4102  * paths to this target already (returning #GNUNET_NO).
4103  *
4104  * We also do not add problematic paths, like those where we lack the first
4105  * hop in our neighbour list (i.e. due to a topology change) or where some
4106  * non-first hop is in our neighbour list (returning #GNUNET_SYSERR).
4107  *
4108  * @param path the path we learned, path[0] should be us,
4109  *             and then path contains a valid path from us to `path[path_len-1]`
4110  *             path[1] should be a direct neighbour (we should check!)
4111  * @param path_len number of entries on the @a path, at least three!
4112  * @param network_latency how long does the message take from us to `path[path_len-1]`?
4113  *          set to "forever" if unknown
4114  * @return #GNUNET_YES on success,
4115  *         #GNUNET_NO if we have better path(s) to the target
4116  *         #GNUNET_SYSERR if the path is useless and/or invalid
4117  *                         (i.e. path[1] not a direct neighbour
4118  *                        or path[i+1] is a direct neighbour for i>0)
4119  */
4120 static int
4121 learn_dv_path (const struct GNUNET_PeerIdentity *path,
4122                unsigned int path_len,
4123                struct GNUNET_TIME_Relative network_latency)
4124 {
4125   struct DistanceVectorHop *hop;
4126   struct DistanceVector *dv;
4127   struct Neighbour *next_hop;
4128   unsigned int shorter_distance;
4129
4130   if (path_len < 3)
4131   {
4132     /* what a boring path! not allowed! */
4133     GNUNET_break (0);
4134     return GNUNET_SYSERR;
4135   }
4136   GNUNET_assert (0 ==
4137                  GNUNET_memcmp (&GST_my_identity,
4138                                 &path[0]));
4139   next_hop = GNUNET_CONTAINER_multipeermap_get (neighbours,
4140                                                 &path[1]);
4141   if (NULL == next_hop)
4142   {
4143     /* next hop must be a neighbour, otherwise this whole thing is useless! */
4144     GNUNET_break (0);
4145     return GNUNET_SYSERR;
4146   }
4147   for (unsigned int i=2;i<path_len;i++)
4148     if (NULL !=
4149         GNUNET_CONTAINER_multipeermap_get (neighbours,
4150                                            &path[i]))
4151     {
4152       /* Useless path, we have a direct connection to some hop
4153          in the middle of the path, so this one doesn't even
4154          seem terribly useful for redundancy */
4155       return GNUNET_SYSERR;
4156     }
4157   dv = GNUNET_CONTAINER_multipeermap_get (dv_routes,
4158                                           &path[path_len - 1]);
4159   if (NULL == dv)
4160   {
4161     dv = GNUNET_new (struct DistanceVector);
4162     dv->target = path[path_len - 1];
4163     dv->timeout_task = GNUNET_SCHEDULER_add_delayed (DV_PATH_VALIDITY_TIMEOUT,
4164                                                      &path_cleanup_cb,
4165                                                      dv);
4166     GNUNET_assert (GNUNET_OK ==
4167                    GNUNET_CONTAINER_multipeermap_put (dv_routes,
4168                                                       &dv->target,
4169                                                       dv,
4170                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
4171   }
4172   /* Check if we have this path already! */
4173   shorter_distance = 0;
4174   for (struct DistanceVectorHop *pos = dv->dv_head;
4175        NULL != pos;
4176        pos = pos->next_dv)
4177   {
4178     if (pos->distance < path_len - 2)
4179       shorter_distance++;
4180     /* Note that the distances in 'pos' excludes us (path[0]) and
4181        the next_hop (path[1]), so we need to subtract two
4182        and check next_hop explicitly */
4183     if ( (pos->distance == path_len - 2) &&
4184          (pos->next_hop == next_hop) )
4185     {
4186       int match = GNUNET_YES;
4187
4188       for (unsigned int i=0;i<pos->distance;i++)
4189       {
4190         if (0 !=
4191             GNUNET_memcmp (&pos->path[i],
4192                            &path[i+2]))
4193         {
4194           match = GNUNET_NO;
4195           break;
4196         }
4197       }
4198       if (GNUNET_YES == match)
4199       {
4200         struct GNUNET_TIME_Relative last_timeout;
4201
4202         /* Re-discovered known path, update timeout */
4203         GNUNET_STATISTICS_update (GST_stats,
4204                                   "# Known DV path refreshed",
4205                                   1,
4206                                   GNUNET_NO);
4207         last_timeout = GNUNET_TIME_absolute_get_remaining (pos->timeout);
4208         pos->timeout
4209           = GNUNET_TIME_relative_to_absolute (DV_PATH_VALIDITY_TIMEOUT);
4210         GNUNET_CONTAINER_MDLL_remove (dv,
4211                                       dv->dv_head,
4212                                       dv->dv_tail,
4213                                       pos);
4214         GNUNET_CONTAINER_MDLL_insert (dv,
4215                                       dv->dv_head,
4216                                       dv->dv_tail,
4217                                       pos);
4218         if (last_timeout.rel_value_us <
4219             GNUNET_TIME_relative_subtract (DV_PATH_VALIDITY_TIMEOUT,
4220                                            DV_PATH_DISCOVERY_FREQUENCY).rel_value_us)
4221         {
4222           /* Some peer send DV learn messages too often, we are learning
4223              the same path faster than it would be useful; do not forward! */
4224           return GNUNET_NO;
4225         }
4226         return GNUNET_YES;
4227       }
4228     }
4229   }
4230   /* Count how many shorter paths we have (incl. direct
4231      neighbours) before simply giving up on this one! */
4232   if (shorter_distance >= MAX_DV_PATHS_TO_TARGET)
4233   {
4234     /* We have a shorter path already! */
4235     return GNUNET_NO;
4236   }
4237   /* create new DV path entry */
4238   hop = GNUNET_malloc (sizeof (struct DistanceVectorHop) +
4239                        sizeof (struct GNUNET_PeerIdentity) * (path_len - 2));
4240   hop->next_hop = next_hop;
4241   hop->dv = dv;
4242   hop->path = (const struct GNUNET_PeerIdentity *) &hop[1];
4243   memcpy (&hop[1],
4244           &path[2],
4245           sizeof (struct GNUNET_PeerIdentity) * (path_len - 2));
4246   hop->timeout = GNUNET_TIME_relative_to_absolute (DV_PATH_VALIDITY_TIMEOUT);
4247   hop->distance = path_len - 2;
4248   GNUNET_CONTAINER_MDLL_insert (dv,
4249                                 dv->dv_head,
4250                                 dv->dv_tail,
4251                                 hop);
4252   GNUNET_CONTAINER_MDLL_insert (neighbour,
4253                                 next_hop->dv_head,
4254                                 next_hop->dv_tail,
4255                                 hop);
4256   return GNUNET_YES;
4257 }
4258
4259
4260 /**
4261  * Communicator gave us a DV learn message.  Check the message.
4262  *
4263  * @param cls a `struct CommunicatorMessageContext`
4264  * @param dvl the send message that was sent
4265  * @return #GNUNET_YES if message is well-formed
4266  */
4267 static int
4268 check_dv_learn (void *cls,
4269                 const struct TransportDVLearn *dvl)
4270 {
4271   uint16_t size = ntohs (dvl->header.size);
4272   uint16_t num_hops = ntohs (dvl->num_hops);
4273   const struct DVPathEntryP *hops = (const struct DVPathEntryP *) &dvl[1];
4274
4275   (void) cls;
4276   if (size != sizeof (*dvl) + num_hops * sizeof (struct DVPathEntryP))
4277   {
4278     GNUNET_break_op (0);
4279     return GNUNET_SYSERR;
4280   }
4281   if (num_hops > MAX_DV_HOPS_ALLOWED)
4282   {
4283     GNUNET_break_op (0);
4284     return GNUNET_SYSERR;
4285   }
4286   for (unsigned int i=0;i<num_hops;i++)
4287   {
4288     if (0 == GNUNET_memcmp (&dvl->initiator,
4289                             &hops[i].hop))
4290     {
4291       GNUNET_break_op (0);
4292       return GNUNET_SYSERR;
4293     }
4294     if (0 == GNUNET_memcmp (&GST_my_identity,
4295                             &hops[i].hop))
4296     {
4297       GNUNET_break_op (0);
4298       return GNUNET_SYSERR;
4299     }
4300   }
4301   return GNUNET_YES;
4302 }
4303
4304
4305 /**
4306  * Build and forward a DV learn message to @a next_hop.
4307  *
4308  * @param next_hop peer to send the message to
4309  * @param msg message received
4310  * @param bi_history bitmask specifying hops on path that were bidirectional
4311  * @param nhops length of the @a hops array
4312  * @param hops path the message traversed so far
4313  * @param in_time when did we receive the message, used to calculate network delay
4314  */
4315 static void
4316 forward_dv_learn (const struct GNUNET_PeerIdentity *next_hop,
4317                   const struct TransportDVLearn *msg,
4318                   uint16_t bi_history,
4319                   uint16_t nhops,
4320                   const struct DVPathEntryP *hops,
4321                   struct GNUNET_TIME_Absolute in_time)
4322 {
4323   struct DVPathEntryP *dhops;
4324   struct TransportDVLearn *fwd;
4325   struct GNUNET_TIME_Relative nnd;
4326
4327   /* compute message for forwarding */
4328   GNUNET_assert (nhops < MAX_DV_HOPS_ALLOWED);
4329   fwd = GNUNET_malloc (sizeof (struct TransportDVLearn) +
4330                        (nhops + 1) * sizeof (struct DVPathEntryP));
4331   fwd->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN);
4332   fwd->header.size = htons (sizeof (struct TransportDVLearn) +
4333                             (nhops + 1) * sizeof (struct DVPathEntryP));
4334   fwd->num_hops = htons (nhops + 1);
4335   fwd->bidirectional = htons (bi_history);
4336   nnd = GNUNET_TIME_relative_add (GNUNET_TIME_absolute_get_duration (in_time),
4337                                   GNUNET_TIME_relative_ntoh (msg->non_network_delay));
4338   fwd->non_network_delay = GNUNET_TIME_relative_hton (nnd);
4339   fwd->init_sig = msg->init_sig;
4340   fwd->initiator = msg->initiator;
4341   fwd->challenge = msg->challenge;
4342   dhops = (struct DVPathEntryP *) &fwd[1];
4343   GNUNET_memcpy (dhops,
4344                  hops,
4345                  sizeof (struct DVPathEntryP) * nhops);
4346   dhops[nhops].hop = GST_my_identity;
4347   {
4348     struct DvHopPS dhp = {
4349       .purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_HOP),
4350       .purpose.size = htonl (sizeof (dhp)),
4351       .pred = dhops[nhops-1].hop,
4352       .succ = *next_hop,
4353       .challenge = msg->challenge
4354     };
4355
4356     GNUNET_assert (GNUNET_OK ==
4357                    GNUNET_CRYPTO_eddsa_sign (GST_my_private_key,
4358                                              &dhp.purpose,
4359                                              &dhops[nhops].hop_sig));
4360   }
4361   route_message (next_hop,
4362                  &fwd->header);
4363 }
4364
4365
4366 /**
4367  * Check signature of type #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_INITIATOR
4368  *
4369  * @param init the signer
4370  * @param challenge the challenge that was signed
4371  * @param init_sig signature presumably by @a init
4372  * @return #GNUNET_OK if the signature is valid
4373  */
4374 static int
4375 validate_dv_initiator_signature (const struct GNUNET_PeerIdentity *init,
4376                                  const struct GNUNET_ShortHashCode *challenge,
4377                                  const struct GNUNET_CRYPTO_EddsaSignature *init_sig)
4378 {
4379   struct DvInitPS ip = {
4380     .purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_INITIATOR),
4381     .purpose.size = htonl (sizeof (ip)),
4382     .challenge = *challenge
4383   };
4384
4385   if (GNUNET_OK !=
4386       GNUNET_CRYPTO_eddsa_verify (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_INITIATOR,
4387                                   &ip.purpose,
4388                                   init_sig,
4389                                   &init->public_key))
4390   {
4391     GNUNET_break_op (0);
4392     return GNUNET_SYSERR;
4393   }
4394   return GNUNET_OK;
4395 }
4396
4397
4398 /**
4399  * Communicator gave us a DV learn message.  Process the request.
4400  *
4401  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
4402  * @param dvl the message that was received
4403  */
4404 static void
4405 handle_dv_learn (void *cls,
4406                  const struct TransportDVLearn *dvl)
4407 {
4408   struct CommunicatorMessageContext *cmc = cls;
4409   enum GNUNET_TRANSPORT_CommunicatorCharacteristics cc;
4410   int bi_hop;
4411   uint16_t nhops;
4412   uint16_t bi_history;
4413   const struct DVPathEntryP *hops;
4414   int do_fwd;
4415   int did_initiator;
4416   struct GNUNET_TIME_Absolute in_time;
4417
4418   nhops = ntohs (dvl->bidirectional); /* 0 = sender is initiator */
4419   bi_history = ntohs (dvl->bidirectional);
4420   hops = (const struct DVPathEntryP *) &dvl[1];
4421   if (0 == nhops)
4422   {
4423     /* sanity check */
4424     if (0 != GNUNET_memcmp (&dvl->initiator,
4425                             &cmc->im.sender))
4426     {
4427       GNUNET_break (0);
4428       finish_cmc_handling (cmc);
4429       return;
4430     }
4431   }
4432   else
4433   {
4434     /* sanity check */
4435     if (0 != GNUNET_memcmp (&hops[nhops - 1].hop,
4436                             &cmc->im.sender))
4437     {
4438       GNUNET_break (0);
4439       finish_cmc_handling (cmc);
4440       return;
4441     }
4442   }
4443
4444   GNUNET_assert (CT_COMMUNICATOR == cmc->tc->type);
4445   cc = cmc->tc->details.communicator.cc;
4446   bi_hop = (GNUNET_TRANSPORT_CC_RELIABLE == cc); // FIXME: add bi-directional flag to cc?
4447   in_time = GNUNET_TIME_absolute_get ();
4448
4449   /* continue communicator here, everything else can happen asynchronous! */
4450   finish_cmc_handling (cmc);
4451
4452   // FIXME: should we bother to verify _every_ DV initiator signature?
4453   if (GNUNET_OK !=
4454       validate_dv_initiator_signature (&dvl->initiator,
4455                                        &dvl->challenge,
4456                                        &dvl->init_sig))
4457   {
4458     GNUNET_break_op (0);
4459     return;
4460   }
4461   // FIXME: asynchronously (!) verify hop-by-hop signatures!
4462   // => if signature verification load too high, implement random drop strategy!
4463
4464   do_fwd = GNUNET_YES;
4465   if (0 == GNUNET_memcmp (&GST_my_identity,
4466                           &dvl->initiator))
4467   {
4468     struct GNUNET_PeerIdentity path[nhops + 1];
4469     struct GNUNET_TIME_Relative host_latency_sum;
4470     struct GNUNET_TIME_Relative latency;
4471     struct GNUNET_TIME_Relative network_latency;
4472
4473     /* We initiated this, learn the forward path! */
4474     path[0] = GST_my_identity;
4475     path[1] = hops[0].hop;
4476     host_latency_sum = GNUNET_TIME_relative_ntoh (dvl->non_network_delay);
4477
4478     // Need also something to lookup initiation time
4479     // to compute RTT! -> add RTT argument here?
4480     latency = GNUNET_TIME_UNIT_FOREVER_REL; // FIXME: initialize properly
4481     // (based on dvl->challenge, we can identify time of origin!)
4482
4483     network_latency = GNUNET_TIME_relative_subtract (latency,
4484                                                      host_latency_sum);
4485     /* assumption: latency on all links is the same */
4486     network_latency = GNUNET_TIME_relative_divide (network_latency,
4487                                                    nhops);
4488
4489     for (unsigned int i=2;i<=nhops;i++)
4490     {
4491       struct GNUNET_TIME_Relative ilat;
4492
4493       /* assumption: linear latency increase per hop */
4494       ilat = GNUNET_TIME_relative_multiply (network_latency,
4495                                             i);
4496       path[i] = hops[i-1].hop;
4497       learn_dv_path (path,
4498                      i,
4499                      ilat);
4500     }
4501     /* as we initiated, do not forward again (would be circular!) */
4502     do_fwd = GNUNET_NO;
4503     return;
4504   }
4505   else if (bi_hop)
4506   {
4507     /* last hop was bi-directional, we could learn something here! */
4508     struct GNUNET_PeerIdentity path[nhops + 2];
4509
4510     path[0] = GST_my_identity;
4511     path[1] = hops[nhops - 1].hop; /* direct neighbour == predecessor! */
4512     for (unsigned int i=0;i<nhops;i++)
4513     {
4514       int iret;
4515
4516       if (0 == (bi_history & (1 << i)))
4517         break; /* i-th hop not bi-directional, stop learning! */
4518       if (i == nhops)
4519       {
4520         path[i + 2] = dvl->initiator;
4521       }
4522       else
4523       {
4524         path[i + 2] = hops[nhops - i - 2].hop;
4525       }
4526
4527       iret = learn_dv_path (path,
4528                             i + 2,
4529                             GNUNET_TIME_UNIT_FOREVER_REL);
4530       if (GNUNET_SYSERR == iret)
4531       {
4532         /* path invalid or too long to be interesting for US, thus should also
4533            not be interesting to our neighbours, cut path when forwarding to
4534            'i' hops, except of course for the one that goes back to the
4535            initiator */
4536         GNUNET_STATISTICS_update (GST_stats,
4537                                   "# DV learn not forwarded due invalidity of path",
4538                                   1,
4539                                   GNUNET_NO);
4540         do_fwd = GNUNET_NO;
4541         break;
4542       }
4543       if ( (GNUNET_NO == iret) &&
4544            (nhops == i + 1) )
4545       {
4546         /* we have better paths, and this is the longest target,
4547            so there cannot be anything interesting later */
4548         GNUNET_STATISTICS_update (GST_stats,
4549                                   "# DV learn not forwarded, got better paths",
4550                                   1,
4551                                   GNUNET_NO);
4552         do_fwd = GNUNET_NO;
4553         break;
4554       }
4555     }
4556   }
4557
4558   if (MAX_DV_HOPS_ALLOWED == nhops)
4559   {
4560     /* At limit, we're out of here! */
4561     finish_cmc_handling (cmc);
4562     return;
4563   }
4564
4565   /* Forward to initiator, if path non-trivial and possible */
4566   bi_history = (bi_history << 1) | (bi_hop ? 1 : 0);
4567   did_initiator = GNUNET_NO;
4568   if ( (1 < nhops) &&
4569        (GNUNET_YES ==
4570         GNUNET_CONTAINER_multipeermap_contains (neighbours,
4571                                                 &dvl->initiator)) )
4572   {
4573     /* send back to origin! */
4574     forward_dv_learn (&dvl->initiator,
4575                       dvl,
4576                       bi_history,
4577                       nhops,
4578                       hops,
4579                       in_time);
4580     did_initiator = GNUNET_YES;
4581   }
4582   /* We forward under two conditions: either we still learned something
4583      ourselves (do_fwd), or the path was darn short and thus the initiator is
4584      likely to still be very interested in this (and we did NOT already
4585      send it back to the initiator) */
4586   if ( (do_fwd) ||
4587        ( (nhops < MIN_DV_PATH_LENGTH_FOR_INITIATOR) &&
4588          (GNUNET_NO == did_initiator) ) )
4589   {
4590     /* FIXME: loop over all neighbours, pick those with low
4591        queues AND that are not yet on the path; possibly
4592        adapt threshold to nhops! */
4593 #if FIXME
4594     forward_dv_learn (NULL, // fill in peer from iterator here!
4595                       dvl,
4596                       bi_history,
4597                       nhops,
4598                       hops,
4599                       in_time);
4600 #endif
4601   }
4602 }
4603
4604
4605 /**
4606  * Communicator gave us a DV box.  Check the message.
4607  *
4608  * @param cls a `struct CommunicatorMessageContext`
4609  * @param dvb the send message that was sent
4610  * @return #GNUNET_YES if message is well-formed
4611  */
4612 static int
4613 check_dv_box (void *cls,
4614               const struct TransportDVBox *dvb)
4615 {
4616   uint16_t size = ntohs (dvb->header.size);
4617   uint16_t num_hops = ntohs (dvb->num_hops);
4618   const struct GNUNET_PeerIdentity *hops = (const struct GNUNET_PeerIdentity *) &dvb[1];
4619   const struct GNUNET_MessageHeader *inbox = (const struct GNUNET_MessageHeader *) &hops[num_hops];
4620   uint16_t isize;
4621   uint16_t itype;
4622
4623   (void) cls;
4624   if (size < sizeof (*dvb) + num_hops * sizeof (struct GNUNET_PeerIdentity) + sizeof (struct GNUNET_MessageHeader))
4625   {
4626     GNUNET_break_op (0);
4627     return GNUNET_SYSERR;
4628   }
4629   isize = ntohs (inbox->size);
4630   if (size != sizeof (*dvb) + num_hops * sizeof (struct GNUNET_PeerIdentity) + isize)
4631   {
4632     GNUNET_break_op (0);
4633     return GNUNET_SYSERR;
4634   }
4635   itype = ntohs (inbox->type);
4636   if ( (GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX == itype) ||
4637        (GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN == itype) )
4638   {
4639     GNUNET_break_op (0);
4640     return GNUNET_SYSERR;
4641   }
4642   return GNUNET_YES;
4643 }
4644
4645
4646 /**
4647  * Communicator gave us a DV box.  Process the request.
4648  *
4649  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
4650  * @param dvb the message that was received
4651  */
4652 static void
4653 handle_dv_box (void *cls,
4654                const struct TransportDVBox *dvb)
4655 {
4656   struct CommunicatorMessageContext *cmc = cls;
4657   uint16_t size = ntohs (dvb->header.size) - sizeof (*dvb);
4658   uint16_t num_hops = ntohs (dvb->num_hops);
4659   const struct GNUNET_PeerIdentity *hops = (const struct GNUNET_PeerIdentity *) &dvb[1];
4660   const struct GNUNET_MessageHeader *inbox = (const struct GNUNET_MessageHeader *) &hops[num_hops];
4661
4662   if (num_hops > 0)
4663   {
4664     // FIXME: if we are not the target, shorten path and forward along.
4665     // Try from the _end_ of hops array if we know the given
4666     // neighbour (shortening the path!).
4667     // NOTE: increment total_hops!
4668     finish_cmc_handling (cmc);
4669     return;
4670   }
4671   /* We are the target. Unbox and handle message. */
4672   cmc->im.sender = dvb->origin;
4673   cmc->total_hops = ntohs (dvb->total_hops);
4674   demultiplex_with_cmc (cmc,
4675                         inbox);
4676 }
4677
4678
4679 /**
4680  * Client notified us about transmission from a peer.  Process the request.
4681  *
4682  * @param cls a `struct TransportClient` which sent us the message
4683  * @param obm the send message that was sent
4684  * @return #GNUNET_YES if message is well-formed
4685  */
4686 static int
4687 check_incoming_msg (void *cls,
4688                     const struct GNUNET_TRANSPORT_IncomingMessage *im)
4689 {
4690   struct TransportClient *tc = cls;
4691
4692   if (CT_COMMUNICATOR != tc->type)
4693   {
4694     GNUNET_break (0);
4695     return GNUNET_SYSERR;
4696   }
4697   GNUNET_MQ_check_boxed_message (im);
4698   return GNUNET_OK;
4699 }
4700
4701
4702 /**
4703  * Communicator gave us a transport address validation challenge.  Process the request.
4704  *
4705  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
4706  * @param tvc the message that was received
4707  */
4708 static void
4709 handle_validation_challenge (void *cls,
4710                              const struct TransportValidationChallenge *tvc)
4711 {
4712   struct CommunicatorMessageContext *cmc = cls;
4713   struct TransportValidationResponse *tvr;
4714
4715   if (cmc->total_hops > 0)
4716   {
4717     /* DV routing is not allowed for validation challenges! */
4718     GNUNET_break_op (0);
4719     finish_cmc_handling (cmc);
4720     return;
4721   }
4722   tvr = GNUNET_new (struct TransportValidationResponse);
4723   tvr->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_RESPONSE);
4724   tvr->header.size = htons (sizeof (*tvr));
4725   tvr->challenge = tvc->challenge;
4726   tvr->origin_time = tvc->sender_time;
4727   tvr->validity_duration = cmc->im.expected_address_validity;
4728   {
4729     /* create signature */
4730     struct TransportValidationPS tvp = {
4731       .purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_CHALLENGE),
4732       .purpose.size = htonl (sizeof (tvp)),
4733       .validity_duration = tvr->validity_duration,
4734       .challenge = tvc->challenge
4735     };
4736
4737     GNUNET_assert (GNUNET_OK ==
4738                    GNUNET_CRYPTO_eddsa_sign (GST_my_private_key,
4739                                              &tvp.purpose,
4740                                              &tvr->signature));
4741   }
4742   route_message (&cmc->im.sender,
4743                  &tvr->header);
4744   finish_cmc_handling (cmc);
4745 }
4746
4747
4748 /**
4749  * Closure for #check_known_challenge.
4750  */
4751 struct CheckKnownChallengeContext
4752 {
4753   /**
4754    * Set to the challenge we are looking for.
4755    */
4756   const struct GNUNET_ShortHashCode *challenge;
4757
4758   /**
4759    * Set to a matching validation state, if one was found.
4760    */
4761   struct ValidationState *vs;
4762 };
4763
4764
4765 /**
4766  * Test if the validation state in @a value matches the
4767  * challenge from @a cls.
4768  *
4769  * @param cls a `struct CheckKnownChallengeContext`
4770  * @param pid unused (must match though)
4771  * @param value a `struct ValidationState`
4772  * @return #GNUNET_OK if not matching, #GNUNET_NO if match found
4773  */
4774 static int
4775 check_known_challenge (void *cls,
4776                        const struct GNUNET_PeerIdentity *pid,
4777                        void *value)
4778 {
4779   struct CheckKnownChallengeContext *ckac = cls;
4780   struct ValidationState *vs = value;
4781
4782   (void) pid;
4783   if (0 != GNUNET_memcmp (&vs->challenge,
4784                           ckac->challenge))
4785     return GNUNET_OK;
4786   ckac->vs = vs;
4787   return GNUNET_NO;
4788 }
4789
4790
4791 /**
4792  * Function called when peerstore is done storing a
4793  * validated address.
4794  *
4795  * @param cls a `struct ValidationState`
4796  * @param success #GNUNET_YES on success
4797  */
4798 static void
4799 peerstore_store_validation_cb (void *cls,
4800                                int success)
4801 {
4802   struct ValidationState *vs = cls;
4803
4804   vs->sc = NULL;
4805   if (GNUNET_YES == success)
4806     return;
4807   GNUNET_STATISTICS_update (GST_stats,
4808                             "# Peerstore failed to store foreign address",
4809                             1,
4810                             GNUNET_NO);
4811 }
4812
4813
4814 /**
4815  * Task run periodically to validate some address based on #validation_heap.
4816  *
4817  * @param cls NULL
4818  */
4819 static void
4820 validation_start_cb (void *cls);
4821
4822
4823 /**
4824  * Set the time for next_challenge of @a vs to @a new_time.
4825  * Updates the heap and if necessary reschedules the job.
4826  *
4827  * @param vs validation state to update
4828  * @param new_time new time for revalidation
4829  */
4830 static void
4831 update_next_challenge_time (struct ValidationState *vs,
4832                             struct GNUNET_TIME_Absolute new_time)
4833 {
4834   struct GNUNET_TIME_Relative delta;
4835
4836   if (new_time.abs_value_us == vs->next_challenge.abs_value_us)
4837     return; /* be lazy */
4838   vs->next_challenge = new_time;
4839   if (NULL == vs->hn)
4840     vs->hn = GNUNET_CONTAINER_heap_insert (validation_heap,
4841                                            vs,
4842                                            new_time.abs_value_us);
4843   else
4844     GNUNET_CONTAINER_heap_update_cost (vs->hn,
4845                                        new_time.abs_value_us);
4846   if ( (vs != GNUNET_CONTAINER_heap_peek (validation_heap)) &&
4847        (NULL != validation_task) )
4848     return;
4849   if (NULL != validation_task)
4850     GNUNET_SCHEDULER_cancel (validation_task);
4851   /* randomize a bit */
4852   delta.rel_value_us = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
4853                                                  MIN_DELAY_ADDRESS_VALIDATION.rel_value_us);
4854   new_time = GNUNET_TIME_absolute_add (new_time,
4855                                        delta);
4856   validation_task = GNUNET_SCHEDULER_add_at (new_time,
4857                                              &validation_start_cb,
4858                                              NULL);
4859 }
4860
4861
4862 /**
4863  * Communicator gave us a transport address validation response.  Process the request.
4864  *
4865  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
4866  * @param tvr the message that was received
4867  */
4868 static void
4869 handle_validation_response (void *cls,
4870                             const struct TransportValidationResponse *tvr)
4871 {
4872   struct CommunicatorMessageContext *cmc = cls;
4873   struct ValidationState *vs;
4874   struct CheckKnownChallengeContext ckac = {
4875     .challenge = &tvr->challenge,
4876     .vs = NULL
4877   };
4878   struct GNUNET_TIME_Absolute origin_time;
4879
4880   /* check this is one of our challenges */
4881   (void) GNUNET_CONTAINER_multipeermap_get_multiple (validation_map,
4882                                                      &cmc->im.sender,
4883                                                      &check_known_challenge,
4884                                                      &ckac);
4885   if (NULL == (vs = ckac.vs))
4886   {
4887     /* This can happen simply if we 'forgot' the challenge by now,
4888        i.e. because we received the validation response twice */
4889     GNUNET_STATISTICS_update (GST_stats,
4890                               "# Validations dropped, challenge unknown",
4891                               1,
4892                               GNUNET_NO);
4893     finish_cmc_handling (cmc);
4894     return;
4895   }
4896
4897   /* sanity check on origin time */
4898   origin_time = GNUNET_TIME_absolute_ntoh (tvr->origin_time);
4899   if ( (origin_time.abs_value_us < vs->first_challenge_use.abs_value_us) ||
4900        (origin_time.abs_value_us > vs->last_challenge_use.abs_value_us) )
4901   {
4902     GNUNET_break_op (0);
4903     finish_cmc_handling (cmc);
4904     return;
4905   }
4906
4907   {
4908     /* check signature */
4909     struct TransportValidationPS tvp = {
4910       .purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_CHALLENGE),
4911       .purpose.size = htonl (sizeof (tvp)),
4912       .validity_duration = tvr->validity_duration,
4913       .challenge = tvr->challenge
4914     };
4915
4916     if (GNUNET_OK !=
4917         GNUNET_CRYPTO_eddsa_verify (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_CHALLENGE,
4918                                     &tvp.purpose,
4919                                     &tvr->signature,
4920                                     &cmc->im.sender.public_key))
4921     {
4922       GNUNET_break_op (0);
4923       finish_cmc_handling (cmc);
4924       return;
4925     }
4926   }
4927
4928   /* validity is capped by our willingness to keep track of the
4929      validation entry and the maximum the other peer allows */
4930   vs->valid_until
4931     = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_min (GNUNET_TIME_relative_ntoh (tvr->validity_duration),
4932                                                                   MAX_ADDRESS_VALID_UNTIL));
4933   vs->validated_until
4934     = GNUNET_TIME_absolute_min (vs->valid_until,
4935                                 GNUNET_TIME_relative_to_absolute (ADDRESS_VALIDATION_LIFETIME));
4936   vs->validation_rtt = GNUNET_TIME_absolute_get_duration (origin_time);
4937   vs->challenge_backoff = GNUNET_TIME_UNIT_ZERO;
4938   GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
4939                               &vs->challenge,
4940                               sizeof (vs->challenge));
4941   vs->first_challenge_use = GNUNET_TIME_absolute_subtract (vs->validated_until,
4942                                                            GNUNET_TIME_relative_multiply (vs->validation_rtt,
4943                                                                                           VALIDATION_RTT_BUFFER_FACTOR));
4944   vs->last_challenge_use = GNUNET_TIME_UNIT_ZERO_ABS; /* challenge was not yet used */
4945   update_next_challenge_time (vs,
4946                               vs->first_challenge_use);
4947   vs->sc = GNUNET_PEERSTORE_store (peerstore,
4948                                    "transport",
4949                                    &cmc->im.sender,
4950                                    GNUNET_PEERSTORE_TRANSPORT_URLADDRESS_KEY,
4951                                    vs->address,
4952                                    strlen (vs->address) + 1,
4953                                    vs->valid_until,
4954                                    GNUNET_PEERSTORE_STOREOPTION_MULTIPLE,
4955                                    &peerstore_store_validation_cb,
4956                                    vs);
4957   // FIXME: should we find the matching queue and update the RTT?
4958   finish_cmc_handling (cmc);
4959 }
4960
4961
4962 /**
4963  * Incoming meessage.  Process the request.
4964  *
4965  * @param im the send message that was received
4966  */
4967 static void
4968 handle_incoming_msg (void *cls,
4969                      const struct GNUNET_TRANSPORT_IncomingMessage *im)
4970 {
4971   struct TransportClient *tc = cls;
4972   struct CommunicatorMessageContext *cmc = GNUNET_new (struct CommunicatorMessageContext);
4973
4974   cmc->tc = tc;
4975   cmc->im = *im;
4976   demultiplex_with_cmc (cmc,
4977                         (const struct GNUNET_MessageHeader *) &im[1]);
4978 }
4979
4980
4981 /**
4982  * Given an inbound message @a msg from a communicator @a cmc,
4983  * demultiplex it based on the type calling the right handler.
4984  *
4985  * @param cmc context for demultiplexing
4986  * @param msg message to demultiplex
4987  */
4988 static void
4989 demultiplex_with_cmc (struct CommunicatorMessageContext *cmc,
4990                       const struct GNUNET_MessageHeader *msg)
4991 {
4992   struct GNUNET_MQ_MessageHandler handlers[] = {
4993     GNUNET_MQ_hd_var_size (fragment_box,
4994                            GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT,
4995                            struct TransportFragmentBox,
4996                            &cmc),
4997     GNUNET_MQ_hd_fixed_size (fragment_ack,
4998                              GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK,
4999                              struct TransportFragmentAckMessage,
5000                              &cmc),
5001     GNUNET_MQ_hd_var_size (reliability_box,
5002                            GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX,
5003                            struct TransportReliabilityBox,
5004                            &cmc),
5005     GNUNET_MQ_hd_fixed_size (reliability_ack,
5006                              GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK,
5007                              struct TransportReliabilityAckMessage,
5008                              &cmc),
5009     GNUNET_MQ_hd_var_size (backchannel_encapsulation,
5010                            GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION,
5011                            struct TransportBackchannelEncapsulationMessage,
5012                            &cmc),
5013     GNUNET_MQ_hd_var_size (dv_learn,
5014                            GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN,
5015                            struct TransportDVLearn,
5016                            &cmc),
5017     GNUNET_MQ_hd_var_size (dv_box,
5018                            GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX,
5019                            struct TransportDVBox,
5020                            &cmc),
5021     GNUNET_MQ_hd_fixed_size (validation_challenge,
5022                              GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_CHALLENGE,
5023                              struct TransportValidationChallenge,
5024                              &cmc),
5025     GNUNET_MQ_hd_fixed_size (validation_response,
5026                              GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_RESPONSE,
5027                              struct TransportValidationResponse,
5028                              &cmc),
5029     GNUNET_MQ_handler_end()
5030   };
5031   int ret;
5032
5033   ret = GNUNET_MQ_handle_message (handlers,
5034                                   msg);
5035   if (GNUNET_SYSERR == ret)
5036   {
5037     GNUNET_break (0);
5038     GNUNET_SERVICE_client_drop (cmc->tc->client);
5039     GNUNET_free (cmc);
5040     return;
5041   }
5042   if (GNUNET_NO == ret)
5043   {
5044     /* unencapsulated 'raw' message */
5045     handle_raw_message (&cmc,
5046                         msg);
5047   }
5048 }
5049
5050
5051 /**
5052  * New queue became available.  Check message.
5053  *
5054  * @param cls the client
5055  * @param aqm the send message that was sent
5056  */
5057 static int
5058 check_add_queue_message (void *cls,
5059                          const struct GNUNET_TRANSPORT_AddQueueMessage *aqm)
5060 {
5061   struct TransportClient *tc = cls;
5062
5063   if (CT_COMMUNICATOR != tc->type)
5064   {
5065     GNUNET_break (0);
5066     return GNUNET_SYSERR;
5067   }
5068   GNUNET_MQ_check_zero_termination (aqm);
5069   return GNUNET_OK;
5070 }
5071
5072
5073 /**
5074  * Bandwidth tracker informs us that the delay until we should receive
5075  * more has changed.
5076  *
5077  * @param cls a `struct Queue` for which the delay changed
5078  */
5079 static void
5080 tracker_update_in_cb (void *cls)
5081 {
5082   struct Queue *queue = cls;
5083   struct GNUNET_TIME_Relative in_delay;
5084   unsigned int rsize;
5085
5086   rsize = (0 == queue->mtu) ? IN_PACKET_SIZE_WITHOUT_MTU : queue->mtu;
5087   in_delay = GNUNET_BANDWIDTH_tracker_get_delay (&queue->tracker_in,
5088                                                  rsize);
5089   // FIXME: how exactly do we do inbound flow control?
5090 }
5091
5092
5093 /**
5094  * If necessary, generates the UUID for a @a pm
5095  *
5096  * @param pm pending message to generate UUID for.
5097  */
5098 static void
5099 set_pending_message_uuid (struct PendingMessage *pm)
5100 {
5101   if (pm->msg_uuid_set)
5102     return;
5103   GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
5104                               &pm->msg_uuid,
5105                               sizeof (pm->msg_uuid));
5106   pm->msg_uuid_set = GNUNET_YES;
5107 }
5108
5109
5110 /**
5111  * Fragment the given @a pm to the given @a mtu.  Adds
5112  * additional fragments to the neighbour as well. If the
5113  * @a mtu is too small, generates and error for the @a pm
5114  * and returns NULL.
5115  *
5116  * @param pm pending message to fragment for transmission
5117  * @param mtu MTU to apply
5118  * @return new message to transmit
5119  */
5120 static struct PendingMessage *
5121 fragment_message (struct PendingMessage *pm,
5122                   uint16_t mtu)
5123 {
5124   struct PendingMessage *ff;
5125
5126   set_pending_message_uuid (pm);
5127
5128   /* This invariant is established in #handle_add_queue_message() */
5129   GNUNET_assert (mtu > sizeof (struct TransportFragmentBox));
5130
5131   /* select fragment for transmission, descending the tree if it has
5132      been expanded until we are at a leaf or at a fragment that is small enough */
5133   ff = pm;
5134   while ( ( (ff->bytes_msg > mtu) ||
5135             (pm == ff) ) &&
5136           (ff->frag_off == ff->bytes_msg) &&
5137           (NULL != ff->head_frag) )
5138   {
5139     ff = ff->head_frag; /* descent into fragmented fragments */
5140   }
5141
5142   if ( ( (ff->bytes_msg > mtu) ||
5143          (pm == ff) ) &&
5144        (pm->frag_off < pm->bytes_msg) )
5145   {
5146     /* Did not yet calculate all fragments, calculate next fragment */
5147     struct PendingMessage *frag;
5148     struct TransportFragmentBox tfb;
5149     const char *orig;
5150     char *msg;
5151     uint16_t fragmax;
5152     uint16_t fragsize;
5153     uint16_t msize;
5154     uint16_t xoff = 0;
5155
5156     orig = (const char *) &ff[1];
5157     msize = ff->bytes_msg;
5158     if (pm != ff)
5159     {
5160       const struct TransportFragmentBox *tfbo;
5161
5162       tfbo = (const struct TransportFragmentBox *) orig;
5163       orig += sizeof (struct TransportFragmentBox);
5164       msize -= sizeof (struct TransportFragmentBox);
5165       xoff = ntohs (tfbo->frag_off);
5166     }
5167     fragmax = mtu - sizeof (struct TransportFragmentBox);
5168     fragsize = GNUNET_MIN (msize - ff->frag_off,
5169                            fragmax);
5170     frag = GNUNET_malloc (sizeof (struct PendingMessage) +
5171                           sizeof (struct TransportFragmentBox) +
5172                           fragsize);
5173     frag->target = pm->target;
5174     frag->frag_parent = ff;
5175     frag->timeout = pm->timeout;
5176     frag->bytes_msg = sizeof (struct TransportFragmentBox) + fragsize;
5177     frag->pmt = PMT_FRAGMENT_BOX;
5178     msg = (char *) &frag[1];
5179     tfb.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT);
5180     tfb.header.size = htons (sizeof (struct TransportFragmentBox) +
5181                              fragsize);
5182     tfb.frag_uuid = htonl (pm->frag_uuidgen++);
5183     tfb.msg_uuid = pm->msg_uuid;
5184     tfb.frag_off = htons (ff->frag_off + xoff);
5185     tfb.msg_size = htons (pm->bytes_msg);
5186     memcpy (msg,
5187             &tfb,
5188             sizeof (tfb));
5189     memcpy (&msg[sizeof (tfb)],
5190             &orig[ff->frag_off],
5191             fragsize);
5192     GNUNET_CONTAINER_MDLL_insert (frag,
5193                                   ff->head_frag,
5194                                   ff->tail_frag,
5195                                   frag);
5196     ff->frag_off += fragsize;
5197     ff = frag;
5198   }
5199
5200   /* Move head to the tail and return it */
5201   GNUNET_CONTAINER_MDLL_remove (frag,
5202                                 ff->frag_parent->head_frag,
5203                                 ff->frag_parent->tail_frag,
5204                                 ff);
5205   GNUNET_CONTAINER_MDLL_insert_tail (frag,
5206                                      ff->frag_parent->head_frag,
5207                                      ff->frag_parent->tail_frag,
5208                                      ff);
5209   return ff;
5210 }
5211
5212
5213 /**
5214  * Reliability-box the given @a pm. On error (can there be any), NULL
5215  * may be returned, otherwise the "replacement" for @a pm (which
5216  * should then be added to the respective neighbour's queue instead of
5217  * @a pm).  If the @a pm is already fragmented or reliability boxed,
5218  * or itself an ACK, this function simply returns @a pm.
5219  *
5220  * @param pm pending message to box for transmission over unreliabile queue
5221  * @return new message to transmit
5222  */
5223 static struct PendingMessage *
5224 reliability_box_message (struct PendingMessage *pm)
5225 {
5226   struct TransportReliabilityBox rbox;
5227   struct PendingMessage *bpm;
5228   char *msg;
5229
5230   if (PMT_CORE != pm->pmt)
5231     return pm;  /* already fragmented or reliability boxed, or control message: do nothing */
5232   if (NULL != pm->bpm)
5233     return pm->bpm; /* already computed earlier: do nothing */
5234   GNUNET_assert (NULL == pm->head_frag);
5235   if (pm->bytes_msg + sizeof (rbox) > UINT16_MAX)
5236   {
5237     /* failed hard */
5238     GNUNET_break (0);
5239     client_send_response (pm,
5240                           GNUNET_NO,
5241                           0);
5242     return NULL;
5243   }
5244   bpm = GNUNET_malloc (sizeof (struct PendingMessage) +
5245                        sizeof (rbox) +
5246                        pm->bytes_msg);
5247   bpm->target = pm->target;
5248   bpm->frag_parent = pm;
5249   GNUNET_CONTAINER_MDLL_insert (frag,
5250                                 pm->head_frag,
5251                                 pm->tail_frag,
5252                                 bpm);
5253   bpm->timeout = pm->timeout;
5254   bpm->pmt = PMT_RELIABILITY_BOX;
5255   bpm->bytes_msg = pm->bytes_msg + sizeof (rbox);
5256   set_pending_message_uuid (bpm);
5257   rbox.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX);
5258   rbox.header.size = htons (sizeof (rbox) + pm->bytes_msg);
5259   rbox.ack_countdown = htonl (0); // FIXME: implement ACK countdown support
5260   rbox.msg_uuid = pm->msg_uuid;
5261   msg = (char *) &bpm[1];
5262   memcpy (msg,
5263           &rbox,
5264           sizeof (rbox));
5265   memcpy (&msg[sizeof (rbox)],
5266           &pm[1],
5267           pm->bytes_msg);
5268   pm->bpm = bpm;
5269   return bpm;
5270 }
5271
5272
5273 /**
5274  * We believe we are ready to transmit a message on a queue. Double-checks
5275  * with the queue's "tracker_out" and then gives the message to the
5276  * communicator for transmission (updating the tracker, and re-scheduling
5277  * itself if applicable).
5278  *
5279  * @param cls the `struct Queue` to process transmissions for
5280  */
5281 static void
5282 transmit_on_queue (void *cls)
5283 {
5284   struct Queue *queue = cls;
5285   struct Neighbour *n = queue->neighbour;
5286   struct PendingMessage *pm;
5287   struct PendingMessage *s;
5288   uint32_t overhead;
5289   struct GNUNET_TRANSPORT_SendMessageTo *smt;
5290   struct GNUNET_MQ_Envelope *env;
5291
5292   queue->transmit_task = NULL;
5293   if (NULL == (pm = n->pending_msg_head))
5294   {
5295     /* no message pending, nothing to do here! */
5296     return;
5297   }
5298   schedule_transmit_on_queue (queue);
5299   if (NULL != queue->transmit_task)
5300     return; /* do it later */
5301   overhead = 0;
5302   if (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc)
5303     overhead += sizeof (struct TransportReliabilityBox);
5304   s = pm;
5305   if ( ( (0 != queue->mtu) &&
5306          (pm->bytes_msg + overhead > queue->mtu) ) ||
5307        (pm->bytes_msg > UINT16_MAX - sizeof (struct GNUNET_TRANSPORT_SendMessageTo)) ||
5308        (NULL != pm->head_frag /* fragments already exist, should
5309                                  respect that even if MTU is 0 for
5310                                  this queue */) )
5311     s = fragment_message (s,
5312                           (0 == queue->mtu)
5313                           ? UINT16_MAX - sizeof (struct GNUNET_TRANSPORT_SendMessageTo)
5314                           : queue->mtu);
5315   if (NULL == s)
5316   {
5317     /* Fragmentation failed, try next message... */
5318     schedule_transmit_on_queue (queue);
5319     return;
5320   }
5321   if (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc)
5322     s = reliability_box_message (s);
5323   if (NULL == s)
5324   {
5325     /* Reliability boxing failed, try next message... */
5326     schedule_transmit_on_queue (queue);
5327     return;
5328   }
5329
5330   /* Pass 's' for transission to the communicator */
5331   env = GNUNET_MQ_msg_extra (smt,
5332                              s->bytes_msg,
5333                              GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG);
5334   smt->qid = queue->qid;
5335   smt->mid = queue->mid_gen;
5336   smt->receiver = n->pid;
5337   memcpy (&smt[1],
5338           &s[1],
5339           s->bytes_msg);
5340   {
5341     /* Pass the env to the communicator of queue for transmission. */
5342     struct QueueEntry *qe;
5343
5344     qe = GNUNET_new (struct QueueEntry);
5345     qe->mid = queue->mid_gen++;
5346     qe->queue = queue;
5347     // qe->pm = s; // FIXME: not so easy, reference management on 'free(s)'!
5348     GNUNET_CONTAINER_DLL_insert (queue->queue_head,
5349                                  queue->queue_tail,
5350                                  qe);
5351     GNUNET_assert (CT_COMMUNICATOR == queue->tc->type);
5352     queue->queue_length++;
5353     queue->tc->details.communicator.total_queue_length++;
5354     GNUNET_MQ_send (queue->tc->mq,
5355                     env);
5356   }
5357
5358   // FIXME: do something similar to the logic below
5359   // in defragmentation / reliability ACK handling!
5360
5361   /* Check if this transmission somehow conclusively finished handing 'pm'
5362      even without any explicit ACKs */
5363   if ( (PMT_CORE == s->pmt) &&
5364        (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc) )
5365   {
5366     /* Full message sent, and over reliabile channel */
5367     client_send_response (pm,
5368                           GNUNET_YES,
5369                           pm->bytes_msg);
5370   }
5371   else if ( (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc) &&
5372             (PMT_FRAGMENT_BOX == s->pmt) )
5373   {
5374     struct PendingMessage *pos;
5375
5376     /* Fragment sent over reliabile channel */
5377     free_fragment_tree (s);
5378     pos = s->frag_parent;
5379     GNUNET_CONTAINER_MDLL_remove (frag,
5380                                   pos->head_frag,
5381                                   pos->tail_frag,
5382                                   s);
5383     GNUNET_free (s);
5384     /* check if subtree is done */
5385     while ( (NULL == pos->head_frag) &&
5386             (pos->frag_off == pos->bytes_msg) &&
5387             (pos != pm) )
5388     {
5389       s = pos;
5390       pos = s->frag_parent;
5391       GNUNET_CONTAINER_MDLL_remove (frag,
5392                                     pos->head_frag,
5393                                     pos->tail_frag,
5394                                     s);
5395       GNUNET_free (s);
5396     }
5397
5398     /* Was this the last applicable fragmment? */
5399     if ( (NULL == pm->head_frag) &&
5400          (pm->frag_off == pm->bytes_msg) )
5401       client_send_response (pm,
5402                             GNUNET_YES,
5403                             pm->bytes_msg /* FIXME: calculate and add overheads! */);
5404   }
5405   else if (PMT_CORE != pm->pmt)
5406   {
5407     /* This was an acknowledgement of some type, always free */
5408     free_pending_message (pm);
5409   }
5410   else
5411   {
5412     /* message not finished, waiting for acknowledgement */
5413     struct Neighbour *neighbour = pm->target;
5414     /* Update time by which we might retransmit 's' based on queue
5415        characteristics (i.e. RTT); it takes one RTT for the message to
5416        arrive and the ACK to come back in the best case; but the other
5417        side is allowed to delay ACKs by 2 RTTs, so we use 4 RTT before
5418        retransmitting.  Note that in the future this heuristic should
5419        likely be improved further (measure RTT stability, consider
5420        message urgency and size when delaying ACKs, etc.) */
5421     s->next_attempt = GNUNET_TIME_relative_to_absolute
5422       (GNUNET_TIME_relative_multiply (queue->rtt,
5423                                       4));
5424     if (s == pm)
5425     {
5426       struct PendingMessage *pos;
5427
5428       /* re-insert sort in neighbour list */
5429       GNUNET_CONTAINER_MDLL_remove (neighbour,
5430                                     neighbour->pending_msg_head,
5431                                     neighbour->pending_msg_tail,
5432                                     pm);
5433       pos = neighbour->pending_msg_tail;
5434       while ( (NULL != pos) &&
5435               (pm->next_attempt.abs_value_us > pos->next_attempt.abs_value_us) )
5436         pos = pos->prev_neighbour;
5437       GNUNET_CONTAINER_MDLL_insert_after (neighbour,
5438                                           neighbour->pending_msg_head,
5439                                           neighbour->pending_msg_tail,
5440                                           pos,
5441                                           pm);
5442     }
5443     else
5444     {
5445       /* re-insert sort in fragment list */
5446       struct PendingMessage *fp = s->frag_parent;
5447       struct PendingMessage *pos;
5448
5449       GNUNET_CONTAINER_MDLL_remove (frag,
5450                                     fp->head_frag,
5451                                     fp->tail_frag,
5452                                     s);
5453       pos = fp->tail_frag;
5454       while ( (NULL != pos) &&
5455               (s->next_attempt.abs_value_us > pos->next_attempt.abs_value_us) )
5456         pos = pos->prev_frag;
5457       GNUNET_CONTAINER_MDLL_insert_after (frag,
5458                                           fp->head_frag,
5459                                           fp->tail_frag,
5460                                           pos,
5461                                           s);
5462     }
5463   }
5464
5465   /* finally, re-schedule queue transmission task itself */
5466   schedule_transmit_on_queue (queue);
5467 }
5468
5469
5470 /**
5471  * Bandwidth tracker informs us that the delay until we
5472  * can transmit again changed.
5473  *
5474  * @param cls a `struct Queue` for which the delay changed
5475  */
5476 static void
5477 tracker_update_out_cb (void *cls)
5478 {
5479   struct Queue *queue = cls;
5480   struct Neighbour *n = queue->neighbour;
5481
5482   if (NULL == n->pending_msg_head)
5483   {
5484     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
5485                 "Bandwidth allocation updated for empty transmission queue `%s'\n",
5486                 queue->address);
5487     return; /* no message pending, nothing to do here! */
5488   }
5489   GNUNET_SCHEDULER_cancel (queue->transmit_task);
5490   queue->transmit_task = NULL;
5491   schedule_transmit_on_queue (queue);
5492 }
5493
5494
5495 /**
5496  * Bandwidth tracker informs us that excessive outbound bandwidth was
5497  * allocated which is not being used.
5498  *
5499  * @param cls a `struct Queue` for which the excess was noted
5500  */
5501 static void
5502 tracker_excess_out_cb (void *cls)
5503 {
5504   (void) cls;
5505
5506   /* FIXME: trigger excess bandwidth report to core? Right now,
5507      this is done internally within transport_api2_core already,
5508      but we probably want to change the logic and trigger it
5509      from here via a message instead! */
5510   /* TODO: maybe inform someone at this point? */
5511   GNUNET_STATISTICS_update (GST_stats,
5512                             "# Excess outbound bandwidth reported",
5513                             1,
5514                             GNUNET_NO);
5515 }
5516
5517
5518
5519 /**
5520  * Bandwidth tracker informs us that excessive inbound bandwidth was allocated
5521  * which is not being used.
5522  *
5523  * @param cls a `struct Queue` for which the excess was noted
5524  */
5525 static void
5526 tracker_excess_in_cb (void *cls)
5527 {
5528   (void) cls;
5529
5530   /* TODO: maybe inform somone at this point? */
5531   GNUNET_STATISTICS_update (GST_stats,
5532                             "# Excess inbound bandwidth reported",
5533                             1,
5534                             GNUNET_NO);
5535 }
5536
5537
5538 /**
5539  * Queue to a peer went down.  Process the request.
5540  *
5541  * @param cls the client
5542  * @param dqm the send message that was sent
5543  */
5544 static void
5545 handle_del_queue_message (void *cls,
5546                           const struct GNUNET_TRANSPORT_DelQueueMessage *dqm)
5547 {
5548   struct TransportClient *tc = cls;
5549
5550   if (CT_COMMUNICATOR != tc->type)
5551   {
5552     GNUNET_break (0);
5553     GNUNET_SERVICE_client_drop (tc->client);
5554     return;
5555   }
5556   for (struct Queue *queue = tc->details.communicator.queue_head;
5557        NULL != queue;
5558        queue = queue->next_client)
5559   {
5560     struct Neighbour *neighbour = queue->neighbour;
5561
5562     if ( (dqm->qid != queue->qid) ||
5563          (0 != GNUNET_memcmp (&dqm->receiver,
5564                               &neighbour->pid)) )
5565       continue;
5566     free_queue (queue);
5567     GNUNET_SERVICE_client_continue (tc->client);
5568     return;
5569   }
5570   GNUNET_break (0);
5571   GNUNET_SERVICE_client_drop (tc->client);
5572 }
5573
5574
5575 /**
5576  * Message was transmitted.  Process the request.
5577  *
5578  * @param cls the client
5579  * @param sma the send message that was sent
5580  */
5581 static void
5582 handle_send_message_ack (void *cls,
5583                          const struct GNUNET_TRANSPORT_SendMessageToAck *sma)
5584 {
5585   struct TransportClient *tc = cls;
5586   struct QueueEntry *qe;
5587
5588   if (CT_COMMUNICATOR != tc->type)
5589   {
5590     GNUNET_break (0);
5591     GNUNET_SERVICE_client_drop (tc->client);
5592     return;
5593   }
5594
5595   /* find our queue entry matching the ACK */
5596   qe = NULL;
5597   for (struct Queue *queue = tc->details.communicator.queue_head;
5598        NULL != queue;
5599        queue = queue->next_client)
5600   {
5601     if (0 != GNUNET_memcmp (&queue->neighbour->pid,
5602                             &sma->receiver))
5603       continue;
5604     for (struct QueueEntry *qep = queue->queue_head;
5605          NULL != qep;
5606          qep = qep->next)
5607     {
5608       if (qep->mid != sma->mid)
5609         continue;
5610       qe = qep;
5611       break;
5612     }
5613     break;
5614   }
5615   if (NULL == qe)
5616   {
5617     /* this should never happen */
5618     GNUNET_break (0);
5619     GNUNET_SERVICE_client_drop (tc->client);
5620     return;
5621   }
5622   GNUNET_CONTAINER_DLL_remove (qe->queue->queue_head,
5623                                qe->queue->queue_tail,
5624                                qe);
5625   qe->queue->queue_length--;
5626   tc->details.communicator.total_queue_length--;
5627   GNUNET_SERVICE_client_continue (tc->client);
5628
5629   /* if applicable, resume transmissions that waited on ACK */
5630   if (COMMUNICATOR_TOTAL_QUEUE_LIMIT - 1 == tc->details.communicator.total_queue_length)
5631   {
5632     /* Communicator dropped below threshold, resume all queues */
5633     GNUNET_STATISTICS_update (GST_stats,
5634                               "# Transmission throttled due to communicator queue limit",
5635                               -1,
5636                               GNUNET_NO);
5637     for (struct Queue *queue = tc->details.communicator.queue_head;
5638          NULL != queue;
5639          queue = queue->next_client)
5640       schedule_transmit_on_queue (queue);
5641   }
5642   else if (QUEUE_LENGTH_LIMIT - 1 == qe->queue->queue_length)
5643   {
5644     /* queue dropped below threshold; only resume this one queue */
5645     GNUNET_STATISTICS_update (GST_stats,
5646                               "# Transmission throttled due to queue queue limit",
5647                               -1,
5648                               GNUNET_NO);
5649     schedule_transmit_on_queue (qe->queue);
5650   }
5651
5652   /* TODO: we also should react on the status! */
5653   // FIXME: this probably requires queue->pm = s assignment!
5654   // FIXME: react to communicator status about transmission request. We got:
5655   sma->status; // OK success, SYSERR failure
5656
5657   GNUNET_free (qe);
5658 }
5659
5660
5661 /**
5662  * Iterator telling new MONITOR client about all existing
5663  * queues to peers.
5664  *
5665  * @param cls the new `struct TransportClient`
5666  * @param pid a connected peer
5667  * @param value the `struct Neighbour` with more information
5668  * @return #GNUNET_OK (continue to iterate)
5669  */
5670 static int
5671 notify_client_queues (void *cls,
5672                       const struct GNUNET_PeerIdentity *pid,
5673                       void *value)
5674 {
5675   struct TransportClient *tc = cls;
5676   struct Neighbour *neighbour = value;
5677
5678   GNUNET_assert (CT_MONITOR == tc->type);
5679   for (struct Queue *q = neighbour->queue_head;
5680        NULL != q;
5681        q = q->next_neighbour)
5682   {
5683     struct MonitorEvent me = {
5684       .rtt = q->rtt,
5685       .cs = q->cs,
5686       .num_msg_pending = q->num_msg_pending,
5687       .num_bytes_pending = q->num_bytes_pending
5688     };
5689
5690     notify_monitor (tc,
5691                     pid,
5692                     q->address,
5693                     q->nt,
5694                     &me);
5695   }
5696   return GNUNET_OK;
5697 }
5698
5699
5700 /**
5701  * Initialize a monitor client.
5702  *
5703  * @param cls the client
5704  * @param start the start message that was sent
5705  */
5706 static void
5707 handle_monitor_start (void *cls,
5708                       const struct GNUNET_TRANSPORT_MonitorStart *start)
5709 {
5710   struct TransportClient *tc = cls;
5711
5712   if (CT_NONE != tc->type)
5713   {
5714     GNUNET_break (0);
5715     GNUNET_SERVICE_client_drop (tc->client);
5716     return;
5717   }
5718   tc->type = CT_MONITOR;
5719   tc->details.monitor.peer = start->peer;
5720   tc->details.monitor.one_shot = ntohl (start->one_shot);
5721   GNUNET_CONTAINER_multipeermap_iterate (neighbours,
5722                                          &notify_client_queues,
5723                                          tc);
5724   GNUNET_SERVICE_client_mark_monitor (tc->client);
5725   GNUNET_SERVICE_client_continue (tc->client);
5726 }
5727
5728
5729 /**
5730  * Find transport client providing communication service
5731  * for the protocol @a prefix.
5732  *
5733  * @param prefix communicator name
5734  * @return NULL if no such transport client is available
5735  */
5736 static struct TransportClient *
5737 lookup_communicator (const char *prefix)
5738 {
5739   for (struct TransportClient *tc = clients_head;
5740        NULL != tc;
5741        tc = tc->next)
5742   {
5743     if (CT_COMMUNICATOR != tc->type)
5744       continue;
5745     if (0 == strcmp (prefix,
5746                      tc->details.communicator.address_prefix))
5747       return tc;
5748   }
5749   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
5750               "Somone suggested use of communicator for `%s', but we do not have such a communicator!\n",
5751               prefix);
5752   return NULL;
5753 }
5754
5755
5756 /**
5757  * Signature of a function called with a communicator @a address of a peer
5758  * @a pid that an application wants us to connect to.
5759  *
5760  * @param pid target peer
5761  * @param address the address to try
5762  */
5763 static void
5764 suggest_to_connect (const struct GNUNET_PeerIdentity *pid,
5765                     const char *address)
5766 {
5767   static uint32_t idgen;
5768   struct TransportClient *tc;
5769   char *prefix;
5770   struct GNUNET_TRANSPORT_CreateQueue *cqm;
5771   struct GNUNET_MQ_Envelope *env;
5772   size_t alen;
5773
5774   prefix = GNUNET_HELLO_address_to_prefix (address);
5775   if (NULL == prefix)
5776   {
5777     GNUNET_break (0); /* We got an invalid address!? */
5778     return;
5779   }
5780   tc = lookup_communicator (prefix);
5781   if (NULL == tc)
5782   {
5783     GNUNET_STATISTICS_update (GST_stats,
5784                               "# Suggestions ignored due to missing communicator",
5785                               1,
5786                               GNUNET_NO);
5787     return;
5788   }
5789   /* forward suggestion for queue creation to communicator */
5790   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
5791               "Request #%u for `%s' communicator to create queue to `%s'\n",
5792               (unsigned int) idgen,
5793               prefix,
5794               address);
5795   alen = strlen (address) + 1;
5796   env = GNUNET_MQ_msg_extra (cqm,
5797                              alen,
5798                              GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE);
5799   cqm->request_id = htonl (idgen++);
5800   cqm->receiver = *pid;
5801   memcpy (&cqm[1],
5802           address,
5803           alen);
5804   GNUNET_MQ_send (tc->mq,
5805                   env);
5806 }
5807
5808
5809 /**
5810  * The queue @a q (which matches the peer and address in @a vs) is
5811  * ready for queueing. We should now queue the validation request.
5812  *
5813  * @param q queue to send on
5814  * @param vs state to derive validation challenge from
5815  */
5816 static void
5817 validation_transmit_on_queue (struct Queue *q,
5818                               struct ValidationState *vs)
5819 {
5820   struct GNUNET_MQ_Envelope *env;
5821   struct TransportValidationChallenge *tvc;
5822
5823   vs->last_challenge_use = GNUNET_TIME_absolute_get ();
5824   env = GNUNET_MQ_msg (tvc,
5825                        GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_CHALLENGE);
5826   tvc->reserved = htonl (0);
5827   tvc->challenge = vs->challenge;
5828   tvc->sender_time = GNUNET_TIME_absolute_hton (vs->last_challenge_use);
5829   // FIXME: not so easy, need to BOX this message
5830   // in a transmission request! (mistake also done elsewhere!)
5831   GNUNET_MQ_send (q->tc->mq,
5832                   env);
5833 }
5834
5835
5836 /**
5837  * Task run periodically to validate some address based on #validation_heap.
5838  *
5839  * @param cls NULL
5840  */
5841 static void
5842 validation_start_cb (void *cls)
5843 {
5844   struct ValidationState *vs;
5845   struct Neighbour *n;
5846   struct Queue *q;
5847
5848   (void) cls;
5849   validation_task = NULL;
5850   vs = GNUNET_CONTAINER_heap_peek (validation_heap);
5851   /* drop validations past their expiration */
5852   while ( (NULL != vs) &&
5853           (0 == GNUNET_TIME_absolute_get_remaining (vs->valid_until).rel_value_us) )
5854   {
5855     free_validation_state (vs);
5856     vs = GNUNET_CONTAINER_heap_peek (validation_heap);
5857   }
5858   if (NULL == vs)
5859     return; /* woopsie, no more addresses known, should only
5860                happen if we're really a lonely peer */
5861   n = GNUNET_CONTAINER_multipeermap_get (neighbours,
5862                                          &vs->pid);
5863   q = NULL;
5864   if (NULL != n)
5865   {
5866     for (struct Queue *pos = n->queue_head;
5867          NULL != pos;
5868          pos = pos->next_neighbour)
5869     {
5870       if (0 == strcmp (pos->address,
5871                        vs->address))
5872       {
5873         q = pos;
5874         break;
5875       }
5876     }
5877   }
5878   if (NULL == q)
5879   {
5880     vs->awaiting_queue = GNUNET_YES;
5881     suggest_to_connect (&vs->pid,
5882                         vs->address);
5883   }
5884   else
5885     validation_transmit_on_queue (q,
5886                                   vs);
5887   /* Finally, reschedule next attempt */
5888   vs->challenge_backoff = GNUNET_TIME_randomized_backoff (vs->challenge_backoff,
5889                                                           MAX_VALIDATION_CHALLENGE_FREQ);
5890   update_next_challenge_time (vs,
5891                               GNUNET_TIME_relative_to_absolute (vs->challenge_backoff));
5892 }
5893
5894
5895 /**
5896  * Closure for #check_connection_quality.
5897  */
5898 struct QueueQualityContext
5899 {
5900   /**
5901    * Set to the @e k'th queue encountered.
5902    */
5903   struct Queue *q;
5904
5905   /**
5906    * Set to the number of quality queues encountered.
5907    */
5908   unsigned int quality_count;
5909
5910   /**
5911    * Set to the total number of queues encountered.
5912    */
5913   unsigned int num_queues;
5914
5915   /**
5916    * Decremented for each queue, for selection of the
5917    * k-th queue in @e q.
5918    */
5919   unsigned int k;
5920
5921 };
5922
5923
5924 /**
5925  * Check whether any queue to the given neighbour is
5926  * of a good "quality" and if so, increment the counter.
5927  * Also counts the total number of queues, and returns
5928  * the k-th queue found.
5929  *
5930  * @param cls a `struct QueueQualityContext *` with counters
5931  * @param pid peer this is about
5932  * @param value a `struct Neighbour`
5933  * @return #GNUNET_OK (continue to iterate)
5934  */
5935 static int
5936 check_connection_quality (void *cls,
5937                           const struct GNUNET_PeerIdentity *pid,
5938                           void *value)
5939 {
5940   struct QueueQualityContext *ctx = cls;
5941   struct Neighbour *n = value;
5942   int do_inc;
5943
5944   (void) pid;
5945   do_inc = GNUNET_NO;
5946   for (struct Queue *q = n->queue_head;
5947        NULL != q;
5948        q = q->next_neighbour)
5949   {
5950     if (0 != q->distance)
5951       continue; /* DV does not count */
5952     ctx->num_queues++;
5953     if (0 == ctx->k--)
5954       ctx->q = q;
5955     /* OPTIMIZE-FIXME: in the future, add reliability / goodput
5956        statistics and consider those as well here? */
5957     if (q->rtt.rel_value_us < DV_QUALITY_RTT_THRESHOLD.rel_value_us)
5958       do_inc = GNUNET_YES;
5959   }
5960   if (GNUNET_YES == do_inc)
5961     ctx->quality_count++;
5962   return GNUNET_OK;
5963 }
5964
5965
5966 /**
5967  * Task run when we CONSIDER initiating a DV learn
5968  * process. We first check that sending out a message is
5969  * even possible (queues exist), then that it is desirable
5970  * (if not, reschedule the task for later), and finally
5971  * we may then begin the job.  If there are too many
5972  * entries in the #dvlearn_map, we purge the oldest entry
5973  * using #lle_tail.
5974  *
5975  * @param cls NULL
5976  */
5977 static void
5978 start_dv_learn (void *cls)
5979 {
5980   struct LearnLaunchEntry *lle;
5981   struct QueueQualityContext qqc;
5982   struct GNUNET_MQ_Envelope *env;
5983   struct TransportDVLearn *dvl;
5984
5985   (void) cls;
5986   dvlearn_task = NULL;
5987   if (0 ==
5988       GNUNET_CONTAINER_multipeermap_size (neighbours))
5989     return; /* lost all connectivity, cannot do learning */
5990   qqc.quality_count = 0;
5991   qqc.num_queues = 0;
5992   GNUNET_CONTAINER_multipeermap_iterate (neighbours,
5993                                          &check_connection_quality,
5994                                          &qqc);
5995   if (qqc.quality_count > DV_LEARN_QUALITY_THRESHOLD)
5996   {
5997     struct GNUNET_TIME_Relative delay;
5998     unsigned int factor;
5999
6000     /* scale our retries by how far we are above the threshold */
6001     factor = qqc.quality_count / DV_LEARN_QUALITY_THRESHOLD;
6002     delay = GNUNET_TIME_relative_multiply (DV_LEARN_BASE_FREQUENCY,
6003                                            factor);
6004     dvlearn_task = GNUNET_SCHEDULER_add_delayed (delay,
6005                                                  &start_dv_learn,
6006                                                  NULL);
6007     return;
6008   }
6009   /* remove old entries in #dvlearn_map if it has grown too big */
6010   while (MAX_DV_LEARN_PENDING >=
6011          GNUNET_CONTAINER_multishortmap_size (dvlearn_map))
6012   {
6013     lle = lle_tail;
6014     GNUNET_assert (GNUNET_YES ==
6015                    GNUNET_CONTAINER_multishortmap_remove (dvlearn_map,
6016                                                           &lle->challenge,
6017                                                           lle));
6018     GNUNET_CONTAINER_DLL_remove (lle_head,
6019                                  lle_tail,
6020                                  lle);
6021     GNUNET_free (lle);
6022   }
6023   /* setup data structure for learning */
6024   lle = GNUNET_new (struct LearnLaunchEntry);
6025   GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
6026                               &lle->challenge,
6027                               sizeof (lle->challenge));
6028   GNUNET_CONTAINER_DLL_insert (lle_head,
6029                                lle_tail,
6030                                lle);
6031   GNUNET_break (GNUNET_YES ==
6032                 GNUNET_CONTAINER_multishortmap_put (dvlearn_map,
6033                                                     &lle->challenge,
6034                                                     lle,
6035                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
6036   env = GNUNET_MQ_msg (dvl,
6037                        GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN);
6038   dvl->num_hops = htons (0);
6039   dvl->bidirectional = htons (0);
6040   dvl->non_network_delay = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO);
6041   {
6042     struct DvInitPS dvip = {
6043       .purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_INITIATOR),
6044       .purpose.size = htonl (sizeof (dvip)),
6045       .challenge = lle->challenge
6046     };
6047
6048     GNUNET_assert (GNUNET_OK ==
6049                    GNUNET_CRYPTO_eddsa_sign (GST_my_private_key,
6050                                              &dvip.purpose,
6051                                              &dvl->init_sig));
6052   }
6053   dvl->initiator = GST_my_identity;
6054   dvl->challenge = lle->challenge;
6055
6056   qqc.quality_count = 0;
6057   qqc.k = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
6058                                     qqc.num_queues);
6059   qqc.num_queues = 0;
6060   qqc.q = NULL;
6061   GNUNET_CONTAINER_multipeermap_iterate (neighbours,
6062                                          &check_connection_quality,
6063                                          &qqc);
6064   GNUNET_assert (NULL != qqc.q);
6065
6066   /* Do this as close to transmission time as possible! */
6067   lle->launch_time = GNUNET_TIME_absolute_get ();
6068   // FIXME: not so easy, need to BOX this message
6069   // in a transmission request! (mistake also done elsewhere!)
6070   GNUNET_MQ_send (qqc.q->tc->mq,
6071                   env);
6072
6073   /* reschedule this job, randomizing the time it runs (but no
6074      actual backoff!) */
6075   dvlearn_task
6076     = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_randomize (DV_LEARN_BASE_FREQUENCY),
6077                                     &start_dv_learn,
6078                                     NULL);
6079 }
6080
6081
6082 /**
6083  * A new queue has been created, check if any address validation
6084  * requests have been waiting for it.
6085  *
6086  * @param cls a `struct Queue`
6087  * @param pid peer concerned (unused)
6088  * @param value a `struct ValidationState`
6089  * @return #GNUNET_NO if a match was found and we can stop looking
6090  */
6091 static int
6092 check_validation_request_pending (void *cls,
6093                                   const struct GNUNET_PeerIdentity *pid,
6094                                   void *value)
6095 {
6096   struct Queue *q = cls;
6097   struct ValidationState *vs = value;
6098
6099   (void) pid;
6100   if ( (GNUNET_YES == vs->awaiting_queue) &&
6101        (0 == strcmp (vs->address,
6102                      q->address)) )
6103   {
6104     vs->awaiting_queue = GNUNET_NO;
6105     validation_transmit_on_queue (q,
6106                                   vs);
6107     return GNUNET_NO;
6108   }
6109   return GNUNET_OK;
6110 }
6111
6112
6113 /**
6114  * New queue became available.  Process the request.
6115  *
6116  * @param cls the client
6117  * @param aqm the send message that was sent
6118  */
6119 static void
6120 handle_add_queue_message (void *cls,
6121                           const struct GNUNET_TRANSPORT_AddQueueMessage *aqm)
6122 {
6123   struct TransportClient *tc = cls;
6124   struct Queue *queue;
6125   struct Neighbour *neighbour;
6126   const char *addr;
6127   uint16_t addr_len;
6128
6129   if (ntohl (aqm->mtu) <= sizeof (struct TransportFragmentBox))
6130   {
6131     /* MTU so small as to be useless for transmissions,
6132        required for #fragment_message()! */
6133     GNUNET_break_op (0);
6134     GNUNET_SERVICE_client_drop (tc->client);
6135     return;
6136   }
6137   neighbour = lookup_neighbour (&aqm->receiver);
6138   if (NULL == neighbour)
6139   {
6140     neighbour = GNUNET_new (struct Neighbour);
6141     neighbour->earliest_timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
6142     neighbour->pid = aqm->receiver;
6143     GNUNET_assert (GNUNET_OK ==
6144                    GNUNET_CONTAINER_multipeermap_put (neighbours,
6145                                                       &neighbour->pid,
6146                                                       neighbour,
6147                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
6148     cores_send_connect_info (&neighbour->pid,
6149                              GNUNET_BANDWIDTH_ZERO);
6150   }
6151   addr_len = ntohs (aqm->header.size) - sizeof (*aqm);
6152   addr = (const char *) &aqm[1];
6153
6154   queue = GNUNET_malloc (sizeof (struct Queue) + addr_len);
6155   queue->tc = tc;
6156   queue->address = (const char *) &queue[1];
6157   queue->rtt = GNUNET_TIME_UNIT_FOREVER_REL;
6158   queue->qid = aqm->qid;
6159   queue->mtu = ntohl (aqm->mtu);
6160   queue->nt = (enum GNUNET_NetworkType) ntohl (aqm->nt);
6161   queue->cs = (enum GNUNET_TRANSPORT_ConnectionStatus) ntohl (aqm->cs);
6162   queue->neighbour = neighbour;
6163   GNUNET_BANDWIDTH_tracker_init2 (&queue->tracker_in,
6164                                   &tracker_update_in_cb,
6165                                   queue,
6166                                   GNUNET_BANDWIDTH_ZERO,
6167                                   GNUNET_CONSTANTS_MAX_BANDWIDTH_CARRY_S,
6168                                   &tracker_excess_in_cb,
6169                                   queue);
6170   GNUNET_BANDWIDTH_tracker_init2 (&queue->tracker_out,
6171                                   &tracker_update_out_cb,
6172                                   queue,
6173                                   GNUNET_BANDWIDTH_ZERO,
6174                                   GNUNET_CONSTANTS_MAX_BANDWIDTH_CARRY_S,
6175                                   &tracker_excess_out_cb,
6176                                   queue);
6177   memcpy (&queue[1],
6178           addr,
6179           addr_len);
6180   /* notify monitors about new queue */
6181   {
6182     struct MonitorEvent me = {
6183       .rtt = queue->rtt,
6184       .cs = queue->cs
6185     };
6186
6187     notify_monitors (&neighbour->pid,
6188                      queue->address,
6189                      queue->nt,
6190                      &me);
6191   }
6192   GNUNET_CONTAINER_MDLL_insert (neighbour,
6193                                 neighbour->queue_head,
6194                                 neighbour->queue_tail,
6195                                 queue);
6196   GNUNET_CONTAINER_MDLL_insert (client,
6197                                 tc->details.communicator.queue_head,
6198                                 tc->details.communicator.queue_tail,
6199                                 queue);
6200   /* check if valdiations are waiting for the queue */
6201   (void) GNUNET_CONTAINER_multipeermap_get_multiple (validation_map,
6202                                                      &aqm->receiver,
6203                                                      &check_validation_request_pending,
6204                                                      queue);
6205   /* might be our first queue, try launching DV learning */
6206   if (NULL == dvlearn_task)
6207     dvlearn_task = GNUNET_SCHEDULER_add_now (&start_dv_learn,
6208                                              NULL);
6209   GNUNET_SERVICE_client_continue (tc->client);
6210 }
6211
6212
6213 /**
6214  * Communicator tells us that our request to create a queue "worked", that
6215  * is setting up the queue is now in process.
6216  *
6217  * @param cls the `struct TransportClient`
6218  * @param cqr confirmation message
6219  */
6220 static void
6221 handle_queue_create_ok (void *cls,
6222                         const struct GNUNET_TRANSPORT_CreateQueueResponse *cqr)
6223 {
6224   struct TransportClient *tc = cls;
6225
6226   if (CT_COMMUNICATOR != tc->type)
6227   {
6228     GNUNET_break (0);
6229     GNUNET_SERVICE_client_drop (tc->client);
6230     return;
6231   }
6232   GNUNET_STATISTICS_update (GST_stats,
6233                             "# Suggestions succeeded at communicator",
6234                             1,
6235                             GNUNET_NO);
6236   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
6237               "Request #%u for communicator to create queue succeeded\n",
6238               (unsigned int) ntohs (cqr->request_id));
6239   GNUNET_SERVICE_client_continue (tc->client);
6240 }
6241
6242
6243 /**
6244  * Communicator tells us that our request to create a queue failed. This usually
6245  * indicates that the provided address is simply invalid or that the communicator's
6246  * resources are exhausted.
6247  *
6248  * @param cls the `struct TransportClient`
6249  * @param cqr failure message
6250  */
6251 static void
6252 handle_queue_create_fail (void *cls,
6253                           const struct GNUNET_TRANSPORT_CreateQueueResponse *cqr)
6254 {
6255   struct TransportClient *tc = cls;
6256
6257   if (CT_COMMUNICATOR != tc->type)
6258   {
6259     GNUNET_break (0);
6260     GNUNET_SERVICE_client_drop (tc->client);
6261     return;
6262   }
6263   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
6264               "Request #%u for communicator to create queue failed\n",
6265               (unsigned int) ntohs (cqr->request_id));
6266   GNUNET_STATISTICS_update (GST_stats,
6267                             "# Suggestions failed in queue creation at communicator",
6268                             1,
6269                             GNUNET_NO);
6270   GNUNET_SERVICE_client_continue (tc->client);
6271 }
6272
6273
6274 /**
6275  * We have received a `struct ExpressPreferenceMessage` from an application client.
6276  *
6277  * @param cls handle to the client
6278  * @param msg the start message
6279  */
6280 static void
6281 handle_suggest_cancel (void *cls,
6282                        const struct ExpressPreferenceMessage *msg)
6283 {
6284   struct TransportClient *tc = cls;
6285   struct PeerRequest *pr;
6286
6287   if (CT_APPLICATION != tc->type)
6288   {
6289     GNUNET_break (0);
6290     GNUNET_SERVICE_client_drop (tc->client);
6291     return;
6292   }
6293   pr = GNUNET_CONTAINER_multipeermap_get (tc->details.application.requests,
6294                                           &msg->peer);
6295   if (NULL == pr)
6296   {
6297     GNUNET_break (0);
6298     GNUNET_SERVICE_client_drop (tc->client);
6299     return;
6300   }
6301   (void) stop_peer_request (tc,
6302                             &pr->pid,
6303                             pr);
6304   GNUNET_SERVICE_client_continue (tc->client);
6305 }
6306
6307
6308 /**
6309  * Check #GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_CONSIDER_VERIFY
6310  * messages. We do nothing here, real verification is done later.
6311  *
6312  * @param cls a `struct TransportClient *`
6313  * @param msg message to verify
6314  * @return #GNUNET_OK
6315  */
6316 static int
6317 check_address_consider_verify (void *cls,
6318                                const struct GNUNET_TRANSPORT_AddressToVerify *hdr)
6319 {
6320   (void) cls;
6321   (void) hdr;
6322   return GNUNET_OK;
6323 }
6324
6325
6326 /**
6327  * Closure for #check_known_address.
6328  */
6329 struct CheckKnownAddressContext
6330 {
6331   /**
6332    * Set to the address we are looking for.
6333    */
6334   const char *address;
6335
6336   /**
6337    * Set to a matching validation state, if one was found.
6338    */
6339   struct ValidationState *vs;
6340 };
6341
6342
6343 /**
6344  * Test if the validation state in @a value matches the
6345  * address from @a cls.
6346  *
6347  * @param cls a `struct CheckKnownAddressContext`
6348  * @param pid unused (must match though)
6349  * @param value a `struct ValidationState`
6350  * @return #GNUNET_OK if not matching, #GNUNET_NO if match found
6351  */
6352 static int
6353 check_known_address (void *cls,
6354                      const struct GNUNET_PeerIdentity *pid,
6355                      void *value)
6356 {
6357   struct CheckKnownAddressContext *ckac = cls;
6358   struct ValidationState *vs = value;
6359
6360   (void) pid;
6361   if (0 != strcmp (vs->address,
6362                    ckac->address))
6363     return GNUNET_OK;
6364   ckac->vs = vs;
6365   return GNUNET_NO;
6366 }
6367
6368
6369 /**
6370  * Start address validation.
6371  *
6372  * @param pid peer the @a address is for
6373  * @param address an address to reach @a pid (presumably)
6374  * @param expiration when did @a pid claim @a address will become invalid
6375  */
6376 static void
6377 start_address_validation (const struct GNUNET_PeerIdentity *pid,
6378                           const char *address,
6379                           struct GNUNET_TIME_Absolute expiration)
6380 {
6381   struct GNUNET_TIME_Absolute now;
6382   struct ValidationState *vs;
6383   struct CheckKnownAddressContext ckac = {
6384     .address = address,
6385     .vs = NULL
6386   };
6387
6388   if (0 == GNUNET_TIME_absolute_get_remaining (expiration).rel_value_us)
6389     return; /* expired */
6390   (void) GNUNET_CONTAINER_multipeermap_get_multiple (validation_map,
6391                                                      pid,
6392                                                      &check_known_address,
6393                                                      &ckac);
6394   if (NULL != (vs = ckac.vs))
6395   {
6396     /* if 'vs' is not currently valid, we need to speed up retrying the validation */
6397     if (vs->validated_until.abs_value_us < vs->next_challenge.abs_value_us)
6398     {
6399       /* reduce backoff as we got a fresh advertisement */
6400       vs->challenge_backoff = GNUNET_TIME_relative_min (FAST_VALIDATION_CHALLENGE_FREQ,
6401                                                         GNUNET_TIME_relative_divide (vs->challenge_backoff,
6402                                                                                      2));
6403       update_next_challenge_time (vs,
6404                                   GNUNET_TIME_relative_to_absolute (vs->challenge_backoff));
6405     }
6406     return;
6407   }
6408   now = GNUNET_TIME_absolute_get();
6409   vs = GNUNET_new (struct ValidationState);
6410   vs->pid = *pid;
6411   vs->valid_until = expiration;
6412   vs->first_challenge_use = now;
6413   vs->validation_rtt = GNUNET_TIME_UNIT_FOREVER_REL;
6414   GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
6415                               &vs->challenge,
6416                               sizeof (vs->challenge));
6417   vs->address = GNUNET_strdup (address);
6418   GNUNET_assert (GNUNET_YES ==
6419                  GNUNET_CONTAINER_multipeermap_put (validation_map,
6420                                                     &vs->pid,
6421                                                     vs,
6422                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
6423   update_next_challenge_time (vs,
6424                               now);
6425 }
6426
6427
6428 /**
6429  * Function called by PEERSTORE for each matching record.
6430  *
6431  * @param cls closure
6432  * @param record peerstore record information
6433  * @param emsg error message, or NULL if no errors
6434  */
6435 static void
6436 handle_hello (void *cls,
6437               const struct GNUNET_PEERSTORE_Record *record,
6438               const char *emsg)
6439 {
6440   struct PeerRequest *pr = cls;
6441   const char *val;
6442
6443   if (NULL != emsg)
6444   {
6445     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
6446                 "Got failure from PEERSTORE: %s\n",
6447                 emsg);
6448     return;
6449   }
6450   val = record->value;
6451   if ( (0 == record->value_size) ||
6452        ('\0' != val[record->value_size - 1]) )
6453   {
6454     GNUNET_break (0);
6455     return;
6456   }
6457   start_address_validation (&pr->pid,
6458                             (const char *) record->value,
6459                             record->expiry);
6460 }
6461
6462
6463 /**
6464  * We have received a `struct ExpressPreferenceMessage` from an application client.
6465  *
6466  * @param cls handle to the client
6467  * @param msg the start message
6468  */
6469 static void
6470 handle_suggest (void *cls,
6471                 const struct ExpressPreferenceMessage *msg)
6472 {
6473   struct TransportClient *tc = cls;
6474   struct PeerRequest *pr;
6475
6476   if (CT_NONE == tc->type)
6477   {
6478     tc->type = CT_APPLICATION;
6479     tc->details.application.requests
6480       = GNUNET_CONTAINER_multipeermap_create (16,
6481                                               GNUNET_YES);
6482   }
6483   if (CT_APPLICATION != tc->type)
6484   {
6485     GNUNET_break (0);
6486     GNUNET_SERVICE_client_drop (tc->client);
6487     return;
6488   }
6489   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
6490               "Client suggested we talk to %s with preference %d at rate %u\n",
6491               GNUNET_i2s (&msg->peer),
6492               (int) ntohl (msg->pk),
6493               (int) ntohl (msg->bw.value__));
6494   pr = GNUNET_new (struct PeerRequest);
6495   pr->tc = tc;
6496   pr->pid = msg->peer;
6497   pr->bw = msg->bw;
6498   pr->pk = (enum GNUNET_MQ_PreferenceKind) ntohl (msg->pk);
6499   if (GNUNET_YES !=
6500       GNUNET_CONTAINER_multipeermap_put (tc->details.application.requests,
6501                                          &pr->pid,
6502                                          pr,
6503                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY))
6504   {
6505     GNUNET_break (0);
6506     GNUNET_free (pr);
6507     GNUNET_SERVICE_client_drop (tc->client);
6508     return;
6509   }
6510   pr->wc = GNUNET_PEERSTORE_watch (peerstore,
6511                                    "transport",
6512                                    &pr->pid,
6513                                    GNUNET_PEERSTORE_TRANSPORT_URLADDRESS_KEY,
6514                                    &handle_hello,
6515                                    pr);
6516   GNUNET_SERVICE_client_continue (tc->client);
6517 }
6518
6519
6520 /**
6521  * Given another peers address, consider checking it for validity
6522  * and then adding it to the Peerstore.
6523  *
6524  * @param cls a `struct TransportClient`
6525  * @param hdr message containing the raw address data and
6526  *        signature in the body, see #GNUNET_HELLO_extract_address()
6527  */
6528 static void
6529 handle_address_consider_verify (void *cls,
6530                                 const struct GNUNET_TRANSPORT_AddressToVerify *hdr)
6531 {
6532   struct TransportClient *tc = cls;
6533   char *address;
6534   enum GNUNET_NetworkType nt;
6535   struct GNUNET_TIME_Absolute expiration;
6536
6537   (void) cls;
6538   // FIXME: checking that we know this address already should
6539   //        be done BEFORE checking the signature => HELLO API change!
6540   // FIXME: pre-check: rate-limit signature verification / validation?!
6541   address = GNUNET_HELLO_extract_address (&hdr[1],
6542                                           ntohs (hdr->header.size) - sizeof (*hdr),
6543                                           &hdr->peer,
6544                                           &nt,
6545                                           &expiration);
6546   if (NULL == address)
6547   {
6548     GNUNET_break_op (0);
6549     return;
6550   }
6551   start_address_validation (&hdr->peer,
6552                             address,
6553                             expiration);
6554   GNUNET_free (address);
6555   GNUNET_SERVICE_client_continue (tc->client);
6556 }
6557
6558
6559 /**
6560  * Check #GNUNET_MESSAGE_TYPE_TRANSPORT_REQUEST_HELLO_VALIDATION
6561  * messages.
6562  *
6563  * @param cls a `struct TransportClient *`
6564  * @param m message to verify
6565  * @return #GNUNET_OK on success
6566  */
6567 static int
6568 check_request_hello_validation (void *cls,
6569                                 const struct RequestHelloValidationMessage *m)
6570 {
6571   (void) cls;
6572   GNUNET_MQ_check_zero_termination (m);
6573   return GNUNET_OK;
6574 }
6575
6576
6577 /**
6578  * A client encountered an address of another peer. Consider validating it,
6579  * and if validation succeeds, persist it to PEERSTORE.
6580  *
6581  * @param cls a `struct TransportClient *`
6582  * @param m message to verify
6583  */
6584 static void
6585 handle_request_hello_validation (void *cls,
6586                                  const struct RequestHelloValidationMessage *m)
6587 {
6588   struct TransportClient *tc = cls;
6589
6590   start_address_validation (&m->peer,
6591                             (const char *) &m[1],
6592                             GNUNET_TIME_absolute_ntoh (m->expiration));
6593   GNUNET_SERVICE_client_continue (tc->client);
6594 }
6595
6596
6597 /**
6598  * Free neighbour entry.
6599  *
6600  * @param cls NULL
6601  * @param pid unused
6602  * @param value a `struct Neighbour`
6603  * @return #GNUNET_OK (always)
6604  */
6605 static int
6606 free_neighbour_cb (void *cls,
6607                    const struct GNUNET_PeerIdentity *pid,
6608                    void *value)
6609 {
6610   struct Neighbour *neighbour = value;
6611
6612   (void) cls;
6613   (void) pid;
6614   GNUNET_break (0); // should this ever happen?
6615   free_neighbour (neighbour);
6616
6617   return GNUNET_OK;
6618 }
6619
6620
6621 /**
6622  * Free DV route entry.
6623  *
6624  * @param cls NULL
6625  * @param pid unused
6626  * @param value a `struct DistanceVector`
6627  * @return #GNUNET_OK (always)
6628  */
6629 static int
6630 free_dv_routes_cb (void *cls,
6631                    const struct GNUNET_PeerIdentity *pid,
6632                    void *value)
6633 {
6634   struct DistanceVector *dv = value;
6635
6636   (void) cls;
6637   (void) pid;
6638   free_dv_route (dv);
6639
6640   return GNUNET_OK;
6641 }
6642
6643
6644 /**
6645  * Free ephemeral entry.
6646  *
6647  * @param cls NULL
6648  * @param pid unused
6649  * @param value a `struct EphemeralCacheEntry`
6650  * @return #GNUNET_OK (always)
6651  */
6652 static int
6653 free_ephemeral_cb (void *cls,
6654                    const struct GNUNET_PeerIdentity *pid,
6655                    void *value)
6656 {
6657   struct EphemeralCacheEntry *ece = value;
6658
6659   (void) cls;
6660   (void) pid;
6661   free_ephemeral (ece);
6662   return GNUNET_OK;
6663 }
6664
6665
6666 /**
6667  * Free validation state.
6668  *
6669  * @param cls NULL
6670  * @param pid unused
6671  * @param value a `struct ValidationState`
6672  * @return #GNUNET_OK (always)
6673  */
6674 static int
6675 free_validation_state_cb (void *cls,
6676                           const struct GNUNET_PeerIdentity *pid,
6677                           void *value)
6678 {
6679   struct ValidationState *vs = value;
6680
6681   (void) cls;
6682   (void) pid;
6683   free_validation_state (vs);
6684   return GNUNET_OK;
6685 }
6686
6687
6688 /**
6689  * Function called when the service shuts down.  Unloads our plugins
6690  * and cancels pending validations.
6691  *
6692  * @param cls closure, unused
6693  */
6694 static void
6695 do_shutdown (void *cls)
6696 {
6697   struct LearnLaunchEntry *lle;
6698   (void) cls;
6699
6700   if (NULL != ephemeral_task)
6701   {
6702     GNUNET_SCHEDULER_cancel (ephemeral_task);
6703     ephemeral_task = NULL;
6704   }
6705   GNUNET_CONTAINER_multipeermap_iterate (neighbours,
6706                                          &free_neighbour_cb,
6707                                          NULL);
6708   if (NULL != peerstore)
6709   {
6710     GNUNET_PEERSTORE_disconnect (peerstore,
6711                                  GNUNET_NO);
6712     peerstore = NULL;
6713   }
6714   if (NULL != GST_stats)
6715   {
6716     GNUNET_STATISTICS_destroy (GST_stats,
6717                                GNUNET_NO);
6718     GST_stats = NULL;
6719   }
6720   if (NULL != GST_my_private_key)
6721   {
6722     GNUNET_free (GST_my_private_key);
6723     GST_my_private_key = NULL;
6724   }
6725   GNUNET_CONTAINER_multipeermap_destroy (neighbours);
6726   neighbours = NULL;
6727   GNUNET_CONTAINER_multipeermap_iterate (validation_map,
6728                                          &free_validation_state_cb,
6729                                          NULL);
6730   GNUNET_CONTAINER_multipeermap_destroy (validation_map);
6731   validation_map = NULL;
6732   while (NULL != (lle = lle_head))
6733   {
6734     GNUNET_CONTAINER_DLL_remove (lle_head,
6735                                  lle_tail,
6736                                  lle);
6737     GNUNET_free (lle);
6738   }
6739   GNUNET_CONTAINER_multishortmap_destroy (dvlearn_map);
6740   dvlearn_map = NULL;
6741   GNUNET_CONTAINER_heap_destroy (validation_heap);
6742   validation_heap = NULL;
6743   GNUNET_CONTAINER_multipeermap_iterate (dv_routes,
6744                                          &free_dv_routes_cb,
6745                                          NULL);
6746   GNUNET_CONTAINER_multipeermap_destroy (dv_routes);
6747   dv_routes = NULL;
6748   GNUNET_CONTAINER_multipeermap_iterate (ephemeral_map,
6749                                          &free_ephemeral_cb,
6750                                          NULL);
6751   GNUNET_CONTAINER_multipeermap_destroy (ephemeral_map);
6752   ephemeral_map = NULL;
6753   GNUNET_CONTAINER_heap_destroy (ephemeral_heap);
6754   ephemeral_heap = NULL;
6755 }
6756
6757
6758 /**
6759  * Initiate transport service.
6760  *
6761  * @param cls closure
6762  * @param c configuration to use
6763  * @param service the initialized service
6764  */
6765 static void
6766 run (void *cls,
6767      const struct GNUNET_CONFIGURATION_Handle *c,
6768      struct GNUNET_SERVICE_Handle *service)
6769 {
6770   (void) cls;
6771   (void) service;
6772   /* setup globals */
6773   GST_cfg = c;
6774   neighbours = GNUNET_CONTAINER_multipeermap_create (1024,
6775                                                      GNUNET_YES);
6776   dv_routes = GNUNET_CONTAINER_multipeermap_create (1024,
6777                                                     GNUNET_YES);
6778   ephemeral_map = GNUNET_CONTAINER_multipeermap_create (32,
6779                                                         GNUNET_YES);
6780   ephemeral_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
6781   dvlearn_map = GNUNET_CONTAINER_multishortmap_create (2 * MAX_DV_LEARN_PENDING,
6782                                                        GNUNET_YES);
6783   validation_map = GNUNET_CONTAINER_multipeermap_create (1024,
6784                                                          GNUNET_YES);
6785   validation_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
6786   GST_my_private_key = GNUNET_CRYPTO_eddsa_key_create_from_configuration (GST_cfg);
6787   if (NULL == GST_my_private_key)
6788   {
6789     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
6790                 _("Transport service is lacking key configuration settings. Exiting.\n"));
6791     GNUNET_SCHEDULER_shutdown ();
6792     return;
6793   }
6794   GNUNET_CRYPTO_eddsa_key_get_public (GST_my_private_key,
6795                                       &GST_my_identity.public_key);
6796   GNUNET_log(GNUNET_ERROR_TYPE_INFO,
6797              "My identity is `%s'\n",
6798              GNUNET_i2s_full (&GST_my_identity));
6799   GST_stats = GNUNET_STATISTICS_create ("transport",
6800                                         GST_cfg);
6801   GNUNET_SCHEDULER_add_shutdown (&do_shutdown,
6802                                  NULL);
6803   peerstore = GNUNET_PEERSTORE_connect (GST_cfg);
6804   if (NULL == peerstore)
6805   {
6806     GNUNET_break (0);
6807     GNUNET_SCHEDULER_shutdown ();
6808     return;
6809   }
6810 }
6811
6812
6813 /**
6814  * Define "main" method using service macro.
6815  */
6816 GNUNET_SERVICE_MAIN
6817 ("transport",
6818  GNUNET_SERVICE_OPTION_SOFT_SHUTDOWN,
6819  &run,
6820  &client_connect_cb,
6821  &client_disconnect_cb,
6822  NULL,
6823  /* communication with applications */
6824  GNUNET_MQ_hd_fixed_size (suggest,
6825                           GNUNET_MESSAGE_TYPE_TRANSPORT_SUGGEST,
6826                           struct ExpressPreferenceMessage,
6827                           NULL),
6828  GNUNET_MQ_hd_fixed_size (suggest_cancel,
6829                           GNUNET_MESSAGE_TYPE_TRANSPORT_SUGGEST_CANCEL,
6830                           struct ExpressPreferenceMessage,
6831                           NULL),
6832  GNUNET_MQ_hd_var_size (request_hello_validation,
6833                         GNUNET_MESSAGE_TYPE_TRANSPORT_REQUEST_HELLO_VALIDATION,
6834                         struct RequestHelloValidationMessage,
6835                         NULL),
6836  /* communication with core */
6837  GNUNET_MQ_hd_fixed_size (client_start,
6838                           GNUNET_MESSAGE_TYPE_TRANSPORT_START,
6839                           struct StartMessage,
6840                           NULL),
6841  GNUNET_MQ_hd_var_size (client_send,
6842                         GNUNET_MESSAGE_TYPE_TRANSPORT_SEND,
6843                         struct OutboundMessage,
6844                         NULL),
6845  /* communication with communicators */
6846  GNUNET_MQ_hd_var_size (communicator_available,
6847                         GNUNET_MESSAGE_TYPE_TRANSPORT_NEW_COMMUNICATOR,
6848                         struct GNUNET_TRANSPORT_CommunicatorAvailableMessage,
6849                         NULL),
6850  GNUNET_MQ_hd_var_size (communicator_backchannel,
6851                         GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL,
6852                         struct GNUNET_TRANSPORT_CommunicatorBackchannel,
6853                         NULL),
6854  GNUNET_MQ_hd_var_size (add_address,
6855                         GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS,
6856                         struct GNUNET_TRANSPORT_AddAddressMessage,
6857                         NULL),
6858  GNUNET_MQ_hd_fixed_size (del_address,
6859                           GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_ADDRESS,
6860                           struct GNUNET_TRANSPORT_DelAddressMessage,
6861                           NULL),
6862  GNUNET_MQ_hd_var_size (incoming_msg,
6863                         GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG,
6864                         struct GNUNET_TRANSPORT_IncomingMessage,
6865                         NULL),
6866  GNUNET_MQ_hd_fixed_size (queue_create_ok,
6867                           GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_OK,
6868                           struct GNUNET_TRANSPORT_CreateQueueResponse,
6869                           NULL),
6870  GNUNET_MQ_hd_fixed_size (queue_create_fail,
6871                           GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_FAIL,
6872                           struct GNUNET_TRANSPORT_CreateQueueResponse,
6873                           NULL),
6874  GNUNET_MQ_hd_var_size (add_queue_message,
6875                         GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP,
6876                         struct GNUNET_TRANSPORT_AddQueueMessage,
6877                         NULL),
6878  GNUNET_MQ_hd_var_size (address_consider_verify,
6879                         GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_CONSIDER_VERIFY,
6880                         struct GNUNET_TRANSPORT_AddressToVerify,
6881                         NULL),
6882  GNUNET_MQ_hd_fixed_size (del_queue_message,
6883                           GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN,
6884                           struct GNUNET_TRANSPORT_DelQueueMessage,
6885                           NULL),
6886  GNUNET_MQ_hd_fixed_size (send_message_ack,
6887                           GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK,
6888                           struct GNUNET_TRANSPORT_SendMessageToAck,
6889                           NULL),
6890  /* communication with monitors */
6891  GNUNET_MQ_hd_fixed_size (monitor_start,
6892                           GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_START,
6893                           struct GNUNET_TRANSPORT_MonitorStart,
6894                           NULL),
6895  GNUNET_MQ_handler_end ());
6896
6897
6898 /* end of file gnunet-service-transport.c */