6a8a3fc4d574b1fceaeb69291c8a305ef85d4343
[oweals/gnunet.git] / src / transport / gnunet-service-tng.c
1 /*
2  This file is part of GNUnet.
3  Copyright (C) 2010-2016, 2018, 2019 GNUnet e.V.
4
5  GNUnet is free software: you can redistribute it and/or modify it
6  under the terms of the GNU Affero General Public License as published
7  by the Free Software Foundation, either version 3 of the License,
8  or (at your option) any later version.
9
10  GNUnet is distributed in the hope that it will be useful, but
11  WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  Affero General Public License for more details.
14
15  You should have received a copy of the GNU Affero General Public License
16  along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19  */
20 /**
21  * @file transport/gnunet-service-tng.c
22  * @brief main for gnunet-service-tng
23  * @author Christian Grothoff
24  *
25  * TODO:
26  * - figure out how to transmit (selective) ACKs in case of uni-directional
27  *   communicators (with/without core? DV-only?) When do we use ACKs?
28  *   => communicators use selective ACKs for flow control
29  *   => transport uses message-level ACKs for RTT, fragment confirmation
30  *   => integrate DV into transport, use neither core nor communicators
31  *      but rather give communicators transport-encapsulated messages
32  *      (which could be core-data, background-channel traffic, or
33  *       transport-to-transport traffic)
34  *
35  * Implement next:
36  * - DV data structures:
37  *   + using DV routes!
38  *     - handling of DV-boxed messages that need to be forwarded
39  *     - route_message implementation, including using DV data structures
40  *       (but not when routing certain message types, like DV learn,
41  *        MUST pay attention to content here -- or pass extra flags?)
42  * - ACK handling / retransmission
43  * - track RTT, distance, loss, etc.
44  * - backchannel message encryption & decryption
45  *
46  * Later:
47  * - change transport-core API to provide proper flow control in both
48  *   directions, allow multiple messages per peer simultaneously (tag
49  *   confirmations with unique message ID), and replace quota-out with
50  *   proper flow control;
51  * - if messages are below MTU, consider adding ACKs and other stuff
52  *   (requires planning at receiver, and additional MST-style demultiplex
53  *    at receiver!)
54  * - could avoid copying body of message into each fragment and keep
55  *   fragments as just pointers into the original message and only
56  *   fully build fragments just before transmission (optimization, should
57  *   reduce CPU and memory use)
58  *
59  * Design realizations / discussion:
60  * - communicators do flow control by calling MQ "notify sent"
61  *   when 'ready'. They determine flow implicitly (i.e. TCP blocking)
62  *   or explicitly via backchannel FC ACKs.  As long as the
63  *   channel is not full, they may 'notify sent' even if the other
64  *   peer has not yet confirmed receipt. The other peer confirming
65  *   is _only_ for FC, not for more reliable transmission; reliable
66  *   transmission (i.e. of fragments) is left to _transport_.
67  * - ACKs sent back in uni-directional communicators are done via
68  *   the background channel API; here transport _may_ initially
69  *   broadcast (with bounded # hops) if no path is known;
70  * - transport should _integrate_ DV-routing and build a view of
71  *   the network; then background channel traffic can be
72  *   routed via DV as well as explicit "DV" traffic.
73  * - background channel is also used for ACKs and NAT traversal support
74  * - transport service is responsible for AEAD'ing the background
75  *   channel, timestamps and monotonic time are used against replay
76  *   of old messages -> peerstore needs to be supplied with
77  *   "latest timestamps seen" data
78  * - if transport implements DV, we likely need a 3rd peermap
79  *   in addition to ephemerals and (direct) neighbours
80  *   ==> check if stuff needs to be moved out of "Neighbour"
81  * - transport should encapsualte core-level messages and do its
82  *   own ACKing for RTT/goodput/loss measurements _and_ fragment
83  *   for retransmission
84  */
85 #include "platform.h"
86 #include "gnunet_util_lib.h"
87 #include "gnunet_statistics_service.h"
88 #include "gnunet_transport_monitor_service.h"
89 #include "gnunet_peerstore_service.h"
90 #include "gnunet_hello_lib.h"
91 #include "gnunet_signatures.h"
92 #include "transport.h"
93
94
95 /**
96  * What is the size we assume for a read operation in the
97  * absence of an MTU for the purpose of flow control?
98  */
99 #define IN_PACKET_SIZE_WITHOUT_MTU 128
100
101 /**
102  * Minimum number of hops we should forward DV learn messages
103  * even if they are NOT useful for us in hope of looping
104  * back to the initiator?
105  *
106  * FIXME: allow initiator some control here instead?
107  */
108 #define MIN_DV_PATH_LENGTH_FOR_INITIATOR 3
109
110 /**
111  * Maximum DV distance allowed ever.
112  */
113 #define MAX_DV_HOPS_ALLOWED 16
114
115 /**
116  * Maximum number of DV learning activities we may
117  * have pending at the same time.
118  */
119 #define MAX_DV_LEARN_PENDING 64
120
121 /**
122  * Maximum number of DV paths we keep simultaneously to the same target.
123  */
124 #define MAX_DV_PATHS_TO_TARGET 3
125
126 /**
127  * If a queue delays the next message by more than this number
128  * of seconds we log a warning. Note: this is for testing,
129  * the value chosen here might be too aggressively low!
130  */
131 #define DELAY_WARN_THRESHOLD GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
132
133 /**
134  * We only consider queues as "quality" connections when
135  * suppressing the generation of DV initiation messages if
136  * the latency of the queue is below this threshold.
137  */
138 #define DV_QUALITY_RTT_THRESHOLD GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
139
140 /**
141  * How long do we consider a DV path valid if we see no
142  * further updates on it? Note: the value chosen here might be too low!
143  */
144 #define DV_PATH_VALIDITY_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
145
146 /**
147  * How long before paths expire would we like to (re)discover DV paths? Should
148  * be below #DV_PATH_VALIDITY_TIMEOUT.
149  */
150 #define DV_PATH_DISCOVERY_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 4)
151
152 /**
153  * How long are ephemeral keys valid?
154  */
155 #define EPHEMERAL_VALIDITY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 4)
156
157 /**
158  * How long do we keep partially reassembled messages around before giving up?
159  */
160 #define REASSEMBLY_EXPIRATION GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 4)
161
162 /**
163  * What is the fastest rate at which we send challenges *if* we keep learning
164  * an address (gossip, DHT, etc.)?
165  */
166 #define FAST_VALIDATION_CHALLENGE_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 1)
167
168 /**
169  * What is the slowest rate at which we send challenges?
170  */
171 #define MAX_VALIDATION_CHALLENGE_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_DAYS, 1)
172
173 /**
174  * What is the non-randomized base frequency at which we
175  * would initiate DV learn messages?
176  */
177 #define DV_LEARN_BASE_FREQUENCY GNUNET_TIME_UNIT_MINUTES
178
179 /**
180  * How many good connections (confirmed, bi-directional, not DV)
181  * do we need to have to suppress initiating DV learn messages?
182  */
183 #define DV_LEARN_QUALITY_THRESHOLD 100
184
185 /**
186  * When do we forget an invalid address for sure?
187  */
188 #define MAX_ADDRESS_VALID_UNTIL GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MONTHS, 1)
189 /**
190  * How long do we consider an address valid if we just checked?
191  */
192 #define ADDRESS_VALIDATION_LIFETIME GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 4)
193
194 /**
195  * What is the maximum frequency at which we do address validation?
196  * A random value between 0 and this value is added when scheduling
197  * the #validation_task (both to ensure we do not validate too often,
198  * and to randomize a bit).
199  */
200 #define MIN_DELAY_ADDRESS_VALIDATION GNUNET_TIME_UNIT_MILLISECONDS
201
202 /**
203  * How many network RTTs before an address validation expires should we begin
204  * trying to revalidate? (Note that the RTT used here is the one that we
205  * experienced during the last validation, not necessarily the latest RTT
206  * observed).
207  */
208 #define VALIDATION_RTT_BUFFER_FACTOR 3
209
210 /**
211  * How many messages can we have pending for a given communicator
212  * process before we start to throttle that communicator?
213  *
214  * Used if a communicator might be CPU-bound and cannot handle the traffic.
215  */
216 #define COMMUNICATOR_TOTAL_QUEUE_LIMIT 512
217
218 /**
219  * How many messages can we have pending for a given queue (queue to
220  * a particular peer via a communicator) process before we start to
221  * throttle that queue?
222  */
223 #define QUEUE_LENGTH_LIMIT 32
224
225
226 GNUNET_NETWORK_STRUCT_BEGIN
227
228 /**
229  * Outer layer of an encapsulated backchannel message.
230  */
231 struct TransportBackchannelEncapsulationMessage
232 {
233   /**
234    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION.
235    */
236   struct GNUNET_MessageHeader header;
237
238   /**
239    * Distance the backchannel message has traveled, to be updated at
240    * each hop.  Used to bound the number of hops in case a backchannel
241    * message is broadcast and thus travels without routing
242    * information (during initial backchannel discovery).
243    */
244   uint32_t distance;
245
246   /**
247    * Target's peer identity (as backchannels may be transmitted
248    * indirectly, or even be broadcast).
249    */
250   struct GNUNET_PeerIdentity target;
251
252   /**
253    * Ephemeral key setup by the sender for @e target, used
254    * to encrypt the payload.
255    */
256   struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;
257
258   // FIXME: probably should add random IV here as well,
259   // especially if we re-use ephemeral keys!
260
261   /**
262    * HMAC over the ciphertext of the encrypted, variable-size
263    * body that follows.  Verified via DH of @e target and
264    * @e ephemeral_key
265    */
266   struct GNUNET_HashCode hmac;
267
268   /* Followed by encrypted, variable-size payload */
269 };
270
271
272 /**
273  * Body by which a peer confirms that it is using an ephemeral key.
274  */
275 struct EphemeralConfirmation
276 {
277
278   /**
279    * Purpose is #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL
280    */
281   struct GNUNET_CRYPTO_EccSignaturePurpose purpose;
282
283   /**
284    * How long is this signature over the ephemeral key valid?
285    * Note that the receiver MUST IGNORE the absolute time, and
286    * only interpret the value as a mononic time and reject
287    * "older" values than the last one observed.  Even with this,
288    * there is no real guarantee against replay achieved here,
289    * as the latest timestamp is not persisted.  This is
290    * necessary as we do not want to require synchronized
291    * clocks and may not have a bidirectional communication
292    * channel.  Communicators must protect against replay
293    * attacks when using backchannel communication!
294    */
295   struct GNUNET_TIME_AbsoluteNBO ephemeral_validity;
296
297   /**
298    * Target's peer identity.
299    */
300   struct GNUNET_PeerIdentity target;
301
302   /**
303    * Ephemeral key setup by the sender for @e target, used
304    * to encrypt the payload.
305    */
306   struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;
307
308 };
309
310
311 /**
312  * Plaintext of the variable-size payload that is encrypted
313  * within a `struct TransportBackchannelEncapsulationMessage`
314  */
315 struct TransportBackchannelRequestPayload
316 {
317
318   /**
319    * Sender's peer identity.
320    */
321   struct GNUNET_PeerIdentity sender;
322
323   /**
324    * Signature of the sender over an
325    * #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL.
326    */
327   struct GNUNET_CRYPTO_EddsaSignature sender_sig;
328
329   /**
330    * How long is this signature over the ephemeral key
331    * valid?
332    */
333   struct GNUNET_TIME_AbsoluteNBO ephemeral_validity;
334
335   /**
336    * Current monotonic time of the sending transport service.  Used to
337    * detect replayed messages.  Note that the receiver should remember
338    * a list of the recently seen timestamps and only reject messages
339    * if the timestamp is in the list, or the list is "full" and the
340    * timestamp is smaller than the lowest in the list.  This list of
341    * timestamps per peer should be persisted to guard against replays
342    * after restarts.
343    */
344   struct GNUNET_TIME_AbsoluteNBO monotonic_time;
345
346   /* Followed by a `struct GNUNET_MessageHeader` with a message
347      for a communicator */
348
349   /* Followed by a 0-termianted string specifying the name of
350      the communicator which is to receive the message */
351
352 };
353
354
355 /**
356  * Outer layer of an encapsulated unfragmented application message sent
357  * over an unreliable channel.
358  */
359 struct TransportReliabilityBox
360 {
361   /**
362    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX
363    */
364   struct GNUNET_MessageHeader header;
365
366   /**
367    * Number of messages still to be sent before a commulative
368    * ACK is requested.  Zero if an ACK is requested immediately.
369    * In NBO.  Note that the receiver may send the ACK faster
370    * if it believes that is reasonable.
371    */
372   uint32_t ack_countdown GNUNET_PACKED;
373
374   /**
375    * Unique ID of the message used for signalling receipt of
376    * messages sent over possibly unreliable channels.  Should
377    * be a random.
378    */
379   struct GNUNET_ShortHashCode msg_uuid;
380 };
381
382
383 /**
384  * Confirmation that the receiver got a
385  * #GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX. Note that the
386  * confirmation may be transmitted over a completely different queue,
387  * so ACKs are identified by a combination of PID of sender and
388  * message UUID, without the queue playing any role!
389  */
390 struct TransportReliabilityAckMessage
391 {
392   /**
393    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK
394    */
395   struct GNUNET_MessageHeader header;
396
397   /**
398    * Reserved. Zero.
399    */
400   uint32_t reserved GNUNET_PACKED;
401
402   /**
403    * How long was the ACK delayed relative to the average time of
404    * receipt of the messages being acknowledged?  Used to calculate
405    * the average RTT by taking the receipt time of the ack minus the
406    * average transmission time of the sender minus this value.
407    */
408   struct GNUNET_TIME_RelativeNBO avg_ack_delay;
409
410   /* followed by any number of `struct GNUNET_ShortHashCode`
411      messages providing ACKs */
412 };
413
414
415 /**
416  * Outer layer of an encapsulated fragmented application message.
417  */
418 struct TransportFragmentBox
419 {
420   /**
421    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT
422    */
423   struct GNUNET_MessageHeader header;
424
425   /**
426    * Unique ID of this fragment (and fragment transmission!). Will
427    * change even if a fragement is retransmitted to make each
428    * transmission attempt unique! Should be incremented by one for
429    * each fragment transmission. If a client receives a duplicate
430    * fragment (same @e frag_off), it must send
431    * #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK immediately.
432    */
433   uint32_t frag_uuid GNUNET_PACKED;
434
435   /**
436    * Original message ID for of the message that all the1
437    * fragments belong to.  Must be the same for all fragments.
438    */
439   struct GNUNET_ShortHashCode msg_uuid;
440
441   /**
442    * Offset of this fragment in the overall message.
443    */
444   uint16_t frag_off GNUNET_PACKED;
445
446   /**
447    * Total size of the message that is being fragmented.
448    */
449   uint16_t msg_size GNUNET_PACKED;
450
451 };
452
453
454 /**
455  * Outer layer of an fragmented application message sent over a queue
456  * with finite MTU.  When a #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT is
457  * received, the receiver has two RTTs or 64 further fragments with
458  * the same basic message time to send an acknowledgement, possibly
459  * acknowledging up to 65 fragments in one ACK.  ACKs must also be
460  * sent immediately once all fragments were sent.
461  */
462 struct TransportFragmentAckMessage
463 {
464   /**
465    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK
466    */
467   struct GNUNET_MessageHeader header;
468
469   /**
470    * Unique ID of the lowest fragment UUID being acknowledged.
471    */
472   uint32_t frag_uuid GNUNET_PACKED;
473
474   /**
475    * Bitfield of up to 64 additional fragments following the
476    * @e msg_uuid being acknowledged by this message.
477    */
478   uint64_t extra_acks GNUNET_PACKED;
479
480   /**
481    * Original message ID for of the message that all the
482    * fragments belong to.
483    */
484   struct GNUNET_ShortHashCode msg_uuid;
485
486   /**
487    * How long was the ACK delayed relative to the average time of
488    * receipt of the fragments being acknowledged?  Used to calculate
489    * the average RTT by taking the receipt time of the ack minus the
490    * average transmission time of the sender minus this value.
491    */
492   struct GNUNET_TIME_RelativeNBO avg_ack_delay;
493
494   /**
495    * How long until the receiver will stop trying reassembly
496    * of this message?
497    */
498   struct GNUNET_TIME_RelativeNBO reassembly_timeout;
499 };
500
501
502 /**
503  * Content signed by each peer during DV learning.
504  */
505 struct DvInitPS
506 {
507   /**
508    * Purpose is #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_INITIATOR
509    */
510   struct GNUNET_CRYPTO_EccSignaturePurpose purpose;
511
512   /**
513    * Challenge value used by the initiator to re-identify the path.
514    */
515   struct GNUNET_ShortHashCode challenge;
516
517 };
518
519
520 /**
521  * Content signed by each peer during DV learning.
522  */
523 struct DvHopPS
524 {
525   /**
526    * Purpose is #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_HOP
527    */
528   struct GNUNET_CRYPTO_EccSignaturePurpose purpose;
529
530   /**
531    * Identity of the previous peer on the path.
532    */
533   struct GNUNET_PeerIdentity pred;
534
535   /**
536    * Identity of the next peer on the path.
537    */
538   struct GNUNET_PeerIdentity succ;
539
540   /**
541    * Challenge value used by the initiator to re-identify the path.
542    */
543   struct GNUNET_ShortHashCode challenge;
544
545 };
546
547
548 /**
549  * An entry describing a peer on a path in a
550  * `struct TransportDVLearn` message.
551  */
552 struct DVPathEntryP
553 {
554   /**
555    * Identity of a peer on the path.
556    */
557   struct GNUNET_PeerIdentity hop;
558
559   /**
560    * Signature of this hop over the path, of purpose
561    * #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_HOP
562    */
563   struct GNUNET_CRYPTO_EddsaSignature hop_sig;
564
565 };
566
567
568 /**
569  * Internal message used by transport for distance vector learning.
570  * If @e num_hops does not exceed the threshold, peers should append
571  * themselves to the peer list and flood the message (possibly only
572  * to a subset of their neighbours to limit discoverability of the
573  * network topology).  To the extend that the @e bidirectional bits
574  * are set, peers may learn the inverse paths even if they did not
575  * initiate.
576  *
577  * Unless received on a bidirectional queue and @e num_hops just
578  * zero, peers that can forward to the initator should always try to
579  * forward to the initiator.
580  */
581 struct TransportDVLearn
582 {
583   /**
584    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN
585    */
586   struct GNUNET_MessageHeader header;
587
588   /**
589    * Number of hops this messages has travelled, in NBO. Zero if
590    * sent by initiator.
591    */
592   uint16_t num_hops GNUNET_PACKED;
593
594   /**
595    * Bitmask of the last 16 hops indicating whether they are confirmed
596    * available (without DV) in both directions or not, in NBO.  Used
597    * to possibly instantly learn a path in both directions.  Each peer
598    * should shift this value by one to the left, and then set the
599    * lowest bit IF the current sender can be reached from it (without
600    * DV routing).
601    */
602   uint16_t bidirectional GNUNET_PACKED;
603
604   /**
605    * Peers receiving this message and delaying forwarding to other
606    * peers for any reason should increment this value by the non-network
607    * delay created by the peer.
608    */
609   struct GNUNET_TIME_RelativeNBO non_network_delay;
610
611   /**
612    * Signature of this hop over the path, of purpose
613    * #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_INITIATOR
614    */
615   struct GNUNET_CRYPTO_EddsaSignature init_sig;
616
617   /**
618    * Identity of the peer that started this learning activity.
619    */
620   struct GNUNET_PeerIdentity initiator;
621
622   /**
623    * Challenge value used by the initiator to re-identify the path.
624    */
625   struct GNUNET_ShortHashCode challenge;
626
627   /* Followed by @e num_hops `struct DVPathEntryP` values,
628      excluding the initiator of the DV trace; the last entry is the
629      current sender; the current peer must not be included. */
630
631 };
632
633
634 /**
635  * Outer layer of an encapsulated message send over multiple hops.
636  * The path given only includes the identities of the subsequent
637  * peers, i.e. it will be empty if we are the receiver. Each
638  * forwarding peer should scan the list from the end, and if it can,
639  * forward to the respective peer. The list should then be shortened
640  * by all the entries up to and including that peer.  Each hop should
641  * also increment @e total_hops to allow the receiver to get a precise
642  * estimate on the number of hops the message travelled.  Senders must
643  * provide a learned path that thus should work, but intermediaries
644  * know of a shortcut, they are allowed to send the message via that
645  * shortcut.
646  *
647  * If a peer finds itself still on the list, it must drop the message.
648  */
649 struct TransportDVBox
650 {
651   /**
652    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX
653    */
654   struct GNUNET_MessageHeader header;
655
656   /**
657    * Number of total hops this messages travelled. In NBO.
658    * @e origin sets this to zero, to be incremented at
659    * each hop.
660    */
661   uint16_t total_hops GNUNET_PACKED;
662
663   /**
664    * Number of hops this messages includes. In NBO.
665    */
666   uint16_t num_hops GNUNET_PACKED;
667
668   /**
669    * Identity of the peer that originated the message.
670    */
671   struct GNUNET_PeerIdentity origin;
672
673   /* Followed by @e num_hops `struct GNUNET_PeerIdentity` values;
674      excluding the @e origin and the current peer, the last must be
675      the ultimate target; if @e num_hops is zero, the receiver of this
676      message is the ultimate target. */
677
678   /* Followed by the actual message, which itself may be
679      another box, but not a DV_LEARN or DV_BOX message! */
680 };
681
682
683 /**
684  * Message send to another peer to validate that it can indeed
685  * receive messages at a particular address.
686  */
687 struct TransportValidationChallenge
688 {
689
690   /**
691    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_CHALLENGE
692    */
693   struct GNUNET_MessageHeader header;
694
695   /**
696    * Zero.
697    */
698   uint32_t reserved GNUNET_PACKED;
699
700   /**
701    * Challenge to be signed by the receiving peer.
702    */
703   struct GNUNET_ShortHashCode challenge;
704
705   /**
706    * Timestamp of the sender, to be copied into the reply
707    * to allow sender to calculate RTT.
708    */
709   struct GNUNET_TIME_AbsoluteNBO sender_time;
710 };
711
712
713 /**
714  * Message signed by a peer to confirm that it can indeed
715  * receive messages at a particular address.
716  */
717 struct TransportValidationPS
718 {
719
720   /**
721    * Purpose is #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_CHALLENGE
722    */
723   struct GNUNET_CRYPTO_EccSignaturePurpose purpose;
724
725   /**
726    * How long does the sender believe the address on
727    * which the challenge was received to remain valid?
728    */
729   struct GNUNET_TIME_RelativeNBO validity_duration;
730
731   /**
732    * Challenge signed by the receiving peer.
733    */
734   struct GNUNET_ShortHashCode challenge;
735
736 };
737
738
739 /**
740  * Message send to a peer to respond to a
741  * #GNUNET_MESSAGE_TYPE_ADDRESS_VALIDATION_CHALLENGE
742  */
743 struct TransportValidationResponse
744 {
745
746   /**
747    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_RESPONSE
748    */
749   struct GNUNET_MessageHeader header;
750
751   /**
752    * Zero.
753    */
754   uint32_t reserved GNUNET_PACKED;
755
756   /**
757    * The peer's signature matching the
758    * #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_CHALLENGE purpose.
759    */
760   struct GNUNET_CRYPTO_EddsaSignature signature;
761
762   /**
763    * The challenge that was signed by the receiving peer.
764    */
765   struct GNUNET_ShortHashCode challenge;
766
767   /**
768    * Original timestamp of the sender (was @code{sender_time}),
769    * copied into the reply to allow sender to calculate RTT.
770    */
771   struct GNUNET_TIME_AbsoluteNBO origin_time;
772
773   /**
774    * How long does the sender believe this address to remain
775    * valid?
776    */
777   struct GNUNET_TIME_RelativeNBO validity_duration;
778 };
779
780
781
782 GNUNET_NETWORK_STRUCT_END
783
784
785 /**
786  * What type of client is the `struct TransportClient` about?
787  */
788 enum ClientType
789 {
790   /**
791    * We do not know yet (client is fresh).
792    */
793   CT_NONE = 0,
794
795   /**
796    * Is the CORE service, we need to forward traffic to it.
797    */
798   CT_CORE = 1,
799
800   /**
801    * It is a monitor, forward monitor data.
802    */
803   CT_MONITOR = 2,
804
805   /**
806    * It is a communicator, use for communication.
807    */
808   CT_COMMUNICATOR = 3,
809
810   /**
811    * "Application" telling us where to connect (i.e. TOPOLOGY, DHT or CADET).
812    */
813   CT_APPLICATION = 4
814 };
815
816
817 /**
818  * When did we launch this DV learning activity?
819  */
820 struct LearnLaunchEntry
821 {
822
823   /**
824    * Kept (also) in a DLL sorted by launch time.
825    */
826   struct LearnLaunchEntry *prev;
827
828   /**
829    * Kept (also) in a DLL sorted by launch time.
830    */
831   struct LearnLaunchEntry *next;
832
833   /**
834    * Challenge that uniquely identifies this activity.
835    */
836   struct GNUNET_ShortHashCode challenge;
837
838   /**
839    * When did we transmit the DV learn message (used to
840    * calculate RTT).
841    */
842   struct GNUNET_TIME_Absolute launch_time;
843
844 };
845
846
847 /**
848  * Entry in our cache of ephemeral keys we currently use.
849  * This way, we only sign an ephemeral once per @e target,
850  * and then can re-use it over multiple
851  * #GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION
852  * messages (as signing is expensive).
853  */
854 struct EphemeralCacheEntry
855 {
856
857   /**
858    * Target's peer identity (we don't re-use ephemerals
859    * to limit linkability of messages).
860    */
861   struct GNUNET_PeerIdentity target;
862
863   /**
864    * Signature affirming @e ephemeral_key of type
865    * #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL
866    */
867   struct GNUNET_CRYPTO_EddsaSignature sender_sig;
868
869   /**
870    * How long is @e sender_sig valid
871    */
872   struct GNUNET_TIME_Absolute ephemeral_validity;
873
874   /**
875    * Our ephemeral key.
876    */
877   struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;
878
879   /**
880    * Our private ephemeral key.
881    */
882   struct GNUNET_CRYPTO_EcdhePrivateKey private_key;
883
884   /**
885    * Node in the ephemeral cache for this entry.
886    * Used for expiration.
887    */
888   struct GNUNET_CONTAINER_HeapNode *hn;
889 };
890
891
892 /**
893  * Client connected to the transport service.
894  */
895 struct TransportClient;
896
897
898 /**
899  * A neighbour that at least one communicator is connected to.
900  */
901 struct Neighbour;
902
903
904 /**
905  * Entry in our #dv_routes table, representing a (set of) distance
906  * vector routes to a particular peer.
907  */
908 struct DistanceVector;
909
910 /**
911  * One possible hop towards a DV target.
912  */
913 struct DistanceVectorHop
914 {
915
916   /**
917    * Kept in a MDLL, sorted by @e timeout.
918    */
919   struct DistanceVectorHop *next_dv;
920
921   /**
922    * Kept in a MDLL, sorted by @e timeout.
923    */
924   struct DistanceVectorHop *prev_dv;
925
926   /**
927    * Kept in a MDLL.
928    */
929   struct DistanceVectorHop *next_neighbour;
930
931   /**
932    * Kept in a MDLL.
933    */
934   struct DistanceVectorHop *prev_neighbour;
935
936   /**
937    * What would be the next hop to @e target?
938    */
939   struct Neighbour *next_hop;
940
941   /**
942    * Distance vector entry this hop belongs with.
943    */
944   struct DistanceVector *dv;
945
946   /**
947    * Array of @e distance hops to the target, excluding @e next_hop.
948    * NULL if the entire path is us to @e next_hop to `target`. Allocated
949    * at the end of this struct.
950    */
951   const struct GNUNET_PeerIdentity *path;
952
953   /**
954    * At what time do we forget about this path unless we see it again
955    * while learning?
956    */
957   struct GNUNET_TIME_Absolute timeout;
958
959   /**
960    * How many hops in total to the `target` (excluding @e next_hop and `target` itself),
961    * thus 0 still means a distance of 2 hops (to @e next_hop and then to `target`)?
962    */
963   unsigned int distance;
964 };
965
966
967 /**
968  * Entry in our #dv_routes table, representing a (set of) distance
969  * vector routes to a particular peer.
970  */
971 struct DistanceVector
972 {
973
974   /**
975    * To which peer is this a route?
976    */
977   struct GNUNET_PeerIdentity target;
978
979   /**
980    * Known paths to @e target.
981    */
982   struct DistanceVectorHop *dv_head;
983
984   /**
985    * Known paths to @e target.
986    */
987   struct DistanceVectorHop *dv_tail;
988
989   /**
990    * Task scheduled to purge expired paths from @e dv_head MDLL.
991    */
992   struct GNUNET_SCHEDULER_Task *timeout_task;
993 };
994
995
996 /**
997  * A queue is a message queue provided by a communicator
998  * via which we can reach a particular neighbour.
999  */
1000 struct Queue;
1001
1002
1003 /**
1004  * Entry identifying transmission in one of our `struct
1005  * Queue` which still awaits an ACK.  This is used to
1006  * ensure we do not overwhelm a communicator and limit the number of
1007  * messages outstanding per communicator (say in case communicator is
1008  * CPU bound) and per queue (in case bandwidth allocation exceeds
1009  * what the communicator can actually provide towards a particular
1010  * peer/target).
1011  */
1012 struct QueueEntry
1013 {
1014
1015   /**
1016    * Kept as a DLL.
1017    */
1018   struct QueueEntry *next;
1019
1020   /**
1021    * Kept as a DLL.
1022    */
1023   struct QueueEntry *prev;
1024
1025   /**
1026    * Queue this entry is queued with.
1027    */
1028   struct Queue *queue;
1029
1030   /**
1031    * Message ID used for this message with the queue used for transmission.
1032    */
1033   uint64_t mid;
1034 };
1035
1036
1037 /**
1038  * A queue is a message queue provided by a communicator
1039  * via which we can reach a particular neighbour.
1040  */
1041 struct Queue
1042 {
1043   /**
1044    * Kept in a MDLL.
1045    */
1046   struct Queue *next_neighbour;
1047
1048   /**
1049    * Kept in a MDLL.
1050    */
1051   struct Queue *prev_neighbour;
1052
1053   /**
1054    * Kept in a MDLL.
1055    */
1056   struct Queue *prev_client;
1057
1058   /**
1059    * Kept in a MDLL.
1060    */
1061   struct Queue *next_client;
1062
1063   /**
1064    * Head of DLL of unacked transmission requests.
1065    */
1066   struct QueueEntry *queue_head;
1067
1068   /**
1069    * End of DLL of unacked transmission requests.
1070    */
1071   struct QueueEntry *queue_tail;
1072
1073   /**
1074    * Which neighbour is this queue for?
1075    */
1076   struct Neighbour *neighbour;
1077
1078   /**
1079    * Which communicator offers this queue?
1080    */
1081   struct TransportClient *tc;
1082
1083   /**
1084    * Address served by the queue.
1085    */
1086   const char *address;
1087
1088   /**
1089    * Task scheduled for the time when this queue can (likely) transmit the
1090    * next message. Still needs to check with the @e tracker_out to be sure.
1091    */
1092   struct GNUNET_SCHEDULER_Task *transmit_task;
1093
1094   /**
1095    * Our current RTT estimate for this queue.
1096    */
1097   struct GNUNET_TIME_Relative rtt;
1098
1099   /**
1100    * Message ID generator for transmissions on this queue.
1101    */
1102   uint64_t mid_gen;
1103
1104   /**
1105    * Unique identifier of this queue with the communicator.
1106    */
1107   uint32_t qid;
1108
1109   /**
1110    * Maximum transmission unit supported by this queue.
1111    */
1112   uint32_t mtu;
1113
1114   /**
1115    * Distance to the target of this queue.
1116    * FIXME: needed? DV is done differently these days...
1117    */
1118   uint32_t distance;
1119
1120   /**
1121    * Messages pending.
1122    */
1123   uint32_t num_msg_pending;
1124
1125   /**
1126    * Bytes pending.
1127    */
1128   uint32_t num_bytes_pending;
1129
1130   /**
1131    * Length of the DLL starting at @e queue_head.
1132    */
1133   unsigned int queue_length;
1134
1135   /**
1136    * Network type offered by this queue.
1137    */
1138   enum GNUNET_NetworkType nt;
1139
1140   /**
1141    * Connection status for this queue.
1142    */
1143   enum GNUNET_TRANSPORT_ConnectionStatus cs;
1144
1145   /**
1146    * How much outbound bandwidth do we have available for this queue?
1147    */
1148   struct GNUNET_BANDWIDTH_Tracker tracker_out;
1149
1150   /**
1151    * How much inbound bandwidth do we have available for this queue?
1152    */
1153   struct GNUNET_BANDWIDTH_Tracker tracker_in;
1154 };
1155
1156
1157 /**
1158  * Information we keep for a message that we are reassembling.
1159  */
1160 struct ReassemblyContext
1161 {
1162
1163   /**
1164    * Original message ID for of the message that all the
1165    * fragments belong to.
1166    */
1167   struct GNUNET_ShortHashCode msg_uuid;
1168
1169   /**
1170    * Which neighbour is this context for?
1171    */
1172   struct Neighbour *neighbour;
1173
1174   /**
1175    * Entry in the reassembly heap (sorted by expiration).
1176    */
1177   struct GNUNET_CONTAINER_HeapNode *hn;
1178
1179   /**
1180    * Bitfield with @e msg_size bits representing the positions
1181    * where we have received fragments.  When we receive a fragment,
1182    * we check the bits in @e bitfield before incrementing @e msg_missing.
1183    *
1184    * Allocated after the reassembled message.
1185    */
1186   uint8_t *bitfield;
1187
1188   /**
1189    * Task for sending ACK. We may send ACKs either because of hitting
1190    * the @e extra_acks limit, or based on time and @e num_acks.  This
1191    * task is for the latter case.
1192    */
1193   struct GNUNET_SCHEDULER_Task *ack_task;
1194
1195   /**
1196    * At what time will we give up reassembly of this message?
1197    */
1198   struct GNUNET_TIME_Absolute reassembly_timeout;
1199
1200   /**
1201    * Average delay of all acks in @e extra_acks and @e frag_uuid.
1202    * Should be reset to zero when @e num_acks is set to 0.
1203    */
1204   struct GNUNET_TIME_Relative avg_ack_delay;
1205
1206   /**
1207    * Time we received the last fragment.  @e avg_ack_delay must be
1208    * incremented by now - @e last_frag multiplied by @e num_acks.
1209    */
1210   struct GNUNET_TIME_Absolute last_frag;
1211
1212   /**
1213    * Bitfield of up to 64 additional fragments following @e frag_uuid
1214    * to be acknowledged in the next cummulative ACK.
1215    */
1216   uint64_t extra_acks;
1217
1218   /**
1219    * Unique ID of the lowest fragment UUID to be acknowledged in the
1220    * next cummulative ACK.  Only valid if @e num_acks > 0.
1221    */
1222   uint32_t frag_uuid;
1223
1224   /**
1225    * Number of ACKs we have accumulated so far.  Reset to 0
1226    * whenever we send a #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK.
1227    */
1228   unsigned int num_acks;
1229
1230   /**
1231    * How big is the message we are reassembling in total?
1232    */
1233   uint16_t msg_size;
1234
1235   /**
1236    * How many bytes of the message are still missing?  Defragmentation
1237    * is complete when @e msg_missing == 0.
1238    */
1239   uint16_t msg_missing;
1240
1241   /* Followed by @e msg_size bytes of the (partially) defragmented original message */
1242
1243   /* Followed by @e bitfield data */
1244 };
1245
1246
1247 /**
1248  * A neighbour that at least one communicator is connected to.
1249  */
1250 struct Neighbour
1251 {
1252
1253   /**
1254    * Which peer is this about?
1255    */
1256   struct GNUNET_PeerIdentity pid;
1257
1258   /**
1259    * Map with `struct ReassemblyContext` structs for fragments under
1260    * reassembly. May be NULL if we currently have no fragments from
1261    * this @e pid (lazy initialization).
1262    */
1263   struct GNUNET_CONTAINER_MultiShortmap *reassembly_map;
1264
1265   /**
1266    * Heap with `struct ReassemblyContext` structs for fragments under
1267    * reassembly. May be NULL if we currently have no fragments from
1268    * this @e pid (lazy initialization).
1269    */
1270   struct GNUNET_CONTAINER_Heap *reassembly_heap;
1271
1272   /**
1273    * Task to free old entries from the @e reassembly_heap and @e reassembly_map.
1274    */
1275   struct GNUNET_SCHEDULER_Task *reassembly_timeout_task;
1276
1277   /**
1278    * Head of list of messages pending for this neighbour.
1279    */
1280   struct PendingMessage *pending_msg_head;
1281
1282   /**
1283    * Tail of list of messages pending for this neighbour.
1284    */
1285   struct PendingMessage *pending_msg_tail;
1286
1287   /**
1288    * Head of MDLL of DV hops that have this neighbour as next hop. Must be
1289    * purged if this neighbour goes down.
1290    */
1291   struct DistanceVectorHop *dv_head;
1292
1293   /**
1294    * Tail of MDLL of DV hops that have this neighbour as next hop. Must be
1295    * purged if this neighbour goes down.
1296    */
1297   struct DistanceVectorHop *dv_tail;
1298
1299   /**
1300    * Head of DLL of queues to this peer.
1301    */
1302   struct Queue *queue_head;
1303
1304   /**
1305    * Tail of DLL of queues to this peer.
1306    */
1307   struct Queue *queue_tail;
1308
1309   /**
1310    * Task run to cleanup pending messages that have exceeded their timeout.
1311    */
1312   struct GNUNET_SCHEDULER_Task *timeout_task;
1313
1314   /**
1315    * Quota at which CORE is allowed to transmit to this peer.
1316    *
1317    * FIXME: not yet used, tricky to get right given multiple queues!
1318    *        (=> Idea: measure???)
1319    * FIXME: how do we set this value initially when we tell CORE?
1320    *    Options: start at a minimum value or at literally zero?
1321    *         (=> Current thought: clean would be zero!)
1322    */
1323   struct GNUNET_BANDWIDTH_Value32NBO quota_out;
1324
1325   /**
1326    * What is the earliest timeout of any message in @e pending_msg_tail?
1327    */
1328   struct GNUNET_TIME_Absolute earliest_timeout;
1329
1330 };
1331
1332
1333 /**
1334  * A peer that an application (client) would like us to talk to directly.
1335  */
1336 struct PeerRequest
1337 {
1338
1339   /**
1340    * Which peer is this about?
1341    */
1342   struct GNUNET_PeerIdentity pid;
1343
1344   /**
1345    * Client responsible for the request.
1346    */
1347   struct TransportClient *tc;
1348
1349   /**
1350    * Handle for watching the peerstore for HELLOs for this peer.
1351    */
1352   struct GNUNET_PEERSTORE_WatchContext *wc;
1353
1354   /**
1355    * What kind of performance preference does this @e tc have?
1356    */
1357   enum GNUNET_MQ_PreferenceKind pk;
1358
1359   /**
1360    * How much bandwidth would this @e tc like to see?
1361    */
1362   struct GNUNET_BANDWIDTH_Value32NBO bw;
1363
1364 };
1365
1366
1367 /**
1368  * Types of different pending messages.
1369  */
1370 enum PendingMessageType
1371 {
1372
1373   /**
1374    * Ordinary message received from the CORE service.
1375    */
1376   PMT_CORE = 0,
1377
1378   /**
1379    * Fragment box.
1380    */
1381   PMT_FRAGMENT_BOX = 1,
1382
1383   /**
1384    * Reliability box.
1385    */
1386   PMT_RELIABILITY_BOX = 2,
1387
1388   /**
1389    * Any type of acknowledgement.
1390    */
1391   PMT_ACKNOWLEDGEMENT = 3
1392
1393 };
1394
1395
1396 /**
1397  * Transmission request that is awaiting delivery.  The original
1398  * transmission requests from CORE may be too big for some queues.
1399  * In this case, a *tree* of fragments is created.  At each
1400  * level of the tree, fragments are kept in a DLL ordered by which
1401  * fragment should be sent next (at the head).  The tree is searched
1402  * top-down, with the original message at the root.
1403  *
1404  * To select a node for transmission, first it is checked if the
1405  * current node's message fits with the MTU.  If it does not, we
1406  * either calculate the next fragment (based on @e frag_off) from the
1407  * current node, or, if all fragments have already been created,
1408  * descend to the @e head_frag.  Even though the node was already
1409  * fragmented, the fragment may be too big if the fragment was
1410  * generated for a queue with a larger MTU. In this case, the node
1411  * may be fragmented again, thus creating a tree.
1412  *
1413  * When acknowledgements for fragments are received, the tree
1414  * must be pruned, removing those parts that were already
1415  * acknowledged.  When fragments are sent over a reliable
1416  * channel, they can be immediately removed.
1417  *
1418  * If a message is ever fragmented, then the original "full" message
1419  * is never again transmitted (even if it fits below the MTU), and
1420  * only (remaining) fragments are sent.
1421  */
1422 struct PendingMessage
1423 {
1424   /**
1425    * Kept in a MDLL of messages for this @a target.
1426    */
1427   struct PendingMessage *next_neighbour;
1428
1429   /**
1430    * Kept in a MDLL of messages for this @a target.
1431    */
1432   struct PendingMessage *prev_neighbour;
1433
1434   /**
1435    * Kept in a MDLL of messages from this @a client (if @e pmt is #PMT_CORE)
1436    */
1437   struct PendingMessage *next_client;
1438
1439   /**
1440    * Kept in a MDLL of messages from this @a client  (if @e pmt is #PMT_CORE)
1441    */
1442   struct PendingMessage *prev_client;
1443
1444   /**
1445    * Kept in a MDLL of messages from this @a cpm (if @e pmt is #PMT_FRAGMENT_BOx)
1446    */
1447   struct PendingMessage *next_frag;
1448
1449   /**
1450    * Kept in a MDLL of messages from this @a cpm  (if @e pmt is #PMT_FRAGMENT_BOX)
1451    */
1452   struct PendingMessage *prev_frag;
1453
1454   /**
1455    * This message, reliability boxed. Only possibly available if @e pmt is #PMT_CORE.
1456    */
1457   struct PendingMessage *bpm;
1458
1459   /**
1460    * Target of the request.
1461    */
1462   struct Neighbour *target;
1463
1464   /**
1465    * Client that issued the transmission request, if @e pmt is #PMT_CORE.
1466    */
1467   struct TransportClient *client;
1468
1469   /**
1470    * Head of a MDLL of fragments created for this core message.
1471    */
1472   struct PendingMessage *head_frag;
1473
1474   /**
1475    * Tail of a MDLL of fragments created for this core message.
1476    */
1477   struct PendingMessage *tail_frag;
1478
1479   /**
1480    * Our parent in the fragmentation tree.
1481    */
1482   struct PendingMessage *frag_parent;
1483
1484   /**
1485    * At what time should we give up on the transmission (and no longer retry)?
1486    */
1487   struct GNUNET_TIME_Absolute timeout;
1488
1489   /**
1490    * What is the earliest time for us to retry transmission of this message?
1491    */
1492   struct GNUNET_TIME_Absolute next_attempt;
1493
1494   /**
1495    * UUID to use for this message (used for reassembly of fragments, only
1496    * initialized if @e msg_uuid_set is #GNUNET_YES).
1497    */
1498   struct GNUNET_ShortHashCode msg_uuid;
1499
1500   /**
1501    * Counter incremented per generated fragment.
1502    */
1503   uint32_t frag_uuidgen;
1504
1505   /**
1506    * Type of the pending message.
1507    */
1508   enum PendingMessageType pmt;
1509
1510   /**
1511    * Size of the original message.
1512    */
1513   uint16_t bytes_msg;
1514
1515   /**
1516    * Offset at which we should generate the next fragment.
1517    */
1518   uint16_t frag_off;
1519
1520   /**
1521    * #GNUNET_YES once @e msg_uuid was initialized
1522    */
1523   int16_t msg_uuid_set;
1524
1525   /* Followed by @e bytes_msg to transmit */
1526 };
1527
1528
1529 /**
1530  * One of the addresses of this peer.
1531  */
1532 struct AddressListEntry
1533 {
1534
1535   /**
1536    * Kept in a DLL.
1537    */
1538   struct AddressListEntry *next;
1539
1540   /**
1541    * Kept in a DLL.
1542    */
1543   struct AddressListEntry *prev;
1544
1545   /**
1546    * Which communicator provides this address?
1547    */
1548   struct TransportClient *tc;
1549
1550   /**
1551    * The actual address.
1552    */
1553   const char *address;
1554
1555   /**
1556    * Current context for storing this address in the peerstore.
1557    */
1558   struct GNUNET_PEERSTORE_StoreContext *sc;
1559
1560   /**
1561    * Task to periodically do @e st operation.
1562    */
1563   struct GNUNET_SCHEDULER_Task *st;
1564
1565   /**
1566    * What is a typical lifetime the communicator expects this
1567    * address to have? (Always from now.)
1568    */
1569   struct GNUNET_TIME_Relative expiration;
1570
1571   /**
1572    * Address identifier used by the communicator.
1573    */
1574   uint32_t aid;
1575
1576   /**
1577    * Network type offered by this address.
1578    */
1579   enum GNUNET_NetworkType nt;
1580
1581 };
1582
1583
1584 /**
1585  * Client connected to the transport service.
1586  */
1587 struct TransportClient
1588 {
1589
1590   /**
1591    * Kept in a DLL.
1592    */
1593   struct TransportClient *next;
1594
1595   /**
1596    * Kept in a DLL.
1597    */
1598   struct TransportClient *prev;
1599
1600   /**
1601    * Handle to the client.
1602    */
1603   struct GNUNET_SERVICE_Client *client;
1604
1605   /**
1606    * Message queue to the client.
1607    */
1608   struct GNUNET_MQ_Handle *mq;
1609
1610   /**
1611    * What type of client is this?
1612    */
1613   enum ClientType type;
1614
1615   union
1616   {
1617
1618     /**
1619      * Information for @e type #CT_CORE.
1620      */
1621     struct {
1622
1623       /**
1624        * Head of list of messages pending for this client, sorted by
1625        * transmission time ("next_attempt" + possibly internal prioritization).
1626        */
1627       struct PendingMessage *pending_msg_head;
1628
1629       /**
1630        * Tail of list of messages pending for this client.
1631        */
1632       struct PendingMessage *pending_msg_tail;
1633
1634     } core;
1635
1636     /**
1637      * Information for @e type #CT_MONITOR.
1638      */
1639     struct {
1640
1641       /**
1642        * Peer identity to monitor the addresses of.
1643        * Zero to monitor all neighbours.  Valid if
1644        * @e type is #CT_MONITOR.
1645        */
1646       struct GNUNET_PeerIdentity peer;
1647
1648       /**
1649        * Is this a one-shot monitor?
1650        */
1651       int one_shot;
1652
1653     } monitor;
1654
1655
1656     /**
1657      * Information for @e type #CT_COMMUNICATOR.
1658      */
1659     struct {
1660       /**
1661        * If @e type is #CT_COMMUNICATOR, this communicator
1662        * supports communicating using these addresses.
1663        */
1664       char *address_prefix;
1665
1666       /**
1667        * Head of DLL of queues offered by this communicator.
1668        */
1669       struct Queue *queue_head;
1670
1671       /**
1672        * Tail of DLL of queues offered by this communicator.
1673        */
1674       struct Queue *queue_tail;
1675
1676       /**
1677        * Head of list of the addresses of this peer offered by this communicator.
1678        */
1679       struct AddressListEntry *addr_head;
1680
1681       /**
1682        * Tail of list of the addresses of this peer offered by this communicator.
1683        */
1684       struct AddressListEntry *addr_tail;
1685
1686       /**
1687        * Number of queue entries in all queues to this communicator. Used
1688        * throttle sending to a communicator if we see that the communicator
1689        * is globally unable to keep up.
1690        */
1691       unsigned int total_queue_length;
1692
1693       /**
1694        * Characteristics of this communicator.
1695        */
1696       enum GNUNET_TRANSPORT_CommunicatorCharacteristics cc;
1697
1698     } communicator;
1699
1700     /**
1701      * Information for @e type #CT_APPLICATION
1702      */
1703     struct {
1704
1705       /**
1706        * Map of requests for peers the given client application would like to
1707        * see connections for.  Maps from PIDs to `struct PeerRequest`.
1708        */
1709       struct GNUNET_CONTAINER_MultiPeerMap *requests;
1710
1711     } application;
1712
1713   } details;
1714
1715 };
1716
1717
1718 /**
1719  * State we keep for validation activities.  Each of these
1720  * is both in the #validation_heap and the #validation_map.
1721  */
1722 struct ValidationState
1723 {
1724
1725   /**
1726    * For which peer is @a address to be validated (or possibly valid)?
1727    * Serves as key in the #validation_map.
1728    */
1729   struct GNUNET_PeerIdentity pid;
1730
1731   /**
1732    * How long did the peer claim this @e address to be valid? Capped at
1733    * minimum of #MAX_ADDRESS_VALID_UNTIL relative to the time where we last
1734    * were told about the address and the value claimed by the other peer at
1735    * that time.  May be updated similarly when validation succeeds.
1736    */
1737   struct GNUNET_TIME_Absolute valid_until;
1738
1739   /**
1740    * How long do *we* consider this @e address to be valid?
1741    * In the past or zero if we have not yet validated it.
1742    */
1743   struct GNUNET_TIME_Absolute validated_until;
1744
1745   /**
1746    * When did we FIRST use the current @e challenge in a message?
1747    * Used to sanity-check @code{origin_time} in the response when
1748    * calculating the RTT. If the @code{origin_time} is not in
1749    * the expected range, the response is discarded as malicious.
1750    */
1751   struct GNUNET_TIME_Absolute first_challenge_use;
1752
1753   /**
1754    * When did we LAST use the current @e challenge in a message?
1755    * Used to sanity-check @code{origin_time} in the response when
1756    * calculating the RTT.  If the @code{origin_time} is not in
1757    * the expected range, the response is discarded as malicious.
1758    */
1759   struct GNUNET_TIME_Absolute last_challenge_use;
1760
1761   /**
1762    * Next time we will send the @e challenge to the peer, if this time is past
1763    * @e valid_until, this validation state is released at this time.  If the
1764    * address is valid, @e next_challenge is set to @e validated_until MINUS @e
1765    * validation_delay * #VALIDATION_RTT_BUFFER_FACTOR, such that we will try
1766    * to re-validate before the validity actually expires.
1767    */
1768   struct GNUNET_TIME_Absolute next_challenge;
1769
1770   /**
1771    * Current backoff factor we're applying for sending the @a challenge.
1772    * Reset to 0 if the @a challenge is confirmed upon validation.
1773    * Reduced to minimum of #FAST_VALIDATION_CHALLENGE_FREQ and half of the
1774    * existing value if we receive an unvalidated address again over
1775    * another channel (and thus should consider the information "fresh").
1776    * Maximum is #MAX_VALIDATION_CHALLENGE_FREQ.
1777    */
1778   struct GNUNET_TIME_Relative challenge_backoff;
1779
1780   /**
1781    * Initially set to "forever". Once @e validated_until is set, this value is
1782    * set to the RTT that tells us how long it took to receive the validation.
1783    */
1784   struct GNUNET_TIME_Relative validation_rtt;
1785
1786   /**
1787    * The challenge we sent to the peer to get it to validate the address. Note
1788    * that we rotate the challenge whenever we update @e validated_until to
1789    * avoid attacks where a peer simply replays an old challenge in the future.
1790    * (We must not rotate more often as otherwise we may discard valid answers
1791    * due to packet losses, latency and reorderings on the network).
1792    */
1793   struct GNUNET_ShortHashCode challenge;
1794
1795   /**
1796    * Claimed address of the peer.
1797    */
1798   char *address;
1799
1800   /**
1801    * Entry in the #validation_heap, which is sorted by @e next_challenge. The
1802    * heap is used to figure out when the next validation activity should be
1803    * run.
1804    */
1805   struct GNUNET_CONTAINER_HeapNode *hn;
1806
1807   /**
1808    * Handle to a PEERSTORE store operation for this @e address.  NULL if
1809    * no PEERSTORE operation is pending.
1810    */
1811   struct GNUNET_PEERSTORE_StoreContext *sc;
1812
1813   /**
1814    * We are technically ready to send the challenge, but we are waiting for
1815    * the respective queue to become available for transmission.
1816    */
1817   int awaiting_queue;
1818
1819 };
1820
1821
1822 /**
1823  * Head of linked list of all clients to this service.
1824  */
1825 static struct TransportClient *clients_head;
1826
1827 /**
1828  * Tail of linked list of all clients to this service.
1829  */
1830 static struct TransportClient *clients_tail;
1831
1832 /**
1833  * Statistics handle.
1834  */
1835 static struct GNUNET_STATISTICS_Handle *GST_stats;
1836
1837 /**
1838  * Configuration handle.
1839  */
1840 static const struct GNUNET_CONFIGURATION_Handle *GST_cfg;
1841
1842 /**
1843  * Our public key.
1844  */
1845 static struct GNUNET_PeerIdentity GST_my_identity;
1846
1847 /**
1848  * Our private key.
1849  */
1850 static struct GNUNET_CRYPTO_EddsaPrivateKey *GST_my_private_key;
1851
1852 /**
1853  * Map from PIDs to `struct Neighbour` entries.  A peer is
1854  * a neighbour if we have an MQ to it from some communicator.
1855  */
1856 static struct GNUNET_CONTAINER_MultiPeerMap *neighbours;
1857
1858 /**
1859  * Map from PIDs to `struct DistanceVector` entries describing
1860  * known paths to the peer.
1861  */
1862 static struct GNUNET_CONTAINER_MultiPeerMap *dv_routes;
1863
1864 /**
1865  * Map from PIDs to `struct ValidationState` entries describing
1866  * addresses we are aware of and their validity state.
1867  */
1868 static struct GNUNET_CONTAINER_MultiPeerMap *validation_map;
1869
1870 /**
1871  * Map from challenges to `struct LearnLaunchEntry` values.
1872  */
1873 static struct GNUNET_CONTAINER_MultiShortmap *dvlearn_map;
1874
1875 /**
1876  * Head of a DLL sorted by launch time.
1877  */
1878 static struct LearnLaunchEntry *lle_head;
1879
1880 /**
1881  * Tail of a DLL sorted by launch time.
1882  */
1883 static struct LearnLaunchEntry *lle_tail;
1884
1885 /**
1886  * MIN Heap sorted by "next_challenge" to `struct ValidationState` entries
1887  * sorting addresses we are aware of by when we should next try to (re)validate
1888  * (or expire) them.
1889  */
1890 static struct GNUNET_CONTAINER_Heap *validation_heap;
1891
1892 /**
1893  * Database for peer's HELLOs.
1894  */
1895 static struct GNUNET_PEERSTORE_Handle *peerstore;
1896
1897 /**
1898  * Heap sorting `struct EphemeralCacheEntry` by their
1899  * key/signature validity.
1900  */
1901 static struct GNUNET_CONTAINER_Heap *ephemeral_heap;
1902
1903 /**
1904  * Hash map for looking up `struct EphemeralCacheEntry`s
1905  * by peer identity. (We may have ephemerals in our
1906  * cache for which we do not have a neighbour entry,
1907  * and similar many neighbours may not need ephemerals,
1908  * so we use a second map.)
1909  */
1910 static struct GNUNET_CONTAINER_MultiPeerMap *ephemeral_map;
1911
1912 /**
1913  * Task to free expired ephemerals.
1914  */
1915 static struct GNUNET_SCHEDULER_Task *ephemeral_task;
1916
1917 /**
1918  * Task run to initiate DV learning.
1919  */
1920 static struct GNUNET_SCHEDULER_Task *dvlearn_task;
1921
1922 /**
1923  * Task to run address validation.
1924  */
1925 static struct GNUNET_SCHEDULER_Task *validation_task;
1926
1927
1928 /**
1929  * Free cached ephemeral key.
1930  *
1931  * @param ece cached signature to free
1932  */
1933 static void
1934 free_ephemeral (struct EphemeralCacheEntry *ece)
1935 {
1936   GNUNET_CONTAINER_multipeermap_remove (ephemeral_map,
1937                                         &ece->target,
1938                                         ece);
1939   GNUNET_CONTAINER_heap_remove_node (ece->hn);
1940   GNUNET_free (ece);
1941 }
1942
1943
1944 /**
1945  * Free validation state.
1946  *
1947  * @param vs validation state to free
1948  */
1949 static void
1950 free_validation_state (struct ValidationState *vs)
1951 {
1952   GNUNET_CONTAINER_multipeermap_remove (validation_map,
1953                                         &vs->pid,
1954                                         vs);
1955   GNUNET_CONTAINER_heap_remove_node (vs->hn);
1956   vs->hn = NULL;
1957   if (NULL != vs->sc)
1958   {
1959     GNUNET_PEERSTORE_store_cancel (vs->sc);
1960     vs->sc = NULL;
1961   }
1962   GNUNET_free (vs->address);
1963   GNUNET_free (vs);
1964 }
1965
1966
1967 /**
1968  * Lookup neighbour record for peer @a pid.
1969  *
1970  * @param pid neighbour to look for
1971  * @return NULL if we do not have this peer as a neighbour
1972  */
1973 static struct Neighbour *
1974 lookup_neighbour (const struct GNUNET_PeerIdentity *pid)
1975 {
1976   return GNUNET_CONTAINER_multipeermap_get (neighbours,
1977                                             pid);
1978 }
1979
1980
1981 /**
1982  * Details about what to notify monitors about.
1983  */
1984 struct MonitorEvent
1985 {
1986   /**
1987    * @deprecated To be discussed if we keep these...
1988    */
1989   struct GNUNET_TIME_Absolute last_validation;
1990   struct GNUNET_TIME_Absolute valid_until;
1991   struct GNUNET_TIME_Absolute next_validation;
1992
1993   /**
1994    * Current round-trip time estimate.
1995    */
1996   struct GNUNET_TIME_Relative rtt;
1997
1998   /**
1999    * Connection status.
2000    */
2001   enum GNUNET_TRANSPORT_ConnectionStatus cs;
2002
2003   /**
2004    * Messages pending.
2005    */
2006   uint32_t num_msg_pending;
2007
2008   /**
2009    * Bytes pending.
2010    */
2011   uint32_t num_bytes_pending;
2012
2013
2014 };
2015
2016
2017 /**
2018  * Free a @dvh. Callers MAY want to check if this was the last path to the
2019  * `target`, and if so call #free_dv_route to also free the associated DV
2020  * entry in #dv_routes (if not, the associated scheduler job should eventually
2021  * take care of it).
2022  *
2023  * @param dvh hop to free
2024  */
2025 static void
2026 free_distance_vector_hop (struct DistanceVectorHop *dvh)
2027 {
2028   struct Neighbour *n = dvh->next_hop;
2029   struct DistanceVector *dv = dvh->dv;
2030
2031   GNUNET_CONTAINER_MDLL_remove (neighbour,
2032                                 n->dv_head,
2033                                 n->dv_tail,
2034                                 dvh);
2035   GNUNET_CONTAINER_MDLL_remove (dv,
2036                                 dv->dv_head,
2037                                 dv->dv_tail,
2038                                 dvh);
2039   GNUNET_free (dvh);
2040 }
2041
2042
2043 /**
2044  * Free entry in #dv_routes.  First frees all hops to the target, and
2045  * if there are no entries left, frees @a dv as well.
2046  *
2047  * @param dv route to free
2048  */
2049 static void
2050 free_dv_route (struct DistanceVector *dv)
2051 {
2052   struct DistanceVectorHop *dvh;
2053
2054   while (NULL != (dvh = dv->dv_head))
2055     free_distance_vector_hop (dvh);
2056   if (NULL == dv->dv_head)
2057   {
2058     GNUNET_assert (GNUNET_YES ==
2059                    GNUNET_CONTAINER_multipeermap_remove (dv_routes,
2060                                                          &dv->target,
2061                                                          dv));
2062     if (NULL != dv->timeout_task)
2063       GNUNET_SCHEDULER_cancel (dv->timeout_task);
2064     GNUNET_free (dv);
2065   }
2066 }
2067
2068
2069 /**
2070  * Notify monitor @a tc about an event.  That @a tc
2071  * cares about the event has already been checked.
2072  *
2073  * Send @a tc information in @a me about a @a peer's status with
2074  * respect to some @a address to all monitors that care.
2075  *
2076  * @param tc monitor to inform
2077  * @param peer peer the information is about
2078  * @param address address the information is about
2079  * @param nt network type associated with @a address
2080  * @param me detailed information to transmit
2081  */
2082 static void
2083 notify_monitor (struct TransportClient *tc,
2084                 const struct GNUNET_PeerIdentity *peer,
2085                 const char *address,
2086                 enum GNUNET_NetworkType nt,
2087                 const struct MonitorEvent *me)
2088 {
2089   struct GNUNET_MQ_Envelope *env;
2090   struct GNUNET_TRANSPORT_MonitorData *md;
2091   size_t addr_len = strlen (address) + 1;
2092
2093   env = GNUNET_MQ_msg_extra (md,
2094                              addr_len,
2095                              GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_DATA);
2096   md->nt = htonl ((uint32_t) nt);
2097   md->peer = *peer;
2098   md->last_validation = GNUNET_TIME_absolute_hton (me->last_validation);
2099   md->valid_until = GNUNET_TIME_absolute_hton (me->valid_until);
2100   md->next_validation = GNUNET_TIME_absolute_hton (me->next_validation);
2101   md->rtt = GNUNET_TIME_relative_hton (me->rtt);
2102   md->cs = htonl ((uint32_t) me->cs);
2103   md->num_msg_pending = htonl (me->num_msg_pending);
2104   md->num_bytes_pending = htonl (me->num_bytes_pending);
2105   memcpy (&md[1],
2106           address,
2107           addr_len);
2108   GNUNET_MQ_send (tc->mq,
2109                   env);
2110 }
2111
2112
2113 /**
2114  * Send information in @a me about a @a peer's status with respect
2115  * to some @a address to all monitors that care.
2116  *
2117  * @param peer peer the information is about
2118  * @param address address the information is about
2119  * @param nt network type associated with @a address
2120  * @param me detailed information to transmit
2121  */
2122 static void
2123 notify_monitors (const struct GNUNET_PeerIdentity *peer,
2124                  const char *address,
2125                  enum GNUNET_NetworkType nt,
2126                  const struct MonitorEvent *me)
2127 {
2128   for (struct TransportClient *tc = clients_head;
2129        NULL != tc;
2130        tc = tc->next)
2131   {
2132     if (CT_MONITOR != tc->type)
2133       continue;
2134     if (tc->details.monitor.one_shot)
2135       continue;
2136     if ( (0 != GNUNET_is_zero (&tc->details.monitor.peer)) &&
2137          (0 != GNUNET_memcmp (&tc->details.monitor.peer,
2138                               peer)) )
2139       continue;
2140     notify_monitor (tc,
2141                     peer,
2142                     address,
2143                     nt,
2144                     me);
2145   }
2146 }
2147
2148
2149 /**
2150  * Called whenever a client connects.  Allocates our
2151  * data structures associated with that client.
2152  *
2153  * @param cls closure, NULL
2154  * @param client identification of the client
2155  * @param mq message queue for the client
2156  * @return our `struct TransportClient`
2157  */
2158 static void *
2159 client_connect_cb (void *cls,
2160                    struct GNUNET_SERVICE_Client *client,
2161                    struct GNUNET_MQ_Handle *mq)
2162 {
2163   struct TransportClient *tc;
2164
2165   (void) cls;
2166   tc = GNUNET_new (struct TransportClient);
2167   tc->client = client;
2168   tc->mq = mq;
2169   GNUNET_CONTAINER_DLL_insert (clients_head,
2170                                clients_tail,
2171                                tc);
2172   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2173               "Client %p connected\n",
2174               tc);
2175   return tc;
2176 }
2177
2178
2179 /**
2180  * Free @a rc
2181  *
2182  * @param rc data structure to free
2183  */
2184 static void
2185 free_reassembly_context (struct ReassemblyContext *rc)
2186 {
2187   struct Neighbour *n = rc->neighbour;
2188
2189   GNUNET_assert (rc ==
2190                  GNUNET_CONTAINER_heap_remove_node (rc->hn));
2191   GNUNET_assert (GNUNET_OK ==
2192                  GNUNET_CONTAINER_multishortmap_remove (n->reassembly_map,
2193                                                         &rc->msg_uuid,
2194                                                         rc));
2195   GNUNET_free (rc);
2196 }
2197
2198
2199 /**
2200  * Task run to clean up reassembly context of a neighbour that have expired.
2201  *
2202  * @param cls a `struct Neighbour`
2203  */
2204 static void
2205 reassembly_cleanup_task (void *cls)
2206 {
2207   struct Neighbour *n = cls;
2208   struct ReassemblyContext *rc;
2209
2210   n->reassembly_timeout_task = NULL;
2211   while (NULL != (rc = GNUNET_CONTAINER_heap_peek (n->reassembly_heap)))
2212   {
2213     if (0 == GNUNET_TIME_absolute_get_remaining (rc->reassembly_timeout).rel_value_us)
2214     {
2215       free_reassembly_context (rc);
2216       continue;
2217     }
2218     GNUNET_assert (NULL == n->reassembly_timeout_task);
2219     n->reassembly_timeout_task = GNUNET_SCHEDULER_add_at (rc->reassembly_timeout,
2220                                                           &reassembly_cleanup_task,
2221                                                           n);
2222     return;
2223   }
2224 }
2225
2226
2227 /**
2228  * function called to #free_reassembly_context().
2229  *
2230  * @param cls NULL
2231  * @param key unused
2232  * @param value a `struct ReassemblyContext` to free
2233  * @return #GNUNET_OK (continue iteration)
2234  */
2235 static int
2236 free_reassembly_cb (void *cls,
2237                     const struct GNUNET_ShortHashCode *key,
2238                     void *value)
2239 {
2240   struct ReassemblyContext *rc = value;
2241   (void) cls;
2242   (void) key;
2243
2244   free_reassembly_context (rc);
2245   return GNUNET_OK;
2246 }
2247
2248
2249 /**
2250  * Release memory used by @a neighbour.
2251  *
2252  * @param neighbour neighbour entry to free
2253  */
2254 static void
2255 free_neighbour (struct Neighbour *neighbour)
2256 {
2257   struct DistanceVectorHop *dvh;
2258
2259   GNUNET_assert (NULL == neighbour->queue_head);
2260   GNUNET_assert (GNUNET_YES ==
2261                  GNUNET_CONTAINER_multipeermap_remove (neighbours,
2262                                                        &neighbour->pid,
2263                                                        neighbour));
2264   if (NULL != neighbour->timeout_task)
2265     GNUNET_SCHEDULER_cancel (neighbour->timeout_task);
2266   if (NULL != neighbour->reassembly_map)
2267   {
2268     GNUNET_CONTAINER_multishortmap_iterate (neighbour->reassembly_map,
2269                                             &free_reassembly_cb,
2270                                             NULL);
2271     GNUNET_CONTAINER_multishortmap_destroy (neighbour->reassembly_map);
2272     neighbour->reassembly_map = NULL;
2273     GNUNET_CONTAINER_heap_destroy (neighbour->reassembly_heap);
2274     neighbour->reassembly_heap = NULL;
2275   }
2276   while (NULL != (dvh = neighbour->dv_head))
2277   {
2278     struct DistanceVector *dv = dvh->dv;
2279
2280     free_distance_vector_hop (dvh);
2281     if (NULL == dv->dv_head)
2282       free_dv_route (dv);
2283   }
2284   if (NULL != neighbour->reassembly_timeout_task)
2285     GNUNET_SCHEDULER_cancel (neighbour->reassembly_timeout_task);
2286   GNUNET_free (neighbour);
2287 }
2288
2289
2290 /**
2291  * Send message to CORE clients that we lost a connection.
2292  *
2293  * @param tc client to inform (must be CORE client)
2294  * @param pid peer the connection is for
2295  * @param quota_out current quota for the peer
2296  */
2297 static void
2298 core_send_connect_info (struct TransportClient *tc,
2299                         const struct GNUNET_PeerIdentity *pid,
2300                         struct GNUNET_BANDWIDTH_Value32NBO quota_out)
2301 {
2302   struct GNUNET_MQ_Envelope *env;
2303   struct ConnectInfoMessage *cim;
2304
2305   GNUNET_assert (CT_CORE == tc->type);
2306   env = GNUNET_MQ_msg (cim,
2307                        GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
2308   cim->quota_out = quota_out;
2309   cim->id = *pid;
2310   GNUNET_MQ_send (tc->mq,
2311                   env);
2312 }
2313
2314
2315 /**
2316  * Send message to CORE clients that we gained a connection
2317  *
2318  * @param pid peer the queue was for
2319  * @param quota_out current quota for the peer
2320  */
2321 static void
2322 cores_send_connect_info (const struct GNUNET_PeerIdentity *pid,
2323                          struct GNUNET_BANDWIDTH_Value32NBO quota_out)
2324 {
2325   for (struct TransportClient *tc = clients_head;
2326        NULL != tc;
2327        tc = tc->next)
2328   {
2329     if (CT_CORE != tc->type)
2330       continue;
2331     core_send_connect_info (tc,
2332                             pid,
2333                             quota_out);
2334   }
2335 }
2336
2337
2338 /**
2339  * Send message to CORE clients that we lost a connection.
2340  *
2341  * @param pid peer the connection was for
2342  */
2343 static void
2344 cores_send_disconnect_info (const struct GNUNET_PeerIdentity *pid)
2345 {
2346   for (struct TransportClient *tc = clients_head;
2347        NULL != tc;
2348        tc = tc->next)
2349   {
2350     struct GNUNET_MQ_Envelope *env;
2351     struct DisconnectInfoMessage *dim;
2352
2353     if (CT_CORE != tc->type)
2354       continue;
2355     env = GNUNET_MQ_msg (dim,
2356                          GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT);
2357     dim->peer = *pid;
2358     GNUNET_MQ_send (tc->mq,
2359                     env);
2360   }
2361 }
2362
2363
2364 /**
2365  * We believe we are ready to transmit a message on a queue. Double-checks
2366  * with the queue's "tracker_out" and then gives the message to the
2367  * communicator for transmission (updating the tracker, and re-scheduling
2368  * itself if applicable).
2369  *
2370  * @param cls the `struct Queue` to process transmissions for
2371  */
2372 static void
2373 transmit_on_queue (void *cls);
2374
2375
2376 /**
2377  * Schedule next run of #transmit_on_queue().  Does NOTHING if
2378  * we should run immediately or if the message queue is empty.
2379  * Test for no task being added AND queue not being empty to
2380  * transmit immediately afterwards!  This function must only
2381  * be called if the message queue is non-empty!
2382  *
2383  * @param queue the queue to do scheduling for
2384  */
2385 static void
2386 schedule_transmit_on_queue (struct Queue *queue)
2387 {
2388   struct Neighbour *n = queue->neighbour;
2389   struct PendingMessage *pm = n->pending_msg_head;
2390   struct GNUNET_TIME_Relative out_delay;
2391   unsigned int wsize;
2392
2393   GNUNET_assert (NULL != pm);
2394   if (queue->tc->details.communicator.total_queue_length >=
2395       COMMUNICATOR_TOTAL_QUEUE_LIMIT)
2396   {
2397     GNUNET_STATISTICS_update (GST_stats,
2398                               "# Transmission throttled due to communicator queue limit",
2399                               1,
2400                               GNUNET_NO);
2401     return;
2402   }
2403   if (queue->queue_length >= QUEUE_LENGTH_LIMIT)
2404   {
2405     GNUNET_STATISTICS_update (GST_stats,
2406                               "# Transmission throttled due to queue queue limit",
2407                               1,
2408                               GNUNET_NO);
2409     return;
2410   }
2411
2412   wsize = (0 == queue->mtu)
2413     ? pm->bytes_msg /* FIXME: add overheads? */
2414     : queue->mtu;
2415   out_delay = GNUNET_BANDWIDTH_tracker_get_delay (&queue->tracker_out,
2416                                                   wsize);
2417   out_delay = GNUNET_TIME_relative_max (GNUNET_TIME_absolute_get_remaining (pm->next_attempt),
2418                                         out_delay);
2419   if (0 == out_delay.rel_value_us)
2420     return; /* we should run immediately! */
2421   /* queue has changed since we were scheduled, reschedule again */
2422   queue->transmit_task
2423     = GNUNET_SCHEDULER_add_delayed (out_delay,
2424                                     &transmit_on_queue,
2425                                     queue);
2426   if (out_delay.rel_value_us > DELAY_WARN_THRESHOLD.rel_value_us)
2427     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2428                 "Next transmission on queue `%s' in %s (high delay)\n",
2429                 queue->address,
2430                 GNUNET_STRINGS_relative_time_to_string (out_delay,
2431                                                         GNUNET_YES));
2432   else
2433     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2434                 "Next transmission on queue `%s' in %s\n",
2435                 queue->address,
2436                 GNUNET_STRINGS_relative_time_to_string (out_delay,
2437                                                         GNUNET_YES));
2438 }
2439
2440
2441 /**
2442  * Free @a queue.
2443  *
2444  * @param queue the queue to free
2445  */
2446 static void
2447 free_queue (struct Queue *queue)
2448 {
2449   struct Neighbour *neighbour = queue->neighbour;
2450   struct TransportClient *tc = queue->tc;
2451   struct MonitorEvent me = {
2452     .cs = GNUNET_TRANSPORT_CS_DOWN,
2453     .rtt = GNUNET_TIME_UNIT_FOREVER_REL
2454   };
2455   struct QueueEntry *qe;
2456   int maxxed;
2457
2458   if (NULL != queue->transmit_task)
2459   {
2460     GNUNET_SCHEDULER_cancel (queue->transmit_task);
2461     queue->transmit_task = NULL;
2462   }
2463   GNUNET_CONTAINER_MDLL_remove (neighbour,
2464                                 neighbour->queue_head,
2465                                 neighbour->queue_tail,
2466                                 queue);
2467   GNUNET_CONTAINER_MDLL_remove (client,
2468                                 tc->details.communicator.queue_head,
2469                                 tc->details.communicator.queue_tail,
2470                                 queue);
2471   maxxed = (COMMUNICATOR_TOTAL_QUEUE_LIMIT >= tc->details.communicator.total_queue_length);
2472   while (NULL != (qe = queue->queue_head))
2473   {
2474     GNUNET_CONTAINER_DLL_remove (queue->queue_head,
2475                                  queue->queue_tail,
2476                                  qe);
2477     queue->queue_length--;
2478     tc->details.communicator.total_queue_length--;
2479     GNUNET_free (qe);
2480   }
2481   GNUNET_assert (0 == queue->queue_length);
2482   if ( (maxxed) &&
2483        (COMMUNICATOR_TOTAL_QUEUE_LIMIT < tc->details.communicator.total_queue_length) )
2484   {
2485     /* Communicator dropped below threshold, resume all queues */
2486     GNUNET_STATISTICS_update (GST_stats,
2487                               "# Transmission throttled due to communicator queue limit",
2488                               -1,
2489                               GNUNET_NO);
2490     for (struct Queue *s = tc->details.communicator.queue_head;
2491          NULL != s;
2492          s = s->next_client)
2493       schedule_transmit_on_queue (s);
2494   }
2495   notify_monitors (&neighbour->pid,
2496                    queue->address,
2497                    queue->nt,
2498                    &me);
2499   GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_in);
2500   GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_out);
2501   GNUNET_free (queue);
2502   if (NULL == neighbour->queue_head)
2503   {
2504     cores_send_disconnect_info (&neighbour->pid);
2505     free_neighbour (neighbour);
2506   }
2507 }
2508
2509
2510 /**
2511  * Free @a ale
2512  *
2513  * @param ale address list entry to free
2514  */
2515 static void
2516 free_address_list_entry (struct AddressListEntry *ale)
2517 {
2518   struct TransportClient *tc = ale->tc;
2519
2520   GNUNET_CONTAINER_DLL_remove (tc->details.communicator.addr_head,
2521                                tc->details.communicator.addr_tail,
2522                                ale);
2523   if (NULL != ale->sc)
2524   {
2525     GNUNET_PEERSTORE_store_cancel (ale->sc);
2526     ale->sc = NULL;
2527   }
2528   if (NULL != ale->st)
2529   {
2530     GNUNET_SCHEDULER_cancel (ale->st);
2531     ale->st = NULL;
2532   }
2533   GNUNET_free (ale);
2534 }
2535
2536
2537 /**
2538  * Stop the peer request in @a value.
2539  *
2540  * @param cls a `struct TransportClient` that no longer makes the request
2541  * @param pid the peer's identity
2542  * @param value a `struct PeerRequest`
2543  * @return #GNUNET_YES (always)
2544  */
2545 static int
2546 stop_peer_request (void *cls,
2547                    const struct GNUNET_PeerIdentity *pid,
2548                    void *value)
2549 {
2550   struct TransportClient *tc = cls;
2551   struct PeerRequest *pr = value;
2552
2553   GNUNET_PEERSTORE_watch_cancel (pr->wc);
2554   GNUNET_assert (GNUNET_YES ==
2555                  GNUNET_CONTAINER_multipeermap_remove (tc->details.application.requests,
2556                                                        pid,
2557                                                        pr));
2558   GNUNET_free (pr);
2559
2560   return GNUNET_OK;
2561 }
2562
2563
2564 /**
2565  * Called whenever a client is disconnected.  Frees our
2566  * resources associated with that client.
2567  *
2568  * @param cls closure, NULL
2569  * @param client identification of the client
2570  * @param app_ctx our `struct TransportClient`
2571  */
2572 static void
2573 client_disconnect_cb (void *cls,
2574                       struct GNUNET_SERVICE_Client *client,
2575                       void *app_ctx)
2576 {
2577   struct TransportClient *tc = app_ctx;
2578
2579   (void) cls;
2580   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2581               "Client %p disconnected, cleaning up.\n",
2582               tc);
2583   GNUNET_CONTAINER_DLL_remove (clients_head,
2584                                clients_tail,
2585                                tc);
2586   switch (tc->type)
2587   {
2588   case CT_NONE:
2589     break;
2590   case CT_CORE:
2591     {
2592       struct PendingMessage *pm;
2593
2594       while (NULL != (pm = tc->details.core.pending_msg_head))
2595       {
2596         GNUNET_CONTAINER_MDLL_remove (client,
2597                                       tc->details.core.pending_msg_head,
2598                                       tc->details.core.pending_msg_tail,
2599                                       pm);
2600         pm->client = NULL;
2601       }
2602     }
2603     break;
2604   case CT_MONITOR:
2605     break;
2606   case CT_COMMUNICATOR:
2607     {
2608       struct Queue *q;
2609       struct AddressListEntry *ale;
2610
2611       while (NULL != (q = tc->details.communicator.queue_head))
2612         free_queue (q);
2613       while (NULL != (ale = tc->details.communicator.addr_head))
2614         free_address_list_entry (ale);
2615       GNUNET_free (tc->details.communicator.address_prefix);
2616     }
2617     break;
2618   case CT_APPLICATION:
2619     GNUNET_CONTAINER_multipeermap_iterate (tc->details.application.requests,
2620                                            &stop_peer_request,
2621                                            tc);
2622     GNUNET_CONTAINER_multipeermap_destroy (tc->details.application.requests);
2623     break;
2624   }
2625   GNUNET_free (tc);
2626 }
2627
2628
2629 /**
2630  * Iterator telling new CORE client about all existing
2631  * connections to peers.
2632  *
2633  * @param cls the new `struct TransportClient`
2634  * @param pid a connected peer
2635  * @param value the `struct Neighbour` with more information
2636  * @return #GNUNET_OK (continue to iterate)
2637  */
2638 static int
2639 notify_client_connect_info (void *cls,
2640                             const struct GNUNET_PeerIdentity *pid,
2641                             void *value)
2642 {
2643   struct TransportClient *tc = cls;
2644   struct Neighbour *neighbour = value;
2645
2646   core_send_connect_info (tc,
2647                           pid,
2648                           neighbour->quota_out);
2649   return GNUNET_OK;
2650 }
2651
2652
2653 /**
2654  * Initialize a "CORE" client.  We got a start message from this
2655  * client, so add it to the list of clients for broadcasting of
2656  * inbound messages.
2657  *
2658  * @param cls the client
2659  * @param start the start message that was sent
2660  */
2661 static void
2662 handle_client_start (void *cls,
2663                      const struct StartMessage *start)
2664 {
2665   struct TransportClient *tc = cls;
2666   uint32_t options;
2667
2668   options = ntohl (start->options);
2669   if ( (0 != (1 & options)) &&
2670        (0 !=
2671         GNUNET_memcmp (&start->self,
2672                        &GST_my_identity)) )
2673   {
2674     /* client thinks this is a different peer, reject */
2675     GNUNET_break (0);
2676     GNUNET_SERVICE_client_drop (tc->client);
2677     return;
2678   }
2679   if (CT_NONE != tc->type)
2680   {
2681     GNUNET_break (0);
2682     GNUNET_SERVICE_client_drop (tc->client);
2683     return;
2684   }
2685   tc->type = CT_CORE;
2686   GNUNET_CONTAINER_multipeermap_iterate (neighbours,
2687                                          &notify_client_connect_info,
2688                                          tc);
2689   GNUNET_SERVICE_client_continue (tc->client);
2690 }
2691
2692
2693 /**
2694  * Client asked for transmission to a peer.  Process the request.
2695  *
2696  * @param cls the client
2697  * @param obm the send message that was sent
2698  */
2699 static int
2700 check_client_send (void *cls,
2701                    const struct OutboundMessage *obm)
2702 {
2703   struct TransportClient *tc = cls;
2704   uint16_t size;
2705   const struct GNUNET_MessageHeader *obmm;
2706
2707   if (CT_CORE != tc->type)
2708   {
2709     GNUNET_break (0);
2710     return GNUNET_SYSERR;
2711   }
2712   size = ntohs (obm->header.size) - sizeof (struct OutboundMessage);
2713   if (size < sizeof (struct GNUNET_MessageHeader))
2714   {
2715     GNUNET_break (0);
2716     return GNUNET_SYSERR;
2717   }
2718   obmm = (const struct GNUNET_MessageHeader *) &obm[1];
2719   if (size != ntohs (obmm->size))
2720   {
2721     GNUNET_break (0);
2722     return GNUNET_SYSERR;
2723   }
2724   return GNUNET_OK;
2725 }
2726
2727
2728 /**
2729  * Free fragment tree below @e root, excluding @e root itself.
2730  *
2731  * @param root root of the tree to free
2732  */
2733 static void
2734 free_fragment_tree (struct PendingMessage *root)
2735 {
2736   struct PendingMessage *frag;
2737
2738   while (NULL != (frag = root->head_frag))
2739   {
2740     free_fragment_tree (frag);
2741     GNUNET_CONTAINER_MDLL_remove (frag,
2742                                   root->head_frag,
2743                                   root->tail_frag,
2744                                   frag);
2745     GNUNET_free (frag);
2746   }
2747 }
2748
2749
2750 /**
2751  * Release memory associated with @a pm and remove @a pm from associated
2752  * data structures.  @a pm must be a top-level pending message and not
2753  * a fragment in the tree.  The entire tree is freed (if applicable).
2754  *
2755  * @param pm the pending message to free
2756  */
2757 static void
2758 free_pending_message (struct PendingMessage *pm)
2759 {
2760   struct TransportClient *tc = pm->client;
2761   struct Neighbour *target = pm->target;
2762
2763   if (NULL != tc)
2764   {
2765     GNUNET_CONTAINER_MDLL_remove (client,
2766                                   tc->details.core.pending_msg_head,
2767                                   tc->details.core.pending_msg_tail,
2768                                   pm);
2769   }
2770   GNUNET_CONTAINER_MDLL_remove (neighbour,
2771                                 target->pending_msg_head,
2772                                 target->pending_msg_tail,
2773                                 pm);
2774   free_fragment_tree (pm);
2775   GNUNET_free_non_null (pm->bpm);
2776   GNUNET_free (pm);
2777 }
2778
2779
2780 /**
2781  * Send a response to the @a pm that we have processed a
2782  * "send" request with status @a success. We
2783  * transmitted @a bytes_physical on the actual wire.
2784  * Sends a confirmation to the "core" client responsible
2785  * for the original request and free's @a pm.
2786  *
2787  * @param pm handle to the original pending message
2788  * @param success status code, #GNUNET_OK on success, #GNUNET_SYSERR
2789  *          for transmission failure
2790  * @param bytes_physical amount of bandwidth consumed
2791  */
2792 static void
2793 client_send_response (struct PendingMessage *pm,
2794                       int success,
2795                       uint32_t bytes_physical)
2796 {
2797   struct TransportClient *tc = pm->client;
2798   struct Neighbour *target = pm->target;
2799   struct GNUNET_MQ_Envelope *env;
2800   struct SendOkMessage *som;
2801
2802   if (NULL != tc)
2803   {
2804     env = GNUNET_MQ_msg (som,
2805                          GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
2806     som->success = htonl ((uint32_t) success);
2807     som->bytes_msg = htons (pm->bytes_msg);
2808     som->bytes_physical = htonl (bytes_physical);
2809     som->peer = target->pid;
2810     GNUNET_MQ_send (tc->mq,
2811                     env);
2812   }
2813   free_pending_message (pm);
2814 }
2815
2816
2817 /**
2818  * Checks the message queue for a neighbour for messages that have timed
2819  * out and purges them.
2820  *
2821  * @param cls a `struct Neighbour`
2822  */
2823 static void
2824 check_queue_timeouts (void *cls)
2825 {
2826   struct Neighbour *n = cls;
2827   struct PendingMessage *pm;
2828   struct GNUNET_TIME_Absolute now;
2829   struct GNUNET_TIME_Absolute earliest_timeout;
2830
2831   n->timeout_task = NULL;
2832   earliest_timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
2833   now = GNUNET_TIME_absolute_get ();
2834   for (struct PendingMessage *pos = n->pending_msg_head;
2835        NULL != pos;
2836        pos = pm)
2837   {
2838     pm = pos->next_neighbour;
2839     if (pos->timeout.abs_value_us <= now.abs_value_us)
2840     {
2841       GNUNET_STATISTICS_update (GST_stats,
2842                                 "# messages dropped (timeout before confirmation)",
2843                                 1,
2844                                 GNUNET_NO);
2845       client_send_response (pm,
2846                             GNUNET_NO,
2847                             0);
2848       continue;
2849     }
2850     earliest_timeout = GNUNET_TIME_absolute_min (earliest_timeout,
2851                                                  pos->timeout);
2852   }
2853   n->earliest_timeout = earliest_timeout;
2854   if (NULL != n->pending_msg_head)
2855     n->timeout_task = GNUNET_SCHEDULER_add_at (earliest_timeout,
2856                                                &check_queue_timeouts,
2857                                                n);
2858 }
2859
2860
2861 /**
2862  * Client asked for transmission to a peer.  Process the request.
2863  *
2864  * @param cls the client
2865  * @param obm the send message that was sent
2866  */
2867 static void
2868 handle_client_send (void *cls,
2869                     const struct OutboundMessage *obm)
2870 {
2871   struct TransportClient *tc = cls;
2872   struct PendingMessage *pm;
2873   const struct GNUNET_MessageHeader *obmm;
2874   struct Neighbour *target;
2875   uint32_t bytes_msg;
2876   int was_empty;
2877
2878   GNUNET_assert (CT_CORE == tc->type);
2879   obmm = (const struct GNUNET_MessageHeader *) &obm[1];
2880   bytes_msg = ntohs (obmm->size);
2881   target = lookup_neighbour (&obm->peer);
2882   if (NULL == target)
2883   {
2884     /* Failure: don't have this peer as a neighbour (anymore).
2885        Might have gone down asynchronously, so this is NOT
2886        a protocol violation by CORE. Still count the event,
2887        as this should be rare. */
2888     struct GNUNET_MQ_Envelope *env;
2889     struct SendOkMessage *som;
2890
2891     env = GNUNET_MQ_msg (som,
2892                          GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
2893     som->success = htonl (GNUNET_SYSERR);
2894     som->bytes_msg = htonl (bytes_msg);
2895     som->bytes_physical = htonl (0);
2896     som->peer = obm->peer;
2897     GNUNET_MQ_send (tc->mq,
2898                     env);
2899     GNUNET_SERVICE_client_continue (tc->client);
2900     GNUNET_STATISTICS_update (GST_stats,
2901                               "# messages dropped (neighbour unknown)",
2902                               1,
2903                               GNUNET_NO);
2904     return;
2905   }
2906   was_empty = (NULL == target->pending_msg_head);
2907   pm = GNUNET_malloc (sizeof (struct PendingMessage) + bytes_msg);
2908   pm->client = tc;
2909   pm->target = target;
2910   pm->bytes_msg = bytes_msg;
2911   pm->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (obm->timeout));
2912   memcpy (&pm[1],
2913           &obm[1],
2914           bytes_msg);
2915   GNUNET_CONTAINER_MDLL_insert (neighbour,
2916                                 target->pending_msg_head,
2917                                 target->pending_msg_tail,
2918                                 pm);
2919   GNUNET_CONTAINER_MDLL_insert (client,
2920                                 tc->details.core.pending_msg_head,
2921                                 tc->details.core.pending_msg_tail,
2922                                 pm);
2923   if (target->earliest_timeout.abs_value_us > pm->timeout.abs_value_us)
2924   {
2925     target->earliest_timeout.abs_value_us = pm->timeout.abs_value_us;
2926     if (NULL != target->timeout_task)
2927       GNUNET_SCHEDULER_cancel (target->timeout_task);
2928     target->timeout_task
2929       = GNUNET_SCHEDULER_add_at (target->earliest_timeout,
2930                                  &check_queue_timeouts,
2931                                  target);
2932   }
2933   if (! was_empty)
2934     return; /* all queues must already be busy */
2935   for (struct Queue *queue = target->queue_head;
2936        NULL != queue;
2937        queue = queue->next_neighbour)
2938   {
2939     /* try transmission on any queue that is idle */
2940     if (NULL == queue->transmit_task)
2941       queue->transmit_task = GNUNET_SCHEDULER_add_now (&transmit_on_queue,
2942                                                        queue);
2943   }
2944 }
2945
2946
2947 /**
2948  * Communicator started.  Test message is well-formed.
2949  *
2950  * @param cls the client
2951  * @param cam the send message that was sent
2952  */
2953 static int
2954 check_communicator_available (void *cls,
2955                               const struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam)
2956 {
2957   struct TransportClient *tc = cls;
2958   uint16_t size;
2959
2960   if (CT_NONE != tc->type)
2961   {
2962     GNUNET_break (0);
2963     return GNUNET_SYSERR;
2964   }
2965   tc->type = CT_COMMUNICATOR;
2966   size = ntohs (cam->header.size) - sizeof (*cam);
2967   if (0 == size)
2968     return GNUNET_OK; /* receive-only communicator */
2969   GNUNET_MQ_check_zero_termination (cam);
2970   return GNUNET_OK;
2971 }
2972
2973
2974 /**
2975  * Communicator started.  Process the request.
2976  *
2977  * @param cls the client
2978  * @param cam the send message that was sent
2979  */
2980 static void
2981 handle_communicator_available (void *cls,
2982                                const struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam)
2983 {
2984   struct TransportClient *tc = cls;
2985   uint16_t size;
2986
2987   size = ntohs (cam->header.size) - sizeof (*cam);
2988   if (0 == size)
2989     return; /* receive-only communicator */
2990   tc->details.communicator.address_prefix
2991     = GNUNET_strdup ((const char *) &cam[1]);
2992   tc->details.communicator.cc
2993     = (enum GNUNET_TRANSPORT_CommunicatorCharacteristics) ntohl (cam->cc);
2994   GNUNET_SERVICE_client_continue (tc->client);
2995 }
2996
2997
2998 /**
2999  * Communicator requests backchannel transmission.  Check the request.
3000  *
3001  * @param cls the client
3002  * @param cb the send message that was sent
3003  * @return #GNUNET_OK if message is well-formed
3004  */
3005 static int
3006 check_communicator_backchannel (void *cls,
3007                                 const struct GNUNET_TRANSPORT_CommunicatorBackchannel *cb)
3008 {
3009   const struct GNUNET_MessageHeader *inbox;
3010   const char *is;
3011   uint16_t msize;
3012   uint16_t isize;
3013
3014   (void) cls;
3015   msize = ntohs (cb->header.size) - sizeof (*cb);
3016   if (UINT16_MAX - msize >
3017       sizeof (struct TransportBackchannelEncapsulationMessage) +
3018       sizeof (struct TransportBackchannelRequestPayload) )
3019   {
3020     GNUNET_break (0);
3021     return GNUNET_SYSERR;
3022   }
3023   inbox = (const struct GNUNET_MessageHeader *) &cb[1];
3024   isize = ntohs (inbox->size);
3025   if (isize >= msize)
3026   {
3027     GNUNET_break (0);
3028     return GNUNET_SYSERR;
3029   }
3030   is = (const char *) inbox;
3031   is += isize;
3032   msize -= isize;
3033   GNUNET_assert (msize > 0);
3034   if ('\0' != is[msize-1])
3035   {
3036     GNUNET_break (0);
3037     return GNUNET_SYSERR;
3038   }
3039   return GNUNET_OK;
3040 }
3041
3042
3043 /**
3044  * Remove memory used by expired ephemeral keys.
3045  *
3046  * @param cls NULL
3047  */
3048 static void
3049 expire_ephemerals (void *cls)
3050 {
3051   struct EphemeralCacheEntry *ece;
3052
3053   (void) cls;
3054   ephemeral_task = NULL;
3055   while (NULL != (ece = GNUNET_CONTAINER_heap_peek (ephemeral_heap)))
3056   {
3057     if (0 == GNUNET_TIME_absolute_get_remaining (ece->ephemeral_validity).rel_value_us)
3058     {
3059       free_ephemeral (ece);
3060       continue;
3061     }
3062     ephemeral_task = GNUNET_SCHEDULER_add_at (ece->ephemeral_validity,
3063                                               &expire_ephemerals,
3064                                               NULL);
3065     return;
3066   }
3067 }
3068
3069
3070 /**
3071  * Lookup ephemeral key in our #ephemeral_map. If no valid one exists, generate
3072  * one, cache it and return it.
3073  *
3074  * @param pid peer to look up ephemeral for
3075  * @param private_key[out] set to the private key
3076  * @param ephemeral_key[out] set to the key
3077  * @param ephemeral_sender_sig[out] set to the signature
3078  * @param ephemeral_validity[out] set to the validity expiration time
3079  */
3080 static void
3081 lookup_ephemeral (const struct GNUNET_PeerIdentity *pid,
3082                   struct GNUNET_CRYPTO_EcdhePrivateKey *private_key,
3083                   struct GNUNET_CRYPTO_EcdhePublicKey *ephemeral_key,
3084                   struct GNUNET_CRYPTO_EddsaSignature *ephemeral_sender_sig,
3085                   struct GNUNET_TIME_Absolute *ephemeral_validity)
3086 {
3087   struct EphemeralCacheEntry *ece;
3088   struct EphemeralConfirmation ec;
3089
3090   ece = GNUNET_CONTAINER_multipeermap_get (ephemeral_map,
3091                                            pid);
3092   if ( (NULL != ece) &&
3093        (0 == GNUNET_TIME_absolute_get_remaining (ece->ephemeral_validity).rel_value_us) )
3094   {
3095     free_ephemeral (ece);
3096     ece = NULL;
3097   }
3098   if (NULL == ece)
3099   {
3100     ece = GNUNET_new (struct EphemeralCacheEntry);
3101     ece->target = *pid;
3102     ece->ephemeral_validity = GNUNET_TIME_absolute_add (GNUNET_TIME_absolute_get_monotonic (GST_cfg),
3103                                                         EPHEMERAL_VALIDITY);
3104     GNUNET_assert (GNUNET_OK ==
3105                    GNUNET_CRYPTO_ecdhe_key_create2 (&ece->private_key));
3106     GNUNET_CRYPTO_ecdhe_key_get_public (&ece->private_key,
3107                                         &ece->ephemeral_key);
3108     ec.purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL);
3109     ec.purpose.size = htonl (sizeof (ec));
3110     ec.target = *pid;
3111     ec.ephemeral_key = ece->ephemeral_key;
3112     GNUNET_assert (GNUNET_OK ==
3113                    GNUNET_CRYPTO_eddsa_sign (GST_my_private_key,
3114                                              &ec.purpose,
3115                                              &ece->sender_sig));
3116     ece->hn = GNUNET_CONTAINER_heap_insert (ephemeral_heap,
3117                                             ece,
3118                                             ece->ephemeral_validity.abs_value_us);
3119     GNUNET_assert (GNUNET_OK ==
3120                    GNUNET_CONTAINER_multipeermap_put (ephemeral_map,
3121                                                       &ece->target,
3122                                                       ece,
3123                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
3124     if (NULL == ephemeral_task)
3125       ephemeral_task = GNUNET_SCHEDULER_add_at (ece->ephemeral_validity,
3126                                                 &expire_ephemerals,
3127                                                 NULL);
3128   }
3129   *private_key = ece->private_key;
3130   *ephemeral_key = ece->ephemeral_key;
3131   *ephemeral_sender_sig = ece->sender_sig;
3132   *ephemeral_validity = ece->ephemeral_validity;
3133 }
3134
3135
3136 /**
3137  * We need to transmit @a hdr to @a target.  If necessary, this may
3138  * involve DV routing or even broadcasting and fragmentation.
3139  *
3140  * @param target peer to receive @a hdr
3141  * @param hdr header of the message to route
3142  */
3143 static void
3144 route_message (const struct GNUNET_PeerIdentity *target,
3145                struct GNUNET_MessageHeader *hdr)
3146 {
3147   // FIXME: this one is tricky:
3148   // - we could try a direct, reliable channel
3149   // - if that is unavailable / for load balancing, we may try:
3150   //   * multiple (?) direct unreliable channels - depending on loss rate?
3151   //   * some (?) DV channels - if above unavailable / too lossy?
3152   //   * _random_ other peers ("broadcasting") in hope of *discovering*
3153   //      a path back! - if all else fails
3154   // => need more on DV first!
3155
3156   // FIXME: send hdr to target, free hdr (possibly using DV, possibly broadcasting)
3157   GNUNET_free (hdr);
3158 }
3159
3160
3161 /**
3162  * Communicator requests backchannel transmission.  Process the request.
3163  *
3164  * @param cls the client
3165  * @param cb the send message that was sent
3166  */
3167 static void
3168 handle_communicator_backchannel (void *cls,
3169                                  const struct GNUNET_TRANSPORT_CommunicatorBackchannel *cb)
3170 {
3171   struct TransportClient *tc = cls;
3172   struct GNUNET_CRYPTO_EcdhePrivateKey private_key;
3173   struct GNUNET_TIME_Absolute ephemeral_validity;
3174   struct TransportBackchannelEncapsulationMessage *enc;
3175   struct TransportBackchannelRequestPayload ppay;
3176   char *mpos;
3177   uint16_t msize;
3178
3179   /* encapsulate and encrypt message */
3180   msize = ntohs (cb->header.size) - sizeof (*cb) + sizeof (struct TransportBackchannelRequestPayload);
3181   enc = GNUNET_malloc (sizeof (*enc) + msize);
3182   enc->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION);
3183   enc->header.size = htons (sizeof (*enc) + msize);
3184   enc->target = cb->pid;
3185   lookup_ephemeral (&cb->pid,
3186                     &private_key,
3187                     &enc->ephemeral_key,
3188                     &ppay.sender_sig,
3189                     &ephemeral_validity);
3190   // FIXME: setup 'iv'
3191 #if FIXME
3192   dh_key_derive (&private_key,
3193                  &cb->pid,
3194                  &enc->iv,
3195                  &key);
3196 #endif
3197   ppay.ephemeral_validity = GNUNET_TIME_absolute_hton (ephemeral_validity);
3198   ppay.monotonic_time = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get_monotonic (GST_cfg));
3199   mpos = (char *) &enc[1];
3200 #if FIXME
3201   encrypt (key,
3202            &ppay,
3203            &mpos,
3204            sizeof (ppay));
3205   encrypt (key,
3206            &cb[1],
3207            &mpos,
3208            ntohs (cb->header.size) - sizeof (*cb));
3209   hmac (key,
3210         &enc->hmac);
3211 #endif
3212   route_message (&cb->pid,
3213                  &enc->header);
3214   GNUNET_SERVICE_client_continue (tc->client);
3215 }
3216
3217
3218 /**
3219  * Address of our peer added.  Test message is well-formed.
3220  *
3221  * @param cls the client
3222  * @param aam the send message that was sent
3223  * @return #GNUNET_OK if message is well-formed
3224  */
3225 static int
3226 check_add_address (void *cls,
3227                    const struct GNUNET_TRANSPORT_AddAddressMessage *aam)
3228 {
3229   struct TransportClient *tc = cls;
3230
3231   if (CT_COMMUNICATOR != tc->type)
3232   {
3233     GNUNET_break (0);
3234     return GNUNET_SYSERR;
3235   }
3236   GNUNET_MQ_check_zero_termination (aam);
3237   return GNUNET_OK;
3238 }
3239
3240
3241 /**
3242  * Ask peerstore to store our address.
3243  *
3244  * @param cls an `struct AddressListEntry *`
3245  */
3246 static void
3247 store_pi (void *cls);
3248
3249
3250 /**
3251  * Function called when peerstore is done storing our address.
3252  *
3253  * @param cls a `struct AddressListEntry`
3254  * @param success #GNUNET_YES if peerstore was successful
3255  */
3256 static void
3257 peerstore_store_own_cb (void *cls,
3258                         int success)
3259 {
3260   struct AddressListEntry *ale = cls;
3261
3262   ale->sc = NULL;
3263   if (GNUNET_YES != success)
3264     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3265                 "Failed to store our own address `%s' in peerstore!\n",
3266                 ale->address);
3267   /* refresh period is 1/4 of expiration time, that should be plenty
3268      without being excessive. */
3269   ale->st = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_divide (ale->expiration,
3270                                                                        4ULL),
3271                                           &store_pi,
3272                                           ale);
3273 }
3274
3275
3276 /**
3277  * Ask peerstore to store our address.
3278  *
3279  * @param cls an `struct AddressListEntry *`
3280  */
3281 static void
3282 store_pi (void *cls)
3283 {
3284   struct AddressListEntry *ale = cls;
3285   void *addr;
3286   size_t addr_len;
3287   struct GNUNET_TIME_Absolute expiration;
3288
3289   ale->st = NULL;
3290   expiration = GNUNET_TIME_relative_to_absolute (ale->expiration);
3291   GNUNET_HELLO_sign_address (ale->address,
3292                              ale->nt,
3293                              expiration,
3294                              GST_my_private_key,
3295                              &addr,
3296                              &addr_len);
3297   ale->sc = GNUNET_PEERSTORE_store (peerstore,
3298                                     "transport",
3299                                     &GST_my_identity,
3300                                     GNUNET_PEERSTORE_TRANSPORT_HELLO_KEY,
3301                                     addr,
3302                                     addr_len,
3303                                     expiration,
3304                                     GNUNET_PEERSTORE_STOREOPTION_MULTIPLE,
3305                                     &peerstore_store_own_cb,
3306                                     ale);
3307   GNUNET_free (addr);
3308   if (NULL == ale->sc)
3309   {
3310     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
3311                 "Failed to store our address `%s' with peerstore\n",
3312                 ale->address);
3313     ale->st = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
3314                                             &store_pi,
3315                                             ale);
3316   }
3317 }
3318
3319
3320 /**
3321  * Address of our peer added.  Process the request.
3322  *
3323  * @param cls the client
3324  * @param aam the send message that was sent
3325  */
3326 static void
3327 handle_add_address (void *cls,
3328                     const struct GNUNET_TRANSPORT_AddAddressMessage *aam)
3329 {
3330   struct TransportClient *tc = cls;
3331   struct AddressListEntry *ale;
3332   size_t slen;
3333
3334   slen = ntohs (aam->header.size) - sizeof (*aam);
3335   ale = GNUNET_malloc (sizeof (struct AddressListEntry) + slen);
3336   ale->tc = tc;
3337   ale->address = (const char *) &ale[1];
3338   ale->expiration = GNUNET_TIME_relative_ntoh (aam->expiration);
3339   ale->aid = aam->aid;
3340   ale->nt = (enum GNUNET_NetworkType) ntohl (aam->nt);
3341   memcpy (&ale[1],
3342           &aam[1],
3343           slen);
3344   GNUNET_CONTAINER_DLL_insert (tc->details.communicator.addr_head,
3345                                tc->details.communicator.addr_tail,
3346                                ale);
3347   ale->st = GNUNET_SCHEDULER_add_now (&store_pi,
3348                                       ale);
3349   GNUNET_SERVICE_client_continue (tc->client);
3350 }
3351
3352
3353 /**
3354  * Address of our peer deleted.  Process the request.
3355  *
3356  * @param cls the client
3357  * @param dam the send message that was sent
3358  */
3359 static void
3360 handle_del_address (void *cls,
3361                     const struct GNUNET_TRANSPORT_DelAddressMessage *dam)
3362 {
3363   struct TransportClient *tc = cls;
3364
3365   if (CT_COMMUNICATOR != tc->type)
3366   {
3367     GNUNET_break (0);
3368     GNUNET_SERVICE_client_drop (tc->client);
3369     return;
3370   }
3371   for (struct AddressListEntry *ale = tc->details.communicator.addr_head;
3372        NULL != ale;
3373        ale = ale->next)
3374   {
3375     if (dam->aid != ale->aid)
3376       continue;
3377     GNUNET_assert (ale->tc == tc);
3378     free_address_list_entry (ale);
3379     GNUNET_SERVICE_client_continue (tc->client);
3380   }
3381   GNUNET_break (0);
3382   GNUNET_SERVICE_client_drop (tc->client);
3383 }
3384
3385
3386 /**
3387  * Context from #handle_incoming_msg().  Closure for many
3388  * message handlers below.
3389  */
3390 struct CommunicatorMessageContext
3391 {
3392   /**
3393    * Which communicator provided us with the message.
3394    */
3395   struct TransportClient *tc;
3396
3397   /**
3398    * Additional information for flow control and about the sender.
3399    */
3400   struct GNUNET_TRANSPORT_IncomingMessage im;
3401
3402   /**
3403    * Number of hops the message has travelled (if DV-routed).
3404    * FIXME: make use of this in ACK handling!
3405    */
3406   uint16_t total_hops;
3407 };
3408
3409
3410 /**
3411  * Given an inbound message @a msg from a communicator @a cmc,
3412  * demultiplex it based on the type calling the right handler.
3413  *
3414  * @param cmc context for demultiplexing
3415  * @param msg message to demultiplex
3416  */
3417 static void
3418 demultiplex_with_cmc (struct CommunicatorMessageContext *cmc,
3419                       const struct GNUNET_MessageHeader *msg);
3420
3421
3422 /**
3423  * Send ACK to communicator (if requested) and free @a cmc.
3424  *
3425  * @param cmc context for which we are done handling the message
3426  */
3427 static void
3428 finish_cmc_handling (struct CommunicatorMessageContext *cmc)
3429 {
3430   if (0 != ntohl (cmc->im.fc_on))
3431   {
3432     /* send ACK when done to communicator for flow control! */
3433     struct GNUNET_MQ_Envelope *env;
3434     struct GNUNET_TRANSPORT_IncomingMessageAck *ack;
3435
3436     env = GNUNET_MQ_msg (ack,
3437                          GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK);
3438     ack->reserved = htonl (0);
3439     ack->fc_id = cmc->im.fc_id;
3440     ack->sender = cmc->im.sender;
3441     GNUNET_MQ_send (cmc->tc->mq,
3442                     env);
3443   }
3444   GNUNET_SERVICE_client_continue (cmc->tc->client);
3445   GNUNET_free (cmc);
3446 }
3447
3448
3449 /**
3450  * Communicator gave us an unencapsulated message to pass as-is to
3451  * CORE.  Process the request.
3452  *
3453  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
3454  * @param mh the message that was received
3455  */
3456 static void
3457 handle_raw_message (void *cls,
3458                     const struct GNUNET_MessageHeader *mh)
3459 {
3460   struct CommunicatorMessageContext *cmc = cls;
3461   uint16_t size = ntohs (mh->size);
3462
3463   if ( (size > UINT16_MAX - sizeof (struct InboundMessage)) ||
3464        (size < sizeof (struct GNUNET_MessageHeader)) )
3465   {
3466     struct GNUNET_SERVICE_Client *client = cmc->tc->client;
3467
3468     GNUNET_break (0);
3469     finish_cmc_handling (cmc);
3470     GNUNET_SERVICE_client_drop (client);
3471     return;
3472   }
3473   /* Forward to all CORE clients */
3474   for (struct TransportClient *tc = clients_head;
3475        NULL != tc;
3476        tc = tc->next)
3477   {
3478     struct GNUNET_MQ_Envelope *env;
3479     struct InboundMessage *im;
3480
3481     if (CT_CORE != tc->type)
3482       continue;
3483     env = GNUNET_MQ_msg_extra (im,
3484                                size,
3485                                GNUNET_MESSAGE_TYPE_TRANSPORT_RECV);
3486     im->peer = cmc->im.sender;
3487     memcpy (&im[1],
3488             mh,
3489             size);
3490     GNUNET_MQ_send (tc->mq,
3491                     env);
3492   }
3493   /* FIXME: consider doing this _only_ once the message
3494      was drained from the CORE MQs to extend flow control to CORE!
3495      (basically, increment counter in cmc, decrement on MQ send continuation! */
3496   finish_cmc_handling (cmc);
3497 }
3498
3499
3500 /**
3501  * Communicator gave us a fragment box.  Check the message.
3502  *
3503  * @param cls a `struct CommunicatorMessageContext`
3504  * @param fb the send message that was sent
3505  * @return #GNUNET_YES if message is well-formed
3506  */
3507 static int
3508 check_fragment_box (void *cls,
3509                     const struct TransportFragmentBox *fb)
3510 {
3511   uint16_t size = ntohs (fb->header.size);
3512   uint16_t bsize = size - sizeof (*fb);
3513
3514   if (0 == bsize)
3515   {
3516     GNUNET_break_op (0);
3517     return GNUNET_SYSERR;
3518   }
3519   if (bsize + ntohs (fb->frag_off) > ntohs (fb->msg_size))
3520   {
3521     GNUNET_break_op (0);
3522     return GNUNET_SYSERR;
3523   }
3524   if (ntohs (fb->frag_off) >= ntohs (fb->msg_size))
3525   {
3526     GNUNET_break_op (0);
3527     return GNUNET_SYSERR;
3528   }
3529   return GNUNET_YES;
3530 }
3531
3532
3533 /**
3534  * Generate a fragment acknowledgement for an @a rc.
3535  *
3536  * @param rc context to generate ACK for, @a rc ACK state is reset
3537  */
3538 static void
3539 send_fragment_ack (struct ReassemblyContext *rc)
3540 {
3541   struct TransportFragmentAckMessage *ack;
3542
3543   ack = GNUNET_new (struct TransportFragmentAckMessage);
3544   ack->header.size = htons (sizeof (struct TransportFragmentAckMessage));
3545   ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK);
3546   ack->frag_uuid = htonl (rc->frag_uuid);
3547   ack->extra_acks = GNUNET_htonll (rc->extra_acks);
3548   ack->msg_uuid = rc->msg_uuid;
3549   ack->avg_ack_delay = GNUNET_TIME_relative_hton (rc->avg_ack_delay);
3550   if (0 == rc->msg_missing)
3551     ack->reassembly_timeout
3552       = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_FOREVER_REL); /* signal completion */
3553   else
3554     ack->reassembly_timeout
3555       = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (rc->reassembly_timeout));
3556   route_message (&rc->neighbour->pid,
3557                  &ack->header);
3558   rc->avg_ack_delay = GNUNET_TIME_UNIT_ZERO;
3559   rc->num_acks = 0;
3560   rc->extra_acks = 0LLU;
3561 }
3562
3563
3564 /**
3565  * Communicator gave us a fragment.  Process the request.
3566  *
3567  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
3568  * @param fb the message that was received
3569  */
3570 static void
3571 handle_fragment_box (void *cls,
3572                      const struct TransportFragmentBox *fb)
3573 {
3574   struct CommunicatorMessageContext *cmc = cls;
3575   struct Neighbour *n;
3576   struct ReassemblyContext *rc;
3577   const struct GNUNET_MessageHeader *msg;
3578   uint16_t msize;
3579   uint16_t fsize;
3580   uint16_t frag_off;
3581   uint32_t frag_uuid;
3582   char *target;
3583   struct GNUNET_TIME_Relative cdelay;
3584   int ack_now;
3585
3586   n = GNUNET_CONTAINER_multipeermap_get (neighbours,
3587                                          &cmc->im.sender);
3588   if (NULL == n)
3589   {
3590     struct GNUNET_SERVICE_Client *client = cmc->tc->client;
3591
3592     GNUNET_break (0);
3593     finish_cmc_handling (cmc);
3594     GNUNET_SERVICE_client_drop (client);
3595     return;
3596   }
3597   if (NULL == n->reassembly_map)
3598   {
3599     n->reassembly_map = GNUNET_CONTAINER_multishortmap_create (8,
3600                                                                GNUNET_YES);
3601     n->reassembly_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
3602     n->reassembly_timeout_task = GNUNET_SCHEDULER_add_delayed (REASSEMBLY_EXPIRATION,
3603                                                                &reassembly_cleanup_task,
3604                                                                n);
3605   }
3606   msize = ntohs (fb->msg_size);
3607   rc = GNUNET_CONTAINER_multishortmap_get (n->reassembly_map,
3608                                            &fb->msg_uuid);
3609   if (NULL == rc)
3610   {
3611     rc = GNUNET_malloc (sizeof (*rc) +
3612                         msize + /* reassembly payload buffer */
3613                         (msize + 7) / 8 * sizeof (uint8_t) /* bitfield */);
3614     rc->msg_uuid = fb->msg_uuid;
3615     rc->neighbour = n;
3616     rc->msg_size = msize;
3617     rc->reassembly_timeout = GNUNET_TIME_relative_to_absolute (REASSEMBLY_EXPIRATION);
3618     rc->last_frag = GNUNET_TIME_absolute_get ();
3619     rc->hn = GNUNET_CONTAINER_heap_insert (n->reassembly_heap,
3620                                            rc,
3621                                            rc->reassembly_timeout.abs_value_us);
3622     GNUNET_assert (GNUNET_OK ==
3623                    GNUNET_CONTAINER_multishortmap_put (n->reassembly_map,
3624                                                        &rc->msg_uuid,
3625                                                        rc,
3626                                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
3627     target = (char *) &rc[1];
3628     rc->bitfield = (uint8_t *) (target + rc->msg_size);
3629     rc->msg_missing = rc->msg_size;
3630   }
3631   else
3632   {
3633     target = (char *) &rc[1];
3634   }
3635   if (msize != rc->msg_size)
3636   {
3637     GNUNET_break (0);
3638     finish_cmc_handling (cmc);
3639     return;
3640   }
3641
3642   /* reassemble */
3643   fsize = ntohs (fb->header.size) - sizeof (*fb);
3644   frag_off = ntohs (fb->frag_off);
3645   memcpy (&target[frag_off],
3646           &fb[1],
3647           fsize);
3648   /* update bitfield and msg_missing */
3649   for (unsigned int i=frag_off;i<frag_off+fsize;i++)
3650   {
3651     if (0 == (rc->bitfield[i / 8] & (1 << (i % 8))))
3652     {
3653       rc->bitfield[i / 8] |= (1 << (i % 8));
3654       rc->msg_missing--;
3655     }
3656   }
3657
3658   /* Compute cummulative ACK */
3659   frag_uuid = ntohl (fb->frag_uuid);
3660   cdelay = GNUNET_TIME_absolute_get_duration (rc->last_frag);
3661   cdelay = GNUNET_TIME_relative_multiply (cdelay,
3662                                           rc->num_acks);
3663   rc->last_frag = GNUNET_TIME_absolute_get ();
3664   rc->avg_ack_delay = GNUNET_TIME_relative_add (rc->avg_ack_delay,
3665                                                 cdelay);
3666   ack_now = GNUNET_NO;
3667   if (0 == rc->num_acks)
3668   {
3669     /* case one: first ack */
3670     rc->frag_uuid = frag_uuid;
3671     rc->extra_acks = 0LLU;
3672     rc->num_acks = 1;
3673   }
3674   else if ( (frag_uuid >= rc->frag_uuid) &&
3675             (frag_uuid <= rc->frag_uuid + 64) )
3676   {
3677     /* case two: ack fits after existing min UUID */
3678     if ( (frag_uuid == rc->frag_uuid) ||
3679          (0 != (rc->extra_acks & (1LLU << (frag_uuid - rc->frag_uuid - 1)))) )
3680     {
3681       /* duplicate fragment, ack now! */
3682       ack_now = GNUNET_YES;
3683     }
3684     else
3685     {
3686       rc->extra_acks |= (1LLU << (frag_uuid - rc->frag_uuid - 1));
3687       rc->num_acks++;
3688     }
3689   }
3690   else if ( (rc->frag_uuid > frag_uuid) &&
3691             ( ( (rc->frag_uuid == frag_uuid + 64) &&
3692                 (0 == rc->extra_acks) ) ||
3693               ( (rc->frag_uuid < frag_uuid + 64) &&
3694                 (rc->extra_acks == (rc->extra_acks & ~ ((1LLU << (64 - (rc->frag_uuid - frag_uuid))) - 1LLU))) ) ) )
3695   {
3696     /* can fit ack by shifting extra acks and starting at
3697        frag_uid, test above esured that the bits we will
3698        shift 'extra_acks' by are all zero. */
3699     rc->extra_acks <<= (rc->frag_uuid - frag_uuid);
3700     rc->extra_acks |= (1LLU << (rc->frag_uuid - frag_uuid - 1));
3701     rc->frag_uuid = frag_uuid;
3702     rc->num_acks++;
3703   }
3704   if (65 == rc->num_acks) /* FIXME: maybe use smaller threshold? This is very aggressive. */
3705     ack_now = GNUNET_YES; /* maximum acks received */
3706   // FIXME: possibly also ACK based on RTT (but for that we'd need to
3707   // determine the queue used for the ACK first!)
3708
3709   /* is reassembly complete? */
3710   if (0 != rc->msg_missing)
3711   {
3712     if (ack_now)
3713       send_fragment_ack (rc);
3714     finish_cmc_handling (cmc);
3715     return;
3716   }
3717   /* reassembly is complete, verify result */
3718   msg = (const struct GNUNET_MessageHeader *) &rc[1];
3719   if (ntohs (msg->size) != rc->msg_size)
3720   {
3721     GNUNET_break (0);
3722     free_reassembly_context (rc);
3723     finish_cmc_handling (cmc);
3724     return;
3725   }
3726   /* successful reassembly */
3727   send_fragment_ack (rc);
3728   demultiplex_with_cmc (cmc,
3729                         msg);
3730   /* FIXME: really free here? Might be bad if fragments are still
3731      en-route and we forget that we finished this reassembly immediately!
3732      -> keep around until timeout?
3733      -> shorten timeout based on ACK? */
3734   free_reassembly_context (rc);
3735 }
3736
3737
3738 /**
3739  * Check the @a fa against the fragments associated with @a pm.
3740  * If it matches, remove the matching fragments from the transmission
3741  * list.
3742  *
3743  * @param pm pending message to check against the ack
3744  * @param fa the ack that was received
3745  * @return #GNUNET_YES if @a fa matched, #GNUNET_NO if not
3746  */
3747 static int
3748 check_ack_against_pm (struct PendingMessage *pm,
3749                       const struct TransportFragmentAckMessage *fa)
3750 {
3751   int match;
3752   struct PendingMessage *nxt;
3753   uint32_t fs = ntohl (fa->frag_uuid);
3754   uint64_t xtra = GNUNET_ntohll (fa->extra_acks);
3755
3756   match = GNUNET_NO;
3757   for (struct PendingMessage *frag = pm->head_frag;
3758        NULL != frag;
3759        frag = nxt)
3760   {
3761     const struct TransportFragmentBox *tfb
3762       = (const struct TransportFragmentBox *) &pm[1];
3763     uint32_t fu = ntohl (tfb->frag_uuid);
3764
3765     GNUNET_assert (PMT_FRAGMENT_BOX == frag->pmt);
3766     nxt = frag->next_frag;
3767     /* Check for exact match or match in the 'xtra' bitmask */
3768     if ( (fu == fs) ||
3769          ( (fu > fs) &&
3770            (fu <= fs + 64) &&
3771            (0 != (1LLU << (fu - fs - 1) & xtra)) ) )
3772     {
3773       match = GNUNET_YES;
3774       free_fragment_tree (frag);
3775     }
3776   }
3777   return match;
3778 }
3779
3780
3781 /**
3782  * Communicator gave us a fragment acknowledgement.  Process the request.
3783  *
3784  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
3785  * @param fa the message that was received
3786  */
3787 static void
3788 handle_fragment_ack (void *cls,
3789                      const struct TransportFragmentAckMessage *fa)
3790 {
3791   struct CommunicatorMessageContext *cmc = cls;
3792   struct Neighbour *n;
3793   int matched;
3794
3795   n = GNUNET_CONTAINER_multipeermap_get (neighbours,
3796                                          &cmc->im.sender);
3797   if (NULL == n)
3798   {
3799     struct GNUNET_SERVICE_Client *client = cmc->tc->client;
3800
3801     GNUNET_break (0);
3802     finish_cmc_handling (cmc);
3803     GNUNET_SERVICE_client_drop (client);
3804     return;
3805   }
3806   /* FIXME-OPTIMIZE: maybe use another hash map here? */
3807   matched = GNUNET_NO;
3808   for (struct PendingMessage *pm = n->pending_msg_head;
3809        NULL != pm;
3810        pm = pm->prev_neighbour)
3811   {
3812     if (0 !=
3813         GNUNET_memcmp (&fa->msg_uuid,
3814                        &pm->msg_uuid))
3815       continue;
3816     matched = GNUNET_YES;
3817     if (GNUNET_YES ==
3818         check_ack_against_pm (pm,
3819                               fa))
3820     {
3821       struct GNUNET_TIME_Relative avg_ack_delay
3822         = GNUNET_TIME_relative_ntoh (fa->avg_ack_delay);
3823       // FIXME: update RTT and other reliability data!
3824       // ISSUE: we don't know which of n's queues the message(s)
3825       // took (and in fact the different messages might have gone
3826       // over different queues and possibly over multiple).
3827       // => track queues with PendingMessages, and update RTT only if
3828       //    the queue used is unique?
3829       //    -> how can we get loss rates?
3830       //    -> or, add extra state to Box and ACK to identify queue?
3831       (void) avg_ack_delay;
3832     }
3833     else
3834     {
3835       GNUNET_STATISTICS_update (GST_stats,
3836                                 "# FRAGMENT_ACKS dropped, no matching fragment",
3837                                 1,
3838                                 GNUNET_NO);
3839     }
3840     if (NULL == pm->head_frag)
3841     {
3842       // if entire message is ACKed, handle that as well.
3843       // => clean up PM, any post actions?
3844       free_pending_message (pm);
3845     }
3846     else
3847     {
3848       struct GNUNET_TIME_Relative reassembly_timeout
3849         = GNUNET_TIME_relative_ntoh (fa->reassembly_timeout);
3850       // OPTIMIZE-FIXME: adjust retransmission strategy based on reassembly_timeout!
3851       (void) reassembly_timeout;
3852     }
3853     break;
3854   }
3855   if (GNUNET_NO == matched)
3856   {
3857     GNUNET_STATISTICS_update (GST_stats,
3858                               "# FRAGMENT_ACKS dropped, no matching pending message",
3859                               1,
3860                               GNUNET_NO);
3861   }
3862   finish_cmc_handling (cmc);
3863 }
3864
3865
3866 /**
3867  * Communicator gave us a reliability box.  Check the message.
3868  *
3869  * @param cls a `struct CommunicatorMessageContext`
3870  * @param rb the send message that was sent
3871  * @return #GNUNET_YES if message is well-formed
3872  */
3873 static int
3874 check_reliability_box (void *cls,
3875                        const struct TransportReliabilityBox *rb)
3876 {
3877   GNUNET_MQ_check_boxed_message (rb);
3878   return GNUNET_YES;
3879 }
3880
3881
3882 /**
3883  * Communicator gave us a reliability box.  Process the request.
3884  *
3885  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
3886  * @param rb the message that was received
3887  */
3888 static void
3889 handle_reliability_box (void *cls,
3890                         const struct TransportReliabilityBox *rb)
3891 {
3892   struct CommunicatorMessageContext *cmc = cls;
3893   const struct GNUNET_MessageHeader *inbox = (const struct GNUNET_MessageHeader *) &rb[1];
3894
3895   if (0 == ntohl (rb->ack_countdown))
3896   {
3897     struct TransportReliabilityAckMessage *ack;
3898
3899     /* FIXME: implement cummulative ACKs and ack_countdown,
3900        then setting the avg_ack_delay field below: */
3901     ack = GNUNET_malloc (sizeof (*ack) +
3902                          sizeof (struct GNUNET_ShortHashCode));
3903     ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK);
3904     ack->header.size = htons (sizeof (*ack) +
3905                               sizeof (struct GNUNET_ShortHashCode));
3906     memcpy (&ack[1],
3907             &rb->msg_uuid,
3908             sizeof (struct GNUNET_ShortHashCode));
3909     route_message (&cmc->im.sender,
3910                    &ack->header);
3911   }
3912   /* continue with inner message */
3913   demultiplex_with_cmc (cmc,
3914                         inbox);
3915 }
3916
3917
3918 /**
3919  * Communicator gave us a reliability ack.  Process the request.
3920  *
3921  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
3922  * @param ra the message that was received
3923  */
3924 static void
3925 handle_reliability_ack (void *cls,
3926                         const struct TransportReliabilityAckMessage *ra)
3927 {
3928   struct CommunicatorMessageContext *cmc = cls;
3929
3930   // FIXME: do work: find message that was acknowledged, and
3931   // remove from transmission queue; update RTT.
3932   finish_cmc_handling (cmc);
3933 }
3934
3935
3936 /**
3937  * Communicator gave us a backchannel encapsulation.  Check the message.
3938  *
3939  * @param cls a `struct CommunicatorMessageContext`
3940  * @param be the send message that was sent
3941  * @return #GNUNET_YES if message is well-formed
3942  */
3943 static int
3944 check_backchannel_encapsulation (void *cls,
3945                                  const struct TransportBackchannelEncapsulationMessage *be)
3946 {
3947   uint16_t size = ntohs (be->header.size);
3948
3949   (void) cls;
3950   if (size - sizeof (*be) < sizeof (struct GNUNET_MessageHeader))
3951   {
3952     GNUNET_break_op (0);
3953     return GNUNET_SYSERR;
3954   }
3955   return GNUNET_YES;
3956 }
3957
3958
3959 /**
3960  * Communicator gave us a backchannel encapsulation.  Process the request.
3961  *
3962  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
3963  * @param be the message that was received
3964  */
3965 static void
3966 handle_backchannel_encapsulation (void *cls,
3967                                   const struct TransportBackchannelEncapsulationMessage *be)
3968 {
3969   struct CommunicatorMessageContext *cmc = cls;
3970
3971   if (0 != GNUNET_memcmp (&be->target,
3972                           &GST_my_identity))
3973   {
3974     /* not for me, try to route to target */
3975     route_message (&be->target,
3976                    GNUNET_copy_message (&be->header));
3977     finish_cmc_handling (cmc);
3978     return;
3979   }
3980   // FIXME: compute shared secret
3981   // FIXME: check HMAC
3982   // FIXME: decrypt payload
3983   // FIXME: forward to specified communicator!
3984   // (using GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL_INCOMING)
3985   finish_cmc_handling (cmc);
3986 }
3987
3988
3989 /**
3990  * Task called when we should check if any of the DV paths
3991  * we have learned to a target are due for garbage collection.
3992  *
3993  * Collects stale paths, and possibly frees the entire DV
3994  * entry if no paths are left. Otherwise re-schedules itself.
3995  *
3996  * @param cls a `struct DistanceVector`
3997  */
3998 static void
3999 path_cleanup_cb (void *cls)
4000 {
4001   struct DistanceVector *dv = cls;
4002   struct DistanceVectorHop *pos;
4003
4004   dv->timeout_task = NULL;
4005   while (NULL != (pos = dv->dv_head))
4006   {
4007     GNUNET_assert (dv == pos->dv);
4008     if (GNUNET_TIME_absolute_get_remaining (pos->timeout).rel_value_us > 0)
4009       break;
4010     free_distance_vector_hop (pos);
4011   }
4012   if (NULL == pos)
4013   {
4014     free_dv_route (dv);
4015     return;
4016   }
4017   dv->timeout_task = GNUNET_SCHEDULER_add_at (pos->timeout,
4018                                               &path_cleanup_cb,
4019                                               dv);
4020 }
4021
4022
4023 /**
4024  * We have learned a @a path through the network to some other peer, add it to
4025  * our DV data structure (returning #GNUNET_YES on success).
4026  *
4027  * We do not add paths if we have a sufficient number of shorter
4028  * paths to this target already (returning #GNUNET_NO).
4029  *
4030  * We also do not add problematic paths, like those where we lack the first
4031  * hop in our neighbour list (i.e. due to a topology change) or where some
4032  * non-first hop is in our neighbour list (returning #GNUNET_SYSERR).
4033  *
4034  * @param path the path we learned, path[0] should be us,
4035  *             and then path contains a valid path from us to `path[path_len-1]`
4036  *             path[1] should be a direct neighbour (we should check!)
4037  * @param path_len number of entries on the @a path, at least three!
4038  * @param network_latency how long does the message take from us to `path[path_len-1]`?
4039  *          set to "forever" if unknown
4040  * @return #GNUNET_YES on success,
4041  *         #GNUNET_NO if we have better path(s) to the target
4042  *         #GNUNET_SYSERR if the path is useless and/or invalid
4043  *                         (i.e. path[1] not a direct neighbour
4044  *                        or path[i+1] is a direct neighbour for i>0)
4045  */
4046 static int
4047 learn_dv_path (const struct GNUNET_PeerIdentity *path,
4048                unsigned int path_len,
4049                struct GNUNET_TIME_Relative network_latency)
4050 {
4051   struct DistanceVectorHop *hop;
4052   struct DistanceVector *dv;
4053   struct Neighbour *next_hop;
4054   unsigned int shorter_distance;
4055
4056   if (path_len < 3)
4057   {
4058     /* what a boring path! not allowed! */
4059     GNUNET_break (0);
4060     return GNUNET_SYSERR;
4061   }
4062   GNUNET_assert (0 ==
4063                  GNUNET_memcmp (&GST_my_identity,
4064                                 &path[0]));
4065   next_hop = GNUNET_CONTAINER_multipeermap_get (neighbours,
4066                                                 &path[1]);
4067   if (NULL == next_hop)
4068   {
4069     /* next hop must be a neighbour, otherwise this whole thing is useless! */
4070     GNUNET_break (0);
4071     return GNUNET_SYSERR;
4072   }
4073   for (unsigned int i=2;i<path_len;i++)
4074     if (NULL !=
4075         GNUNET_CONTAINER_multipeermap_get (neighbours,
4076                                            &path[i]))
4077     {
4078       /* Useless path, we have a direct connection to some hop
4079          in the middle of the path, so this one doesn't even
4080          seem terribly useful for redundancy */
4081       return GNUNET_SYSERR;
4082     }
4083   dv = GNUNET_CONTAINER_multipeermap_get (dv_routes,
4084                                           &path[path_len - 1]);
4085   if (NULL == dv)
4086   {
4087     dv = GNUNET_new (struct DistanceVector);
4088     dv->target = path[path_len - 1];
4089     dv->timeout_task = GNUNET_SCHEDULER_add_delayed (DV_PATH_VALIDITY_TIMEOUT,
4090                                                      &path_cleanup_cb,
4091                                                      dv);
4092     GNUNET_assert (GNUNET_OK ==
4093                    GNUNET_CONTAINER_multipeermap_put (dv_routes,
4094                                                       &dv->target,
4095                                                       dv,
4096                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
4097   }
4098   /* Check if we have this path already! */
4099   shorter_distance = 0;
4100   for (struct DistanceVectorHop *pos = dv->dv_head;
4101        NULL != pos;
4102        pos = pos->next_dv)
4103   {
4104     if (pos->distance < path_len - 2)
4105       shorter_distance++;
4106     /* Note that the distances in 'pos' excludes us (path[0]) and
4107        the next_hop (path[1]), so we need to subtract two
4108        and check next_hop explicitly */
4109     if ( (pos->distance == path_len - 2) &&
4110          (pos->next_hop == next_hop) )
4111     {
4112       int match = GNUNET_YES;
4113
4114       for (unsigned int i=0;i<pos->distance;i++)
4115       {
4116         if (0 !=
4117             GNUNET_memcmp (&pos->path[i],
4118                            &path[i+2]))
4119         {
4120           match = GNUNET_NO;
4121           break;
4122         }
4123       }
4124       if (GNUNET_YES == match)
4125       {
4126         struct GNUNET_TIME_Relative last_timeout;
4127
4128         /* Re-discovered known path, update timeout */
4129         GNUNET_STATISTICS_update (GST_stats,
4130                                   "# Known DV path refreshed",
4131                                   1,
4132                                   GNUNET_NO);
4133         last_timeout = GNUNET_TIME_absolute_get_remaining (pos->timeout);
4134         pos->timeout
4135           = GNUNET_TIME_relative_to_absolute (DV_PATH_VALIDITY_TIMEOUT);
4136         GNUNET_CONTAINER_MDLL_remove (dv,
4137                                       dv->dv_head,
4138                                       dv->dv_tail,
4139                                       pos);
4140         GNUNET_CONTAINER_MDLL_insert (dv,
4141                                       dv->dv_head,
4142                                       dv->dv_tail,
4143                                       pos);
4144         if (last_timeout.rel_value_us <
4145             GNUNET_TIME_relative_subtract (DV_PATH_VALIDITY_TIMEOUT,
4146                                            DV_PATH_DISCOVERY_FREQUENCY).rel_value_us)
4147         {
4148           /* Some peer send DV learn messages too often, we are learning
4149              the same path faster than it would be useful; do not forward! */
4150           return GNUNET_NO;
4151         }
4152         return GNUNET_YES;
4153       }
4154     }
4155   }
4156   /* Count how many shorter paths we have (incl. direct
4157      neighbours) before simply giving up on this one! */
4158   if (shorter_distance >= MAX_DV_PATHS_TO_TARGET)
4159   {
4160     /* We have a shorter path already! */
4161     return GNUNET_NO;
4162   }
4163   /* create new DV path entry */
4164   hop = GNUNET_malloc (sizeof (struct DistanceVectorHop) +
4165                        sizeof (struct GNUNET_PeerIdentity) * (path_len - 2));
4166   hop->next_hop = next_hop;
4167   hop->dv = dv;
4168   hop->path = (const struct GNUNET_PeerIdentity *) &hop[1];
4169   memcpy (&hop[1],
4170           &path[2],
4171           sizeof (struct GNUNET_PeerIdentity) * (path_len - 2));
4172   hop->timeout = GNUNET_TIME_relative_to_absolute (DV_PATH_VALIDITY_TIMEOUT);
4173   hop->distance = path_len - 2;
4174   GNUNET_CONTAINER_MDLL_insert (dv,
4175                                 dv->dv_head,
4176                                 dv->dv_tail,
4177                                 hop);
4178   GNUNET_CONTAINER_MDLL_insert (neighbour,
4179                                 next_hop->dv_head,
4180                                 next_hop->dv_tail,
4181                                 hop);
4182   return GNUNET_YES;
4183 }
4184
4185
4186 /**
4187  * Communicator gave us a DV learn message.  Check the message.
4188  *
4189  * @param cls a `struct CommunicatorMessageContext`
4190  * @param dvl the send message that was sent
4191  * @return #GNUNET_YES if message is well-formed
4192  */
4193 static int
4194 check_dv_learn (void *cls,
4195                 const struct TransportDVLearn *dvl)
4196 {
4197   uint16_t size = ntohs (dvl->header.size);
4198   uint16_t num_hops = ntohs (dvl->num_hops);
4199   const struct DVPathEntryP *hops = (const struct DVPathEntryP *) &dvl[1];
4200
4201   (void) cls;
4202   if (size != sizeof (*dvl) + num_hops * sizeof (struct DVPathEntryP))
4203   {
4204     GNUNET_break_op (0);
4205     return GNUNET_SYSERR;
4206   }
4207   if (num_hops > MAX_DV_HOPS_ALLOWED)
4208   {
4209     GNUNET_break_op (0);
4210     return GNUNET_SYSERR;
4211   }
4212   for (unsigned int i=0;i<num_hops;i++)
4213   {
4214     if (0 == GNUNET_memcmp (&dvl->initiator,
4215                             &hops[i].hop))
4216     {
4217       GNUNET_break_op (0);
4218       return GNUNET_SYSERR;
4219     }
4220     if (0 == GNUNET_memcmp (&GST_my_identity,
4221                             &hops[i].hop))
4222     {
4223       GNUNET_break_op (0);
4224       return GNUNET_SYSERR;
4225     }
4226   }
4227   return GNUNET_YES;
4228 }
4229
4230
4231 /**
4232  * Build and forward a DV learn message to @a next_hop.
4233  *
4234  * @param next_hop peer to send the message to
4235  * @param msg message received
4236  * @param bi_history bitmask specifying hops on path that were bidirectional
4237  * @param nhops length of the @a hops array
4238  * @param hops path the message traversed so far
4239  * @param in_time when did we receive the message, used to calculate network delay
4240  */
4241 static void
4242 forward_dv_learn (const struct GNUNET_PeerIdentity *next_hop,
4243                   const struct TransportDVLearn *msg,
4244                   uint16_t bi_history,
4245                   uint16_t nhops,
4246                   const struct DVPathEntryP *hops,
4247                   struct GNUNET_TIME_Absolute in_time)
4248 {
4249   struct DVPathEntryP *dhops;
4250   struct TransportDVLearn *fwd;
4251   struct GNUNET_TIME_Relative nnd;
4252
4253   /* compute message for forwarding */
4254   GNUNET_assert (nhops < MAX_DV_HOPS_ALLOWED);
4255   fwd = GNUNET_malloc (sizeof (struct TransportDVLearn) +
4256                        (nhops + 1) * sizeof (struct DVPathEntryP));
4257   fwd->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN);
4258   fwd->header.size = htons (sizeof (struct TransportDVLearn) +
4259                             (nhops + 1) * sizeof (struct DVPathEntryP));
4260   fwd->num_hops = htons (nhops + 1);
4261   fwd->bidirectional = htons (bi_history);
4262   nnd = GNUNET_TIME_relative_add (GNUNET_TIME_absolute_get_duration (in_time),
4263                                   GNUNET_TIME_relative_ntoh (msg->non_network_delay));
4264   fwd->non_network_delay = GNUNET_TIME_relative_hton (nnd);
4265   fwd->init_sig = msg->init_sig;
4266   fwd->initiator = msg->initiator;
4267   fwd->challenge = msg->challenge;
4268   dhops = (struct DVPathEntryP *) &fwd[1];
4269   GNUNET_memcpy (dhops,
4270                  hops,
4271                  sizeof (struct DVPathEntryP) * nhops);
4272   dhops[nhops].hop = GST_my_identity;
4273   {
4274     struct DvHopPS dhp = {
4275       .purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_HOP),
4276       .purpose.size = htonl (sizeof (dhp)),
4277       .pred = dhops[nhops-1].hop,
4278       .succ = *next_hop,
4279       .challenge = msg->challenge
4280     };
4281
4282     GNUNET_assert (GNUNET_OK ==
4283                    GNUNET_CRYPTO_eddsa_sign (GST_my_private_key,
4284                                              &dhp.purpose,
4285                                              &dhops[nhops].hop_sig));
4286   }
4287   route_message (next_hop,
4288                  &fwd->header);
4289 }
4290
4291
4292 /**
4293  * Check signature of type #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_INITIATOR
4294  *
4295  * @param init the signer
4296  * @param challenge the challenge that was signed
4297  * @param init_sig signature presumably by @a init
4298  * @return #GNUNET_OK if the signature is valid
4299  */
4300 static int
4301 validate_dv_initiator_signature (const struct GNUNET_PeerIdentity *init,
4302                                  const struct GNUNET_ShortHashCode *challenge,
4303                                  const struct GNUNET_CRYPTO_EddsaSignature *init_sig)
4304 {
4305   struct DvInitPS ip = {
4306     .purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_INITIATOR),
4307     .purpose.size = htonl (sizeof (ip)),
4308     .challenge = *challenge
4309   };
4310
4311   if (GNUNET_OK !=
4312       GNUNET_CRYPTO_eddsa_verify (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_INITIATOR,
4313                                   &ip.purpose,
4314                                   init_sig,
4315                                   &init->public_key))
4316   {
4317     GNUNET_break_op (0);
4318     return GNUNET_SYSERR;
4319   }
4320   return GNUNET_OK;
4321 }
4322
4323
4324 /**
4325  * Communicator gave us a DV learn message.  Process the request.
4326  *
4327  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
4328  * @param dvl the message that was received
4329  */
4330 static void
4331 handle_dv_learn (void *cls,
4332                  const struct TransportDVLearn *dvl)
4333 {
4334   struct CommunicatorMessageContext *cmc = cls;
4335   enum GNUNET_TRANSPORT_CommunicatorCharacteristics cc;
4336   int bi_hop;
4337   uint16_t nhops;
4338   uint16_t bi_history;
4339   const struct DVPathEntryP *hops;
4340   int do_fwd;
4341   int did_initiator;
4342   struct GNUNET_TIME_Absolute in_time;
4343
4344   nhops = ntohs (dvl->bidirectional); /* 0 = sender is initiator */
4345   bi_history = ntohs (dvl->bidirectional);
4346   hops = (const struct DVPathEntryP *) &dvl[1];
4347   if (0 == nhops)
4348   {
4349     /* sanity check */
4350     if (0 != GNUNET_memcmp (&dvl->initiator,
4351                             &cmc->im.sender))
4352     {
4353       GNUNET_break (0);
4354       finish_cmc_handling (cmc);
4355       return;
4356     }
4357   }
4358   else
4359   {
4360     /* sanity check */
4361     if (0 != GNUNET_memcmp (&hops[nhops - 1].hop,
4362                             &cmc->im.sender))
4363     {
4364       GNUNET_break (0);
4365       finish_cmc_handling (cmc);
4366       return;
4367     }
4368   }
4369
4370   GNUNET_assert (CT_COMMUNICATOR == cmc->tc->type);
4371   cc = cmc->tc->details.communicator.cc;
4372   bi_hop = (GNUNET_TRANSPORT_CC_RELIABLE == cc); // FIXME: add bi-directional flag to cc?
4373   in_time = GNUNET_TIME_absolute_get ();
4374
4375   /* continue communicator here, everything else can happen asynchronous! */
4376   finish_cmc_handling (cmc);
4377
4378   // FIXME: should we bother to verify _every_ DV initiator signature?
4379   if (GNUNET_OK !=
4380       validate_dv_initiator_signature (&dvl->initiator,
4381                                        &dvl->challenge,
4382                                        &dvl->init_sig))
4383   {
4384     GNUNET_break_op (0);
4385     return;
4386   }
4387   // FIXME: asynchronously (!) verify hop-by-hop signatures!
4388   // => if signature verification load too high, implement random drop strategy!
4389
4390   do_fwd = GNUNET_YES;
4391   if (0 == GNUNET_memcmp (&GST_my_identity,
4392                           &dvl->initiator))
4393   {
4394     struct GNUNET_PeerIdentity path[nhops + 1];
4395     struct GNUNET_TIME_Relative host_latency_sum;
4396     struct GNUNET_TIME_Relative latency;
4397     struct GNUNET_TIME_Relative network_latency;
4398
4399     /* We initiated this, learn the forward path! */
4400     path[0] = GST_my_identity;
4401     path[1] = hops[0].hop;
4402     host_latency_sum = GNUNET_TIME_relative_ntoh (dvl->non_network_delay);
4403
4404     // Need also something to lookup initiation time
4405     // to compute RTT! -> add RTT argument here?
4406     latency = GNUNET_TIME_UNIT_FOREVER_REL; // FIXME: initialize properly
4407     // (based on dvl->challenge, we can identify time of origin!)
4408
4409     network_latency = GNUNET_TIME_relative_subtract (latency,
4410                                                      host_latency_sum);
4411     /* assumption: latency on all links is the same */
4412     network_latency = GNUNET_TIME_relative_divide (network_latency,
4413                                                    nhops);
4414
4415     for (unsigned int i=2;i<=nhops;i++)
4416     {
4417       struct GNUNET_TIME_Relative ilat;
4418
4419       /* assumption: linear latency increase per hop */
4420       ilat = GNUNET_TIME_relative_multiply (network_latency,
4421                                             i);
4422       path[i] = hops[i-1].hop;
4423       learn_dv_path (path,
4424                      i,
4425                      ilat);
4426     }
4427     /* as we initiated, do not forward again (would be circular!) */
4428     do_fwd = GNUNET_NO;
4429     return;
4430   }
4431   else if (bi_hop)
4432   {
4433     /* last hop was bi-directional, we could learn something here! */
4434     struct GNUNET_PeerIdentity path[nhops + 2];
4435
4436     path[0] = GST_my_identity;
4437     path[1] = hops[nhops - 1].hop; /* direct neighbour == predecessor! */
4438     for (unsigned int i=0;i<nhops;i++)
4439     {
4440       int iret;
4441
4442       if (0 == (bi_history & (1 << i)))
4443         break; /* i-th hop not bi-directional, stop learning! */
4444       if (i == nhops)
4445       {
4446         path[i + 2] = dvl->initiator;
4447       }
4448       else
4449       {
4450         path[i + 2] = hops[nhops - i - 2].hop;
4451       }
4452
4453       iret = learn_dv_path (path,
4454                             i + 2,
4455                             GNUNET_TIME_UNIT_FOREVER_REL);
4456       if (GNUNET_SYSERR == iret)
4457       {
4458         /* path invalid or too long to be interesting for US, thus should also
4459            not be interesting to our neighbours, cut path when forwarding to
4460            'i' hops, except of course for the one that goes back to the
4461            initiator */
4462         GNUNET_STATISTICS_update (GST_stats,
4463                                   "# DV learn not forwarded due invalidity of path",
4464                                   1,
4465                                   GNUNET_NO);
4466         do_fwd = GNUNET_NO;
4467         break;
4468       }
4469       if ( (GNUNET_NO == iret) &&
4470            (nhops == i + 1) )
4471       {
4472         /* we have better paths, and this is the longest target,
4473            so there cannot be anything interesting later */
4474         GNUNET_STATISTICS_update (GST_stats,
4475                                   "# DV learn not forwarded, got better paths",
4476                                   1,
4477                                   GNUNET_NO);
4478         do_fwd = GNUNET_NO;
4479         break;
4480       }
4481     }
4482   }
4483
4484   if (MAX_DV_HOPS_ALLOWED == nhops)
4485   {
4486     /* At limit, we're out of here! */
4487     finish_cmc_handling (cmc);
4488     return;
4489   }
4490
4491   /* Forward to initiator, if path non-trivial and possible */
4492   bi_history = (bi_history << 1) | (bi_hop ? 1 : 0);
4493   did_initiator = GNUNET_NO;
4494   if ( (1 < nhops) &&
4495        (GNUNET_YES ==
4496         GNUNET_CONTAINER_multipeermap_contains (neighbours,
4497                                                 &dvl->initiator)) )
4498   {
4499     /* send back to origin! */
4500     forward_dv_learn (&dvl->initiator,
4501                       dvl,
4502                       bi_history,
4503                       nhops,
4504                       hops,
4505                       in_time);
4506     did_initiator = GNUNET_YES;
4507   }
4508   /* We forward under two conditions: either we still learned something
4509      ourselves (do_fwd), or the path was darn short and thus the initiator is
4510      likely to still be very interested in this (and we did NOT already
4511      send it back to the initiator) */
4512   if ( (do_fwd) ||
4513        ( (nhops < MIN_DV_PATH_LENGTH_FOR_INITIATOR) &&
4514          (GNUNET_NO == did_initiator) ) )
4515   {
4516     /* FIXME: loop over all neighbours, pick those with low
4517        queues AND that are not yet on the path; possibly
4518        adapt threshold to nhops! */
4519 #if FIXME
4520     forward_dv_learn (NULL, // fill in peer from iterator here!
4521                       dvl,
4522                       bi_history,
4523                       nhops,
4524                       hops,
4525                       in_time);
4526 #endif
4527   }
4528 }
4529
4530
4531 /**
4532  * Communicator gave us a DV box.  Check the message.
4533  *
4534  * @param cls a `struct CommunicatorMessageContext`
4535  * @param dvb the send message that was sent
4536  * @return #GNUNET_YES if message is well-formed
4537  */
4538 static int
4539 check_dv_box (void *cls,
4540               const struct TransportDVBox *dvb)
4541 {
4542   uint16_t size = ntohs (dvb->header.size);
4543   uint16_t num_hops = ntohs (dvb->num_hops);
4544   const struct GNUNET_PeerIdentity *hops = (const struct GNUNET_PeerIdentity *) &dvb[1];
4545   const struct GNUNET_MessageHeader *inbox = (const struct GNUNET_MessageHeader *) &hops[num_hops];
4546   uint16_t isize;
4547   uint16_t itype;
4548
4549   (void) cls;
4550   if (size < sizeof (*dvb) + num_hops * sizeof (struct GNUNET_PeerIdentity) + sizeof (struct GNUNET_MessageHeader))
4551   {
4552     GNUNET_break_op (0);
4553     return GNUNET_SYSERR;
4554   }
4555   isize = ntohs (inbox->size);
4556   if (size != sizeof (*dvb) + num_hops * sizeof (struct GNUNET_PeerIdentity) + isize)
4557   {
4558     GNUNET_break_op (0);
4559     return GNUNET_SYSERR;
4560   }
4561   itype = ntohs (inbox->type);
4562   if ( (GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX == itype) ||
4563        (GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN == itype) )
4564   {
4565     GNUNET_break_op (0);
4566     return GNUNET_SYSERR;
4567   }
4568   return GNUNET_YES;
4569 }
4570
4571
4572 /**
4573  * Communicator gave us a DV box.  Process the request.
4574  *
4575  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
4576  * @param dvb the message that was received
4577  */
4578 static void
4579 handle_dv_box (void *cls,
4580                const struct TransportDVBox *dvb)
4581 {
4582   struct CommunicatorMessageContext *cmc = cls;
4583   uint16_t size = ntohs (dvb->header.size) - sizeof (*dvb);
4584   uint16_t num_hops = ntohs (dvb->num_hops);
4585   const struct GNUNET_PeerIdentity *hops = (const struct GNUNET_PeerIdentity *) &dvb[1];
4586   const struct GNUNET_MessageHeader *inbox = (const struct GNUNET_MessageHeader *) &hops[num_hops];
4587
4588   if (num_hops > 0)
4589   {
4590     // FIXME: if we are not the target, shorten path and forward along.
4591     // Try from the _end_ of hops array if we know the given
4592     // neighbour (shortening the path!).
4593     // NOTE: increment total_hops!
4594     finish_cmc_handling (cmc);
4595     return;
4596   }
4597   /* We are the target. Unbox and handle message. */
4598   cmc->im.sender = dvb->origin;
4599   cmc->total_hops = ntohs (dvb->total_hops);
4600   demultiplex_with_cmc (cmc,
4601                         inbox);
4602 }
4603
4604
4605 /**
4606  * Client notified us about transmission from a peer.  Process the request.
4607  *
4608  * @param cls a `struct TransportClient` which sent us the message
4609  * @param obm the send message that was sent
4610  * @return #GNUNET_YES if message is well-formed
4611  */
4612 static int
4613 check_incoming_msg (void *cls,
4614                     const struct GNUNET_TRANSPORT_IncomingMessage *im)
4615 {
4616   struct TransportClient *tc = cls;
4617
4618   if (CT_COMMUNICATOR != tc->type)
4619   {
4620     GNUNET_break (0);
4621     return GNUNET_SYSERR;
4622   }
4623   GNUNET_MQ_check_boxed_message (im);
4624   return GNUNET_OK;
4625 }
4626
4627
4628 /**
4629  * Communicator gave us a transport address validation challenge.  Process the request.
4630  *
4631  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
4632  * @param tvc the message that was received
4633  */
4634 static void
4635 handle_validation_challenge (void *cls,
4636                              const struct TransportValidationChallenge *tvc)
4637 {
4638   struct CommunicatorMessageContext *cmc = cls;
4639   struct TransportValidationResponse *tvr;
4640
4641   if (cmc->total_hops > 0)
4642   {
4643     /* DV routing is not allowed for validation challenges! */
4644     GNUNET_break_op (0);
4645     finish_cmc_handling (cmc);
4646     return;
4647   }
4648   tvr = GNUNET_new (struct TransportValidationResponse);
4649   tvr->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_RESPONSE);
4650   tvr->header.size = htons (sizeof (*tvr));
4651   tvr->challenge = tvc->challenge;
4652   tvr->origin_time = tvc->sender_time;
4653   tvr->validity_duration = cmc->im.expected_address_validity;
4654   {
4655     /* create signature */
4656     struct TransportValidationPS tvp = {
4657       .purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_CHALLENGE),
4658       .purpose.size = htonl (sizeof (tvp)),
4659       .validity_duration = tvr->validity_duration,
4660       .challenge = tvc->challenge
4661     };
4662
4663     GNUNET_assert (GNUNET_OK ==
4664                    GNUNET_CRYPTO_eddsa_sign (GST_my_private_key,
4665                                              &tvp.purpose,
4666                                              &tvr->signature));
4667   }
4668   route_message (&cmc->im.sender,
4669                  &tvr->header);
4670   finish_cmc_handling (cmc);
4671 }
4672
4673
4674 /**
4675  * Closure for #check_known_challenge.
4676  */
4677 struct CheckKnownChallengeContext
4678 {
4679   /**
4680    * Set to the challenge we are looking for.
4681    */
4682   const struct GNUNET_ShortHashCode *challenge;
4683
4684   /**
4685    * Set to a matching validation state, if one was found.
4686    */
4687   struct ValidationState *vs;
4688 };
4689
4690
4691 /**
4692  * Test if the validation state in @a value matches the
4693  * challenge from @a cls.
4694  *
4695  * @param cls a `struct CheckKnownChallengeContext`
4696  * @param pid unused (must match though)
4697  * @param value a `struct ValidationState`
4698  * @return #GNUNET_OK if not matching, #GNUNET_NO if match found
4699  */
4700 static int
4701 check_known_challenge (void *cls,
4702                        const struct GNUNET_PeerIdentity *pid,
4703                        void *value)
4704 {
4705   struct CheckKnownChallengeContext *ckac = cls;
4706   struct ValidationState *vs = value;
4707
4708   (void) pid;
4709   if (0 != GNUNET_memcmp (&vs->challenge,
4710                           ckac->challenge))
4711     return GNUNET_OK;
4712   ckac->vs = vs;
4713   return GNUNET_NO;
4714 }
4715
4716
4717 /**
4718  * Function called when peerstore is done storing a
4719  * validated address.
4720  *
4721  * @param cls a `struct ValidationState`
4722  * @param success #GNUNET_YES on success
4723  */
4724 static void
4725 peerstore_store_validation_cb (void *cls,
4726                                int success)
4727 {
4728   struct ValidationState *vs = cls;
4729
4730   vs->sc = NULL;
4731   if (GNUNET_YES == success)
4732     return;
4733   GNUNET_STATISTICS_update (GST_stats,
4734                             "# Peerstore failed to store foreign address",
4735                             1,
4736                             GNUNET_NO);
4737 }
4738
4739
4740 /**
4741  * Task run periodically to validate some address based on #validation_heap.
4742  *
4743  * @param cls NULL
4744  */
4745 static void
4746 validation_start_cb (void *cls);
4747
4748
4749 /**
4750  * Set the time for next_challenge of @a vs to @a new_time.
4751  * Updates the heap and if necessary reschedules the job.
4752  *
4753  * @param vs validation state to update
4754  * @param new_time new time for revalidation
4755  */
4756 static void
4757 update_next_challenge_time (struct ValidationState *vs,
4758                             struct GNUNET_TIME_Absolute new_time)
4759 {
4760   struct GNUNET_TIME_Relative delta;
4761
4762   if (new_time.abs_value_us == vs->next_challenge.abs_value_us)
4763     return; /* be lazy */
4764   vs->next_challenge = new_time;
4765   if (NULL == vs->hn)
4766     vs->hn = GNUNET_CONTAINER_heap_insert (validation_heap,
4767                                            vs,
4768                                            new_time.abs_value_us);
4769   else
4770     GNUNET_CONTAINER_heap_update_cost (vs->hn,
4771                                        new_time.abs_value_us);
4772   if ( (vs != GNUNET_CONTAINER_heap_peek (validation_heap)) &&
4773        (NULL != validation_task) )
4774     return;
4775   if (NULL != validation_task)
4776     GNUNET_SCHEDULER_cancel (validation_task);
4777   /* randomize a bit */
4778   delta.rel_value_us = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
4779                                                  MIN_DELAY_ADDRESS_VALIDATION.rel_value_us);
4780   new_time = GNUNET_TIME_absolute_add (new_time,
4781                                        delta);
4782   validation_task = GNUNET_SCHEDULER_add_at (new_time,
4783                                              &validation_start_cb,
4784                                              NULL);
4785 }
4786
4787
4788 /**
4789  * Communicator gave us a transport address validation response.  Process the request.
4790  *
4791  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
4792  * @param tvr the message that was received
4793  */
4794 static void
4795 handle_validation_response (void *cls,
4796                             const struct TransportValidationResponse *tvr)
4797 {
4798   struct CommunicatorMessageContext *cmc = cls;
4799   struct ValidationState *vs;
4800   struct CheckKnownChallengeContext ckac = {
4801     .challenge = &tvr->challenge,
4802     .vs = NULL
4803   };
4804   struct GNUNET_TIME_Absolute origin_time;
4805
4806   /* check this is one of our challenges */
4807   (void) GNUNET_CONTAINER_multipeermap_get_multiple (validation_map,
4808                                                      &cmc->im.sender,
4809                                                      &check_known_challenge,
4810                                                      &ckac);
4811   if (NULL == (vs = ckac.vs))
4812   {
4813     /* This can happen simply if we 'forgot' the challenge by now,
4814        i.e. because we received the validation response twice */
4815     GNUNET_STATISTICS_update (GST_stats,
4816                               "# Validations dropped, challenge unknown",
4817                               1,
4818                               GNUNET_NO);
4819     finish_cmc_handling (cmc);
4820     return;
4821   }
4822
4823   /* sanity check on origin time */
4824   origin_time = GNUNET_TIME_absolute_ntoh (tvr->origin_time);
4825   if ( (origin_time.abs_value_us < vs->first_challenge_use.abs_value_us) ||
4826        (origin_time.abs_value_us > vs->last_challenge_use.abs_value_us) )
4827   {
4828     GNUNET_break_op (0);
4829     finish_cmc_handling (cmc);
4830     return;
4831   }
4832
4833   {
4834     /* check signature */
4835     struct TransportValidationPS tvp = {
4836       .purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_CHALLENGE),
4837       .purpose.size = htonl (sizeof (tvp)),
4838       .validity_duration = tvr->validity_duration,
4839       .challenge = tvr->challenge
4840     };
4841
4842     if (GNUNET_OK !=
4843         GNUNET_CRYPTO_eddsa_verify (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_CHALLENGE,
4844                                     &tvp.purpose,
4845                                     &tvr->signature,
4846                                     &cmc->im.sender.public_key))
4847     {
4848       GNUNET_break_op (0);
4849       finish_cmc_handling (cmc);
4850       return;
4851     }
4852   }
4853
4854   /* validity is capped by our willingness to keep track of the
4855      validation entry and the maximum the other peer allows */
4856   vs->valid_until
4857     = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_min (GNUNET_TIME_relative_ntoh (tvr->validity_duration),
4858                                                                   MAX_ADDRESS_VALID_UNTIL));
4859   vs->validated_until
4860     = GNUNET_TIME_absolute_min (vs->valid_until,
4861                                 GNUNET_TIME_relative_to_absolute (ADDRESS_VALIDATION_LIFETIME));
4862   vs->validation_rtt = GNUNET_TIME_absolute_get_duration (origin_time);
4863   vs->challenge_backoff = GNUNET_TIME_UNIT_ZERO;
4864   GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
4865                               &vs->challenge,
4866                               sizeof (vs->challenge));
4867   vs->first_challenge_use = GNUNET_TIME_absolute_subtract (vs->validated_until,
4868                                                            GNUNET_TIME_relative_multiply (vs->validation_rtt,
4869                                                                                           VALIDATION_RTT_BUFFER_FACTOR));
4870   vs->last_challenge_use = GNUNET_TIME_UNIT_ZERO_ABS; /* challenge was not yet used */
4871   update_next_challenge_time (vs,
4872                               vs->first_challenge_use);
4873   vs->sc = GNUNET_PEERSTORE_store (peerstore,
4874                                    "transport",
4875                                    &cmc->im.sender,
4876                                    GNUNET_PEERSTORE_TRANSPORT_URLADDRESS_KEY,
4877                                    vs->address,
4878                                    strlen (vs->address) + 1,
4879                                    vs->valid_until,
4880                                    GNUNET_PEERSTORE_STOREOPTION_MULTIPLE,
4881                                    &peerstore_store_validation_cb,
4882                                    vs);
4883   // FIXME: should we find the matching queue and update the RTT?
4884   finish_cmc_handling (cmc);
4885 }
4886
4887
4888 /**
4889  * Incoming meessage.  Process the request.
4890  *
4891  * @param im the send message that was received
4892  */
4893 static void
4894 handle_incoming_msg (void *cls,
4895                      const struct GNUNET_TRANSPORT_IncomingMessage *im)
4896 {
4897   struct TransportClient *tc = cls;
4898   struct CommunicatorMessageContext *cmc = GNUNET_new (struct CommunicatorMessageContext);
4899
4900   cmc->tc = tc;
4901   cmc->im = *im;
4902   demultiplex_with_cmc (cmc,
4903                         (const struct GNUNET_MessageHeader *) &im[1]);
4904 }
4905
4906
4907 /**
4908  * Given an inbound message @a msg from a communicator @a cmc,
4909  * demultiplex it based on the type calling the right handler.
4910  *
4911  * @param cmc context for demultiplexing
4912  * @param msg message to demultiplex
4913  */
4914 static void
4915 demultiplex_with_cmc (struct CommunicatorMessageContext *cmc,
4916                       const struct GNUNET_MessageHeader *msg)
4917 {
4918   struct GNUNET_MQ_MessageHandler handlers[] = {
4919     GNUNET_MQ_hd_var_size (fragment_box,
4920                            GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT,
4921                            struct TransportFragmentBox,
4922                            &cmc),
4923     GNUNET_MQ_hd_fixed_size (fragment_ack,
4924                              GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK,
4925                              struct TransportFragmentAckMessage,
4926                              &cmc),
4927     GNUNET_MQ_hd_var_size (reliability_box,
4928                            GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX,
4929                            struct TransportReliabilityBox,
4930                            &cmc),
4931     GNUNET_MQ_hd_fixed_size (reliability_ack,
4932                              GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK,
4933                              struct TransportReliabilityAckMessage,
4934                              &cmc),
4935     GNUNET_MQ_hd_var_size (backchannel_encapsulation,
4936                            GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION,
4937                            struct TransportBackchannelEncapsulationMessage,
4938                            &cmc),
4939     GNUNET_MQ_hd_var_size (dv_learn,
4940                            GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN,
4941                            struct TransportDVLearn,
4942                            &cmc),
4943     GNUNET_MQ_hd_var_size (dv_box,
4944                            GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX,
4945                            struct TransportDVBox,
4946                            &cmc),
4947     GNUNET_MQ_hd_fixed_size (validation_challenge,
4948                              GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_CHALLENGE,
4949                              struct TransportValidationChallenge,
4950                              &cmc),
4951     GNUNET_MQ_hd_fixed_size (validation_response,
4952                              GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_RESPONSE,
4953                              struct TransportValidationResponse,
4954                              &cmc),
4955     GNUNET_MQ_handler_end()
4956   };
4957   int ret;
4958
4959   ret = GNUNET_MQ_handle_message (handlers,
4960                                   msg);
4961   if (GNUNET_SYSERR == ret)
4962   {
4963     GNUNET_break (0);
4964     GNUNET_SERVICE_client_drop (cmc->tc->client);
4965     GNUNET_free (cmc);
4966     return;
4967   }
4968   if (GNUNET_NO == ret)
4969   {
4970     /* unencapsulated 'raw' message */
4971     handle_raw_message (&cmc,
4972                         msg);
4973   }
4974 }
4975
4976
4977 /**
4978  * New queue became available.  Check message.
4979  *
4980  * @param cls the client
4981  * @param aqm the send message that was sent
4982  */
4983 static int
4984 check_add_queue_message (void *cls,
4985                          const struct GNUNET_TRANSPORT_AddQueueMessage *aqm)
4986 {
4987   struct TransportClient *tc = cls;
4988
4989   if (CT_COMMUNICATOR != tc->type)
4990   {
4991     GNUNET_break (0);
4992     return GNUNET_SYSERR;
4993   }
4994   GNUNET_MQ_check_zero_termination (aqm);
4995   return GNUNET_OK;
4996 }
4997
4998
4999 /**
5000  * Bandwidth tracker informs us that the delay until we should receive
5001  * more has changed.
5002  *
5003  * @param cls a `struct Queue` for which the delay changed
5004  */
5005 static void
5006 tracker_update_in_cb (void *cls)
5007 {
5008   struct Queue *queue = cls;
5009   struct GNUNET_TIME_Relative in_delay;
5010   unsigned int rsize;
5011
5012   rsize = (0 == queue->mtu) ? IN_PACKET_SIZE_WITHOUT_MTU : queue->mtu;
5013   in_delay = GNUNET_BANDWIDTH_tracker_get_delay (&queue->tracker_in,
5014                                                  rsize);
5015   // FIXME: how exactly do we do inbound flow control?
5016 }
5017
5018
5019 /**
5020  * If necessary, generates the UUID for a @a pm
5021  *
5022  * @param pm pending message to generate UUID for.
5023  */
5024 static void
5025 set_pending_message_uuid (struct PendingMessage *pm)
5026 {
5027   if (pm->msg_uuid_set)
5028     return;
5029   GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
5030                               &pm->msg_uuid,
5031                               sizeof (pm->msg_uuid));
5032   pm->msg_uuid_set = GNUNET_YES;
5033 }
5034
5035
5036 /**
5037  * Fragment the given @a pm to the given @a mtu.  Adds
5038  * additional fragments to the neighbour as well. If the
5039  * @a mtu is too small, generates and error for the @a pm
5040  * and returns NULL.
5041  *
5042  * @param pm pending message to fragment for transmission
5043  * @param mtu MTU to apply
5044  * @return new message to transmit
5045  */
5046 static struct PendingMessage *
5047 fragment_message (struct PendingMessage *pm,
5048                   uint16_t mtu)
5049 {
5050   struct PendingMessage *ff;
5051
5052   set_pending_message_uuid (pm);
5053
5054   /* This invariant is established in #handle_add_queue_message() */
5055   GNUNET_assert (mtu > sizeof (struct TransportFragmentBox));
5056
5057   /* select fragment for transmission, descending the tree if it has
5058      been expanded until we are at a leaf or at a fragment that is small enough */
5059   ff = pm;
5060   while ( ( (ff->bytes_msg > mtu) ||
5061             (pm == ff) ) &&
5062           (ff->frag_off == ff->bytes_msg) &&
5063           (NULL != ff->head_frag) )
5064   {
5065     ff = ff->head_frag; /* descent into fragmented fragments */
5066   }
5067
5068   if ( ( (ff->bytes_msg > mtu) ||
5069          (pm == ff) ) &&
5070        (pm->frag_off < pm->bytes_msg) )
5071   {
5072     /* Did not yet calculate all fragments, calculate next fragment */
5073     struct PendingMessage *frag;
5074     struct TransportFragmentBox tfb;
5075     const char *orig;
5076     char *msg;
5077     uint16_t fragmax;
5078     uint16_t fragsize;
5079     uint16_t msize;
5080     uint16_t xoff = 0;
5081
5082     orig = (const char *) &ff[1];
5083     msize = ff->bytes_msg;
5084     if (pm != ff)
5085     {
5086       const struct TransportFragmentBox *tfbo;
5087
5088       tfbo = (const struct TransportFragmentBox *) orig;
5089       orig += sizeof (struct TransportFragmentBox);
5090       msize -= sizeof (struct TransportFragmentBox);
5091       xoff = ntohs (tfbo->frag_off);
5092     }
5093     fragmax = mtu - sizeof (struct TransportFragmentBox);
5094     fragsize = GNUNET_MIN (msize - ff->frag_off,
5095                            fragmax);
5096     frag = GNUNET_malloc (sizeof (struct PendingMessage) +
5097                           sizeof (struct TransportFragmentBox) +
5098                           fragsize);
5099     frag->target = pm->target;
5100     frag->frag_parent = ff;
5101     frag->timeout = pm->timeout;
5102     frag->bytes_msg = sizeof (struct TransportFragmentBox) + fragsize;
5103     frag->pmt = PMT_FRAGMENT_BOX;
5104     msg = (char *) &frag[1];
5105     tfb.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT);
5106     tfb.header.size = htons (sizeof (struct TransportFragmentBox) +
5107                              fragsize);
5108     tfb.frag_uuid = htonl (pm->frag_uuidgen++);
5109     tfb.msg_uuid = pm->msg_uuid;
5110     tfb.frag_off = htons (ff->frag_off + xoff);
5111     tfb.msg_size = htons (pm->bytes_msg);
5112     memcpy (msg,
5113             &tfb,
5114             sizeof (tfb));
5115     memcpy (&msg[sizeof (tfb)],
5116             &orig[ff->frag_off],
5117             fragsize);
5118     GNUNET_CONTAINER_MDLL_insert (frag,
5119                                   ff->head_frag,
5120                                   ff->tail_frag,
5121                                   frag);
5122     ff->frag_off += fragsize;
5123     ff = frag;
5124   }
5125
5126   /* Move head to the tail and return it */
5127   GNUNET_CONTAINER_MDLL_remove (frag,
5128                                 ff->frag_parent->head_frag,
5129                                 ff->frag_parent->tail_frag,
5130                                 ff);
5131   GNUNET_CONTAINER_MDLL_insert_tail (frag,
5132                                      ff->frag_parent->head_frag,
5133                                      ff->frag_parent->tail_frag,
5134                                      ff);
5135   return ff;
5136 }
5137
5138
5139 /**
5140  * Reliability-box the given @a pm. On error (can there be any), NULL
5141  * may be returned, otherwise the "replacement" for @a pm (which
5142  * should then be added to the respective neighbour's queue instead of
5143  * @a pm).  If the @a pm is already fragmented or reliability boxed,
5144  * or itself an ACK, this function simply returns @a pm.
5145  *
5146  * @param pm pending message to box for transmission over unreliabile queue
5147  * @return new message to transmit
5148  */
5149 static struct PendingMessage *
5150 reliability_box_message (struct PendingMessage *pm)
5151 {
5152   struct TransportReliabilityBox rbox;
5153   struct PendingMessage *bpm;
5154   char *msg;
5155
5156   if (PMT_CORE != pm->pmt)
5157     return pm;  /* already fragmented or reliability boxed, or control message: do nothing */
5158   if (NULL != pm->bpm)
5159     return pm->bpm; /* already computed earlier: do nothing */
5160   GNUNET_assert (NULL == pm->head_frag);
5161   if (pm->bytes_msg + sizeof (rbox) > UINT16_MAX)
5162   {
5163     /* failed hard */
5164     GNUNET_break (0);
5165     client_send_response (pm,
5166                           GNUNET_NO,
5167                           0);
5168     return NULL;
5169   }
5170   bpm = GNUNET_malloc (sizeof (struct PendingMessage) +
5171                        sizeof (rbox) +
5172                        pm->bytes_msg);
5173   bpm->target = pm->target;
5174   bpm->frag_parent = pm;
5175   GNUNET_CONTAINER_MDLL_insert (frag,
5176                                 pm->head_frag,
5177                                 pm->tail_frag,
5178                                 bpm);
5179   bpm->timeout = pm->timeout;
5180   bpm->pmt = PMT_RELIABILITY_BOX;
5181   bpm->bytes_msg = pm->bytes_msg + sizeof (rbox);
5182   set_pending_message_uuid (bpm);
5183   rbox.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX);
5184   rbox.header.size = htons (sizeof (rbox) + pm->bytes_msg);
5185   rbox.ack_countdown = htonl (0); // FIXME: implement ACK countdown support
5186   rbox.msg_uuid = pm->msg_uuid;
5187   msg = (char *) &bpm[1];
5188   memcpy (msg,
5189           &rbox,
5190           sizeof (rbox));
5191   memcpy (&msg[sizeof (rbox)],
5192           &pm[1],
5193           pm->bytes_msg);
5194   pm->bpm = bpm;
5195   return bpm;
5196 }
5197
5198
5199 /**
5200  * We believe we are ready to transmit a message on a queue. Double-checks
5201  * with the queue's "tracker_out" and then gives the message to the
5202  * communicator for transmission (updating the tracker, and re-scheduling
5203  * itself if applicable).
5204  *
5205  * @param cls the `struct Queue` to process transmissions for
5206  */
5207 static void
5208 transmit_on_queue (void *cls)
5209 {
5210   struct Queue *queue = cls;
5211   struct Neighbour *n = queue->neighbour;
5212   struct PendingMessage *pm;
5213   struct PendingMessage *s;
5214   uint32_t overhead;
5215   struct GNUNET_TRANSPORT_SendMessageTo *smt;
5216   struct GNUNET_MQ_Envelope *env;
5217
5218   queue->transmit_task = NULL;
5219   if (NULL == (pm = n->pending_msg_head))
5220   {
5221     /* no message pending, nothing to do here! */
5222     return;
5223   }
5224   schedule_transmit_on_queue (queue);
5225   if (NULL != queue->transmit_task)
5226     return; /* do it later */
5227   overhead = 0;
5228   if (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc)
5229     overhead += sizeof (struct TransportReliabilityBox);
5230   s = pm;
5231   if ( ( (0 != queue->mtu) &&
5232          (pm->bytes_msg + overhead > queue->mtu) ) ||
5233        (pm->bytes_msg > UINT16_MAX - sizeof (struct GNUNET_TRANSPORT_SendMessageTo)) ||
5234        (NULL != pm->head_frag /* fragments already exist, should
5235                                  respect that even if MTU is 0 for
5236                                  this queue */) )
5237     s = fragment_message (s,
5238                           (0 == queue->mtu)
5239                           ? UINT16_MAX - sizeof (struct GNUNET_TRANSPORT_SendMessageTo)
5240                           : queue->mtu);
5241   if (NULL == s)
5242   {
5243     /* Fragmentation failed, try next message... */
5244     schedule_transmit_on_queue (queue);
5245     return;
5246   }
5247   if (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc)
5248     s = reliability_box_message (s);
5249   if (NULL == s)
5250   {
5251     /* Reliability boxing failed, try next message... */
5252     schedule_transmit_on_queue (queue);
5253     return;
5254   }
5255
5256   /* Pass 's' for transission to the communicator */
5257   env = GNUNET_MQ_msg_extra (smt,
5258                              s->bytes_msg,
5259                              GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG);
5260   smt->qid = queue->qid;
5261   smt->mid = queue->mid_gen;
5262   smt->receiver = n->pid;
5263   memcpy (&smt[1],
5264           &s[1],
5265           s->bytes_msg);
5266   {
5267     /* Pass the env to the communicator of queue for transmission. */
5268     struct QueueEntry *qe;
5269
5270     qe = GNUNET_new (struct QueueEntry);
5271     qe->mid = queue->mid_gen++;
5272     qe->queue = queue;
5273     // qe->pm = s; // FIXME: not so easy, reference management on 'free(s)'!
5274     GNUNET_CONTAINER_DLL_insert (queue->queue_head,
5275                                  queue->queue_tail,
5276                                  qe);
5277     GNUNET_assert (CT_COMMUNICATOR == queue->tc->type);
5278     queue->queue_length++;
5279     queue->tc->details.communicator.total_queue_length++;
5280     GNUNET_MQ_send (queue->tc->mq,
5281                     env);
5282   }
5283
5284   // FIXME: do something similar to the logic below
5285   // in defragmentation / reliability ACK handling!
5286
5287   /* Check if this transmission somehow conclusively finished handing 'pm'
5288      even without any explicit ACKs */
5289   if ( (PMT_CORE == s->pmt) &&
5290        (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc) )
5291   {
5292     /* Full message sent, and over reliabile channel */
5293     client_send_response (pm,
5294                           GNUNET_YES,
5295                           pm->bytes_msg);
5296   }
5297   else if ( (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc) &&
5298             (PMT_FRAGMENT_BOX == s->pmt) )
5299   {
5300     struct PendingMessage *pos;
5301
5302     /* Fragment sent over reliabile channel */
5303     free_fragment_tree (s);
5304     pos = s->frag_parent;
5305     GNUNET_CONTAINER_MDLL_remove (frag,
5306                                   pos->head_frag,
5307                                   pos->tail_frag,
5308                                   s);
5309     GNUNET_free (s);
5310     /* check if subtree is done */
5311     while ( (NULL == pos->head_frag) &&
5312             (pos->frag_off == pos->bytes_msg) &&
5313             (pos != pm) )
5314     {
5315       s = pos;
5316       pos = s->frag_parent;
5317       GNUNET_CONTAINER_MDLL_remove (frag,
5318                                     pos->head_frag,
5319                                     pos->tail_frag,
5320                                     s);
5321       GNUNET_free (s);
5322     }
5323
5324     /* Was this the last applicable fragmment? */
5325     if ( (NULL == pm->head_frag) &&
5326          (pm->frag_off == pm->bytes_msg) )
5327       client_send_response (pm,
5328                             GNUNET_YES,
5329                             pm->bytes_msg /* FIXME: calculate and add overheads! */);
5330   }
5331   else if (PMT_CORE != pm->pmt)
5332   {
5333     /* This was an acknowledgement of some type, always free */
5334     free_pending_message (pm);
5335   }
5336   else
5337   {
5338     /* message not finished, waiting for acknowledgement */
5339     struct Neighbour *neighbour = pm->target;
5340     /* Update time by which we might retransmit 's' based on queue
5341        characteristics (i.e. RTT); it takes one RTT for the message to
5342        arrive and the ACK to come back in the best case; but the other
5343        side is allowed to delay ACKs by 2 RTTs, so we use 4 RTT before
5344        retransmitting.  Note that in the future this heuristic should
5345        likely be improved further (measure RTT stability, consider
5346        message urgency and size when delaying ACKs, etc.) */
5347     s->next_attempt = GNUNET_TIME_relative_to_absolute
5348       (GNUNET_TIME_relative_multiply (queue->rtt,
5349                                       4));
5350     if (s == pm)
5351     {
5352       struct PendingMessage *pos;
5353
5354       /* re-insert sort in neighbour list */
5355       GNUNET_CONTAINER_MDLL_remove (neighbour,
5356                                     neighbour->pending_msg_head,
5357                                     neighbour->pending_msg_tail,
5358                                     pm);
5359       pos = neighbour->pending_msg_tail;
5360       while ( (NULL != pos) &&
5361               (pm->next_attempt.abs_value_us > pos->next_attempt.abs_value_us) )
5362         pos = pos->prev_neighbour;
5363       GNUNET_CONTAINER_MDLL_insert_after (neighbour,
5364                                           neighbour->pending_msg_head,
5365                                           neighbour->pending_msg_tail,
5366                                           pos,
5367                                           pm);
5368     }
5369     else
5370     {
5371       /* re-insert sort in fragment list */
5372       struct PendingMessage *fp = s->frag_parent;
5373       struct PendingMessage *pos;
5374
5375       GNUNET_CONTAINER_MDLL_remove (frag,
5376                                     fp->head_frag,
5377                                     fp->tail_frag,
5378                                     s);
5379       pos = fp->tail_frag;
5380       while ( (NULL != pos) &&
5381               (s->next_attempt.abs_value_us > pos->next_attempt.abs_value_us) )
5382         pos = pos->prev_frag;
5383       GNUNET_CONTAINER_MDLL_insert_after (frag,
5384                                           fp->head_frag,
5385                                           fp->tail_frag,
5386                                           pos,
5387                                           s);
5388     }
5389   }
5390
5391   /* finally, re-schedule queue transmission task itself */
5392   schedule_transmit_on_queue (queue);
5393 }
5394
5395
5396 /**
5397  * Bandwidth tracker informs us that the delay until we
5398  * can transmit again changed.
5399  *
5400  * @param cls a `struct Queue` for which the delay changed
5401  */
5402 static void
5403 tracker_update_out_cb (void *cls)
5404 {
5405   struct Queue *queue = cls;
5406   struct Neighbour *n = queue->neighbour;
5407
5408   if (NULL == n->pending_msg_head)
5409   {
5410     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
5411                 "Bandwidth allocation updated for empty transmission queue `%s'\n",
5412                 queue->address);
5413     return; /* no message pending, nothing to do here! */
5414   }
5415   GNUNET_SCHEDULER_cancel (queue->transmit_task);
5416   queue->transmit_task = NULL;
5417   schedule_transmit_on_queue (queue);
5418 }
5419
5420
5421 /**
5422  * Bandwidth tracker informs us that excessive outbound bandwidth was
5423  * allocated which is not being used.
5424  *
5425  * @param cls a `struct Queue` for which the excess was noted
5426  */
5427 static void
5428 tracker_excess_out_cb (void *cls)
5429 {
5430   (void) cls;
5431
5432   /* FIXME: trigger excess bandwidth report to core? Right now,
5433      this is done internally within transport_api2_core already,
5434      but we probably want to change the logic and trigger it
5435      from here via a message instead! */
5436   /* TODO: maybe inform someone at this point? */
5437   GNUNET_STATISTICS_update (GST_stats,
5438                             "# Excess outbound bandwidth reported",
5439                             1,
5440                             GNUNET_NO);
5441 }
5442
5443
5444
5445 /**
5446  * Bandwidth tracker informs us that excessive inbound bandwidth was allocated
5447  * which is not being used.
5448  *
5449  * @param cls a `struct Queue` for which the excess was noted
5450  */
5451 static void
5452 tracker_excess_in_cb (void *cls)
5453 {
5454   (void) cls;
5455
5456   /* TODO: maybe inform somone at this point? */
5457   GNUNET_STATISTICS_update (GST_stats,
5458                             "# Excess inbound bandwidth reported",
5459                             1,
5460                             GNUNET_NO);
5461 }
5462
5463
5464 /**
5465  * Queue to a peer went down.  Process the request.
5466  *
5467  * @param cls the client
5468  * @param dqm the send message that was sent
5469  */
5470 static void
5471 handle_del_queue_message (void *cls,
5472                           const struct GNUNET_TRANSPORT_DelQueueMessage *dqm)
5473 {
5474   struct TransportClient *tc = cls;
5475
5476   if (CT_COMMUNICATOR != tc->type)
5477   {
5478     GNUNET_break (0);
5479     GNUNET_SERVICE_client_drop (tc->client);
5480     return;
5481   }
5482   for (struct Queue *queue = tc->details.communicator.queue_head;
5483        NULL != queue;
5484        queue = queue->next_client)
5485   {
5486     struct Neighbour *neighbour = queue->neighbour;
5487
5488     if ( (dqm->qid != queue->qid) ||
5489          (0 != GNUNET_memcmp (&dqm->receiver,
5490                               &neighbour->pid)) )
5491       continue;
5492     free_queue (queue);
5493     GNUNET_SERVICE_client_continue (tc->client);
5494     return;
5495   }
5496   GNUNET_break (0);
5497   GNUNET_SERVICE_client_drop (tc->client);
5498 }
5499
5500
5501 /**
5502  * Message was transmitted.  Process the request.
5503  *
5504  * @param cls the client
5505  * @param sma the send message that was sent
5506  */
5507 static void
5508 handle_send_message_ack (void *cls,
5509                          const struct GNUNET_TRANSPORT_SendMessageToAck *sma)
5510 {
5511   struct TransportClient *tc = cls;
5512   struct QueueEntry *qe;
5513
5514   if (CT_COMMUNICATOR != tc->type)
5515   {
5516     GNUNET_break (0);
5517     GNUNET_SERVICE_client_drop (tc->client);
5518     return;
5519   }
5520
5521   /* find our queue entry matching the ACK */
5522   qe = NULL;
5523   for (struct Queue *queue = tc->details.communicator.queue_head;
5524        NULL != queue;
5525        queue = queue->next_client)
5526   {
5527     if (0 != GNUNET_memcmp (&queue->neighbour->pid,
5528                             &sma->receiver))
5529       continue;
5530     for (struct QueueEntry *qep = queue->queue_head;
5531          NULL != qep;
5532          qep = qep->next)
5533     {
5534       if (qep->mid != sma->mid)
5535         continue;
5536       qe = qep;
5537       break;
5538     }
5539     break;
5540   }
5541   if (NULL == qe)
5542   {
5543     /* this should never happen */
5544     GNUNET_break (0);
5545     GNUNET_SERVICE_client_drop (tc->client);
5546     return;
5547   }
5548   GNUNET_CONTAINER_DLL_remove (qe->queue->queue_head,
5549                                qe->queue->queue_tail,
5550                                qe);
5551   qe->queue->queue_length--;
5552   tc->details.communicator.total_queue_length--;
5553   GNUNET_SERVICE_client_continue (tc->client);
5554
5555   /* if applicable, resume transmissions that waited on ACK */
5556   if (COMMUNICATOR_TOTAL_QUEUE_LIMIT - 1 == tc->details.communicator.total_queue_length)
5557   {
5558     /* Communicator dropped below threshold, resume all queues */
5559     GNUNET_STATISTICS_update (GST_stats,
5560                               "# Transmission throttled due to communicator queue limit",
5561                               -1,
5562                               GNUNET_NO);
5563     for (struct Queue *queue = tc->details.communicator.queue_head;
5564          NULL != queue;
5565          queue = queue->next_client)
5566       schedule_transmit_on_queue (queue);
5567   }
5568   else if (QUEUE_LENGTH_LIMIT - 1 == qe->queue->queue_length)
5569   {
5570     /* queue dropped below threshold; only resume this one queue */
5571     GNUNET_STATISTICS_update (GST_stats,
5572                               "# Transmission throttled due to queue queue limit",
5573                               -1,
5574                               GNUNET_NO);
5575     schedule_transmit_on_queue (qe->queue);
5576   }
5577
5578   /* TODO: we also should react on the status! */
5579   // FIXME: this probably requires queue->pm = s assignment!
5580   // FIXME: react to communicator status about transmission request. We got:
5581   sma->status; // OK success, SYSERR failure
5582
5583   GNUNET_free (qe);
5584 }
5585
5586
5587 /**
5588  * Iterator telling new MONITOR client about all existing
5589  * queues to peers.
5590  *
5591  * @param cls the new `struct TransportClient`
5592  * @param pid a connected peer
5593  * @param value the `struct Neighbour` with more information
5594  * @return #GNUNET_OK (continue to iterate)
5595  */
5596 static int
5597 notify_client_queues (void *cls,
5598                       const struct GNUNET_PeerIdentity *pid,
5599                       void *value)
5600 {
5601   struct TransportClient *tc = cls;
5602   struct Neighbour *neighbour = value;
5603
5604   GNUNET_assert (CT_MONITOR == tc->type);
5605   for (struct Queue *q = neighbour->queue_head;
5606        NULL != q;
5607        q = q->next_neighbour)
5608   {
5609     struct MonitorEvent me = {
5610       .rtt = q->rtt,
5611       .cs = q->cs,
5612       .num_msg_pending = q->num_msg_pending,
5613       .num_bytes_pending = q->num_bytes_pending
5614     };
5615
5616     notify_monitor (tc,
5617                     pid,
5618                     q->address,
5619                     q->nt,
5620                     &me);
5621   }
5622   return GNUNET_OK;
5623 }
5624
5625
5626 /**
5627  * Initialize a monitor client.
5628  *
5629  * @param cls the client
5630  * @param start the start message that was sent
5631  */
5632 static void
5633 handle_monitor_start (void *cls,
5634                       const struct GNUNET_TRANSPORT_MonitorStart *start)
5635 {
5636   struct TransportClient *tc = cls;
5637
5638   if (CT_NONE != tc->type)
5639   {
5640     GNUNET_break (0);
5641     GNUNET_SERVICE_client_drop (tc->client);
5642     return;
5643   }
5644   tc->type = CT_MONITOR;
5645   tc->details.monitor.peer = start->peer;
5646   tc->details.monitor.one_shot = ntohl (start->one_shot);
5647   GNUNET_CONTAINER_multipeermap_iterate (neighbours,
5648                                          &notify_client_queues,
5649                                          tc);
5650   GNUNET_SERVICE_client_mark_monitor (tc->client);
5651   GNUNET_SERVICE_client_continue (tc->client);
5652 }
5653
5654
5655 /**
5656  * Find transport client providing communication service
5657  * for the protocol @a prefix.
5658  *
5659  * @param prefix communicator name
5660  * @return NULL if no such transport client is available
5661  */
5662 static struct TransportClient *
5663 lookup_communicator (const char *prefix)
5664 {
5665   for (struct TransportClient *tc = clients_head;
5666        NULL != tc;
5667        tc = tc->next)
5668   {
5669     if (CT_COMMUNICATOR != tc->type)
5670       continue;
5671     if (0 == strcmp (prefix,
5672                      tc->details.communicator.address_prefix))
5673       return tc;
5674   }
5675   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
5676               "Somone suggested use of communicator for `%s', but we do not have such a communicator!\n",
5677               prefix);
5678   return NULL;
5679 }
5680
5681
5682 /**
5683  * Signature of a function called with a communicator @a address of a peer
5684  * @a pid that an application wants us to connect to.
5685  *
5686  * @param pid target peer
5687  * @param address the address to try
5688  */
5689 static void
5690 suggest_to_connect (const struct GNUNET_PeerIdentity *pid,
5691                     const char *address)
5692 {
5693   static uint32_t idgen;
5694   struct TransportClient *tc;
5695   char *prefix;
5696   struct GNUNET_TRANSPORT_CreateQueue *cqm;
5697   struct GNUNET_MQ_Envelope *env;
5698   size_t alen;
5699
5700   prefix = GNUNET_HELLO_address_to_prefix (address);
5701   if (NULL == prefix)
5702   {
5703     GNUNET_break (0); /* We got an invalid address!? */
5704     return;
5705   }
5706   tc = lookup_communicator (prefix);
5707   if (NULL == tc)
5708   {
5709     GNUNET_STATISTICS_update (GST_stats,
5710                               "# Suggestions ignored due to missing communicator",
5711                               1,
5712                               GNUNET_NO);
5713     return;
5714   }
5715   /* forward suggestion for queue creation to communicator */
5716   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
5717               "Request #%u for `%s' communicator to create queue to `%s'\n",
5718               (unsigned int) idgen,
5719               prefix,
5720               address);
5721   alen = strlen (address) + 1;
5722   env = GNUNET_MQ_msg_extra (cqm,
5723                              alen,
5724                              GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE);
5725   cqm->request_id = htonl (idgen++);
5726   cqm->receiver = *pid;
5727   memcpy (&cqm[1],
5728           address,
5729           alen);
5730   GNUNET_MQ_send (tc->mq,
5731                   env);
5732 }
5733
5734
5735 /**
5736  * The queue @a q (which matches the peer and address in @a vs) is
5737  * ready for queueing. We should now queue the validation request.
5738  *
5739  * @param q queue to send on
5740  * @param vs state to derive validation challenge from
5741  */
5742 static void
5743 validation_transmit_on_queue (struct Queue *q,
5744                               struct ValidationState *vs)
5745 {
5746   struct GNUNET_MQ_Envelope *env;
5747   struct TransportValidationChallenge *tvc;
5748
5749   vs->last_challenge_use = GNUNET_TIME_absolute_get ();
5750   env = GNUNET_MQ_msg (tvc,
5751                        GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_CHALLENGE);
5752   tvc->reserved = htonl (0);
5753   tvc->challenge = vs->challenge;
5754   tvc->sender_time = GNUNET_TIME_absolute_hton (vs->last_challenge_use);
5755   // FIXME: not so easy, need to BOX this message
5756   // in a transmission request! (mistake also done elsewhere!)
5757   GNUNET_MQ_send (q->tc->mq,
5758                   env);
5759 }
5760
5761
5762 /**
5763  * Task run periodically to validate some address based on #validation_heap.
5764  *
5765  * @param cls NULL
5766  */
5767 static void
5768 validation_start_cb (void *cls)
5769 {
5770   struct ValidationState *vs;
5771   struct Neighbour *n;
5772   struct Queue *q;
5773
5774   (void) cls;
5775   validation_task = NULL;
5776   vs = GNUNET_CONTAINER_heap_peek (validation_heap);
5777   /* drop validations past their expiration */
5778   while ( (NULL != vs) &&
5779           (0 == GNUNET_TIME_absolute_get_remaining (vs->valid_until).rel_value_us) )
5780   {
5781     free_validation_state (vs);
5782     vs = GNUNET_CONTAINER_heap_peek (validation_heap);
5783   }
5784   if (NULL == vs)
5785     return; /* woopsie, no more addresses known, should only
5786                happen if we're really a lonely peer */
5787   n = GNUNET_CONTAINER_multipeermap_get (neighbours,
5788                                          &vs->pid);
5789   q = NULL;
5790   if (NULL != n)
5791   {
5792     for (struct Queue *pos = n->queue_head;
5793          NULL != pos;
5794          pos = pos->next_neighbour)
5795     {
5796       if (0 == strcmp (pos->address,
5797                        vs->address))
5798       {
5799         q = pos;
5800         break;
5801       }
5802     }
5803   }
5804   if (NULL == q)
5805   {
5806     vs->awaiting_queue = GNUNET_YES;
5807     suggest_to_connect (&vs->pid,
5808                         vs->address);
5809   }
5810   else
5811     validation_transmit_on_queue (q,
5812                                   vs);
5813   /* Finally, reschedule next attempt */
5814   vs->challenge_backoff = GNUNET_TIME_randomized_backoff (vs->challenge_backoff,
5815                                                           MAX_VALIDATION_CHALLENGE_FREQ);
5816   update_next_challenge_time (vs,
5817                               GNUNET_TIME_relative_to_absolute (vs->challenge_backoff));
5818 }
5819
5820
5821 /**
5822  * Closure for #check_connection_quality.
5823  */
5824 struct QueueQualityContext
5825 {
5826   /**
5827    * Set to the @e k'th queue encountered.
5828    */
5829   struct Queue *q;
5830
5831   /**
5832    * Set to the number of quality queues encountered.
5833    */
5834   unsigned int quality_count;
5835
5836   /**
5837    * Set to the total number of queues encountered.
5838    */
5839   unsigned int num_queues;
5840
5841   /**
5842    * Decremented for each queue, for selection of the
5843    * k-th queue in @e q.
5844    */
5845   unsigned int k;
5846
5847 };
5848
5849
5850 /**
5851  * Check whether any queue to the given neighbour is
5852  * of a good "quality" and if so, increment the counter.
5853  * Also counts the total number of queues, and returns
5854  * the k-th queue found.
5855  *
5856  * @param cls a `struct QueueQualityContext *` with counters
5857  * @param pid peer this is about
5858  * @param value a `struct Neighbour`
5859  * @return #GNUNET_OK (continue to iterate)
5860  */
5861 static int
5862 check_connection_quality (void *cls,
5863                           const struct GNUNET_PeerIdentity *pid,
5864                           void *value)
5865 {
5866   struct QueueQualityContext *ctx = cls;
5867   struct Neighbour *n = value;
5868   int do_inc;
5869
5870   (void) pid;
5871   do_inc = GNUNET_NO;
5872   for (struct Queue *q = n->queue_head;
5873        NULL != q;
5874        q = q->next_neighbour)
5875   {
5876     if (0 != q->distance)
5877       continue; /* DV does not count */
5878     ctx->num_queues++;
5879     if (0 == ctx->k--)
5880       ctx->q = q;
5881     /* OPTIMIZE-FIXME: in the future, add reliability / goodput
5882        statistics and consider those as well here? */
5883     if (q->rtt.rel_value_us < DV_QUALITY_RTT_THRESHOLD.rel_value_us)
5884       do_inc = GNUNET_YES;
5885   }
5886   if (GNUNET_YES == do_inc)
5887     ctx->quality_count++;
5888   return GNUNET_OK;
5889 }
5890
5891
5892 /**
5893  * Task run when we CONSIDER initiating a DV learn
5894  * process. We first check that sending out a message is
5895  * even possible (queues exist), then that it is desirable
5896  * (if not, reschedule the task for later), and finally
5897  * we may then begin the job.  If there are too many
5898  * entries in the #dvlearn_map, we purge the oldest entry
5899  * using #lle_tail.
5900  *
5901  * @param cls NULL
5902  */
5903 static void
5904 start_dv_learn (void *cls)
5905 {
5906   struct LearnLaunchEntry *lle;
5907   struct QueueQualityContext qqc;
5908   struct GNUNET_MQ_Envelope *env;
5909   struct TransportDVLearn *dvl;
5910
5911   (void) cls;
5912   dvlearn_task = NULL;
5913   if (0 ==
5914       GNUNET_CONTAINER_multipeermap_size (neighbours))
5915     return; /* lost all connectivity, cannot do learning */
5916   qqc.quality_count = 0;
5917   qqc.num_queues = 0;
5918   GNUNET_CONTAINER_multipeermap_iterate (neighbours,
5919                                          &check_connection_quality,
5920                                          &qqc);
5921   if (qqc.quality_count > DV_LEARN_QUALITY_THRESHOLD)
5922   {
5923     struct GNUNET_TIME_Relative delay;
5924     unsigned int factor;
5925
5926     /* scale our retries by how far we are above the threshold */
5927     factor = qqc.quality_count / DV_LEARN_QUALITY_THRESHOLD;
5928     delay = GNUNET_TIME_relative_multiply (DV_LEARN_BASE_FREQUENCY,
5929                                            factor);
5930     dvlearn_task = GNUNET_SCHEDULER_add_delayed (delay,
5931                                                  &start_dv_learn,
5932                                                  NULL);
5933     return;
5934   }
5935   /* remove old entries in #dvlearn_map if it has grown too big */
5936   while (MAX_DV_LEARN_PENDING >=
5937          GNUNET_CONTAINER_multishortmap_size (dvlearn_map))
5938   {
5939     lle = lle_tail;
5940     GNUNET_assert (GNUNET_YES ==
5941                    GNUNET_CONTAINER_multishortmap_remove (dvlearn_map,
5942                                                           &lle->challenge,
5943                                                           lle));
5944     GNUNET_CONTAINER_DLL_remove (lle_head,
5945                                  lle_tail,
5946                                  lle);
5947     GNUNET_free (lle);
5948   }
5949   /* setup data structure for learning */
5950   lle = GNUNET_new (struct LearnLaunchEntry);
5951   GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
5952                               &lle->challenge,
5953                               sizeof (lle->challenge));
5954   GNUNET_CONTAINER_DLL_insert (lle_head,
5955                                lle_tail,
5956                                lle);
5957   GNUNET_break (GNUNET_YES ==
5958                 GNUNET_CONTAINER_multishortmap_put (dvlearn_map,
5959                                                     &lle->challenge,
5960                                                     lle,
5961                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
5962   env = GNUNET_MQ_msg (dvl,
5963                        GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN);
5964   dvl->num_hops = htons (0);
5965   dvl->bidirectional = htons (0);
5966   dvl->non_network_delay = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO);
5967   {
5968     struct DvInitPS dvip = {
5969       .purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_INITIATOR),
5970       .purpose.size = htonl (sizeof (dvip)),
5971       .challenge = lle->challenge
5972     };
5973
5974     GNUNET_assert (GNUNET_OK ==
5975                    GNUNET_CRYPTO_eddsa_sign (GST_my_private_key,
5976                                              &dvip.purpose,
5977                                              &dvl->init_sig));
5978   }
5979   dvl->initiator = GST_my_identity;
5980   dvl->challenge = lle->challenge;
5981
5982   qqc.quality_count = 0;
5983   qqc.k = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
5984                                     qqc.num_queues);
5985   qqc.num_queues = 0;
5986   qqc.q = NULL;
5987   GNUNET_CONTAINER_multipeermap_iterate (neighbours,
5988                                          &check_connection_quality,
5989                                          &qqc);
5990   GNUNET_assert (NULL != qqc.q);
5991
5992   /* Do this as close to transmission time as possible! */
5993   lle->launch_time = GNUNET_TIME_absolute_get ();
5994   // FIXME: not so easy, need to BOX this message
5995   // in a transmission request! (mistake also done elsewhere!)
5996   GNUNET_MQ_send (qqc.q->tc->mq,
5997                   env);
5998
5999   /* reschedule this job, randomizing the time it runs (but no
6000      actual backoff!) */
6001   dvlearn_task
6002     = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_randomize (DV_LEARN_BASE_FREQUENCY),
6003                                     &start_dv_learn,
6004                                     NULL);
6005 }
6006
6007
6008 /**
6009  * A new queue has been created, check if any address validation
6010  * requests have been waiting for it.
6011  *
6012  * @param cls a `struct Queue`
6013  * @param pid peer concerned (unused)
6014  * @param value a `struct ValidationState`
6015  * @return #GNUNET_NO if a match was found and we can stop looking
6016  */
6017 static int
6018 check_validation_request_pending (void *cls,
6019                                   const struct GNUNET_PeerIdentity *pid,
6020                                   void *value)
6021 {
6022   struct Queue *q = cls;
6023   struct ValidationState *vs = value;
6024
6025   (void) pid;
6026   if ( (GNUNET_YES == vs->awaiting_queue) &&
6027        (0 == strcmp (vs->address,
6028                      q->address)) )
6029   {
6030     vs->awaiting_queue = GNUNET_NO;
6031     validation_transmit_on_queue (q,
6032                                   vs);
6033     return GNUNET_NO;
6034   }
6035   return GNUNET_OK;
6036 }
6037
6038
6039 /**
6040  * New queue became available.  Process the request.
6041  *
6042  * @param cls the client
6043  * @param aqm the send message that was sent
6044  */
6045 static void
6046 handle_add_queue_message (void *cls,
6047                           const struct GNUNET_TRANSPORT_AddQueueMessage *aqm)
6048 {
6049   struct TransportClient *tc = cls;
6050   struct Queue *queue;
6051   struct Neighbour *neighbour;
6052   const char *addr;
6053   uint16_t addr_len;
6054
6055   if (ntohl (aqm->mtu) <= sizeof (struct TransportFragmentBox))
6056   {
6057     /* MTU so small as to be useless for transmissions,
6058        required for #fragment_message()! */
6059     GNUNET_break_op (0);
6060     GNUNET_SERVICE_client_drop (tc->client);
6061     return;
6062   }
6063   neighbour = lookup_neighbour (&aqm->receiver);
6064   if (NULL == neighbour)
6065   {
6066     neighbour = GNUNET_new (struct Neighbour);
6067     neighbour->earliest_timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
6068     neighbour->pid = aqm->receiver;
6069     GNUNET_assert (GNUNET_OK ==
6070                    GNUNET_CONTAINER_multipeermap_put (neighbours,
6071                                                       &neighbour->pid,
6072                                                       neighbour,
6073                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
6074     cores_send_connect_info (&neighbour->pid,
6075                              GNUNET_BANDWIDTH_ZERO);
6076   }
6077   addr_len = ntohs (aqm->header.size) - sizeof (*aqm);
6078   addr = (const char *) &aqm[1];
6079
6080   queue = GNUNET_malloc (sizeof (struct Queue) + addr_len);
6081   queue->tc = tc;
6082   queue->address = (const char *) &queue[1];
6083   queue->rtt = GNUNET_TIME_UNIT_FOREVER_REL;
6084   queue->qid = aqm->qid;
6085   queue->mtu = ntohl (aqm->mtu);
6086   queue->nt = (enum GNUNET_NetworkType) ntohl (aqm->nt);
6087   queue->cs = (enum GNUNET_TRANSPORT_ConnectionStatus) ntohl (aqm->cs);
6088   queue->neighbour = neighbour;
6089   GNUNET_BANDWIDTH_tracker_init2 (&queue->tracker_in,
6090                                   &tracker_update_in_cb,
6091                                   queue,
6092                                   GNUNET_BANDWIDTH_ZERO,
6093                                   GNUNET_CONSTANTS_MAX_BANDWIDTH_CARRY_S,
6094                                   &tracker_excess_in_cb,
6095                                   queue);
6096   GNUNET_BANDWIDTH_tracker_init2 (&queue->tracker_out,
6097                                   &tracker_update_out_cb,
6098                                   queue,
6099                                   GNUNET_BANDWIDTH_ZERO,
6100                                   GNUNET_CONSTANTS_MAX_BANDWIDTH_CARRY_S,
6101                                   &tracker_excess_out_cb,
6102                                   queue);
6103   memcpy (&queue[1],
6104           addr,
6105           addr_len);
6106   /* notify monitors about new queue */
6107   {
6108     struct MonitorEvent me = {
6109       .rtt = queue->rtt,
6110       .cs = queue->cs
6111     };
6112
6113     notify_monitors (&neighbour->pid,
6114                      queue->address,
6115                      queue->nt,
6116                      &me);
6117   }
6118   GNUNET_CONTAINER_MDLL_insert (neighbour,
6119                                 neighbour->queue_head,
6120                                 neighbour->queue_tail,
6121                                 queue);
6122   GNUNET_CONTAINER_MDLL_insert (client,
6123                                 tc->details.communicator.queue_head,
6124                                 tc->details.communicator.queue_tail,
6125                                 queue);
6126   /* check if valdiations are waiting for the queue */
6127   (void) GNUNET_CONTAINER_multipeermap_get_multiple (validation_map,
6128                                                      &aqm->receiver,
6129                                                      &check_validation_request_pending,
6130                                                      queue);
6131   /* might be our first queue, try launching DV learning */
6132   if (NULL == dvlearn_task)
6133     dvlearn_task = GNUNET_SCHEDULER_add_now (&start_dv_learn,
6134                                              NULL);
6135   GNUNET_SERVICE_client_continue (tc->client);
6136 }
6137
6138
6139 /**
6140  * Communicator tells us that our request to create a queue "worked", that
6141  * is setting up the queue is now in process.
6142  *
6143  * @param cls the `struct TransportClient`
6144  * @param cqr confirmation message
6145  */
6146 static void
6147 handle_queue_create_ok (void *cls,
6148                         const struct GNUNET_TRANSPORT_CreateQueueResponse *cqr)
6149 {
6150   struct TransportClient *tc = cls;
6151
6152   if (CT_COMMUNICATOR != tc->type)
6153   {
6154     GNUNET_break (0);
6155     GNUNET_SERVICE_client_drop (tc->client);
6156     return;
6157   }
6158   GNUNET_STATISTICS_update (GST_stats,
6159                             "# Suggestions succeeded at communicator",
6160                             1,
6161                             GNUNET_NO);
6162   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
6163               "Request #%u for communicator to create queue succeeded\n",
6164               (unsigned int) ntohs (cqr->request_id));
6165   GNUNET_SERVICE_client_continue (tc->client);
6166 }
6167
6168
6169 /**
6170  * Communicator tells us that our request to create a queue failed. This usually
6171  * indicates that the provided address is simply invalid or that the communicator's
6172  * resources are exhausted.
6173  *
6174  * @param cls the `struct TransportClient`
6175  * @param cqr failure message
6176  */
6177 static void
6178 handle_queue_create_fail (void *cls,
6179                           const struct GNUNET_TRANSPORT_CreateQueueResponse *cqr)
6180 {
6181   struct TransportClient *tc = cls;
6182
6183   if (CT_COMMUNICATOR != tc->type)
6184   {
6185     GNUNET_break (0);
6186     GNUNET_SERVICE_client_drop (tc->client);
6187     return;
6188   }
6189   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
6190               "Request #%u for communicator to create queue failed\n",
6191               (unsigned int) ntohs (cqr->request_id));
6192   GNUNET_STATISTICS_update (GST_stats,
6193                             "# Suggestions failed in queue creation at communicator",
6194                             1,
6195                             GNUNET_NO);
6196   GNUNET_SERVICE_client_continue (tc->client);
6197 }
6198
6199
6200 /**
6201  * We have received a `struct ExpressPreferenceMessage` from an application client.
6202  *
6203  * @param cls handle to the client
6204  * @param msg the start message
6205  */
6206 static void
6207 handle_suggest_cancel (void *cls,
6208                        const struct ExpressPreferenceMessage *msg)
6209 {
6210   struct TransportClient *tc = cls;
6211   struct PeerRequest *pr;
6212
6213   if (CT_APPLICATION != tc->type)
6214   {
6215     GNUNET_break (0);
6216     GNUNET_SERVICE_client_drop (tc->client);
6217     return;
6218   }
6219   pr = GNUNET_CONTAINER_multipeermap_get (tc->details.application.requests,
6220                                           &msg->peer);
6221   if (NULL == pr)
6222   {
6223     GNUNET_break (0);
6224     GNUNET_SERVICE_client_drop (tc->client);
6225     return;
6226   }
6227   (void) stop_peer_request (tc,
6228                             &pr->pid,
6229                             pr);
6230   GNUNET_SERVICE_client_continue (tc->client);
6231 }
6232
6233
6234 /**
6235  * Check #GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_CONSIDER_VERIFY
6236  * messages. We do nothing here, real verification is done later.
6237  *
6238  * @param cls a `struct TransportClient *`
6239  * @param msg message to verify
6240  * @return #GNUNET_OK
6241  */
6242 static int
6243 check_address_consider_verify (void *cls,
6244                                const struct GNUNET_TRANSPORT_AddressToVerify *hdr)
6245 {
6246   (void) cls;
6247   (void) hdr;
6248   return GNUNET_OK;
6249 }
6250
6251
6252 /**
6253  * Closure for #check_known_address.
6254  */
6255 struct CheckKnownAddressContext
6256 {
6257   /**
6258    * Set to the address we are looking for.
6259    */
6260   const char *address;
6261
6262   /**
6263    * Set to a matching validation state, if one was found.
6264    */
6265   struct ValidationState *vs;
6266 };
6267
6268
6269 /**
6270  * Test if the validation state in @a value matches the
6271  * address from @a cls.
6272  *
6273  * @param cls a `struct CheckKnownAddressContext`
6274  * @param pid unused (must match though)
6275  * @param value a `struct ValidationState`
6276  * @return #GNUNET_OK if not matching, #GNUNET_NO if match found
6277  */
6278 static int
6279 check_known_address (void *cls,
6280                      const struct GNUNET_PeerIdentity *pid,
6281                      void *value)
6282 {
6283   struct CheckKnownAddressContext *ckac = cls;
6284   struct ValidationState *vs = value;
6285
6286   (void) pid;
6287   if (0 != strcmp (vs->address,
6288                    ckac->address))
6289     return GNUNET_OK;
6290   ckac->vs = vs;
6291   return GNUNET_NO;
6292 }
6293
6294
6295 /**
6296  * Start address validation.
6297  *
6298  * @param pid peer the @a address is for
6299  * @param address an address to reach @a pid (presumably)
6300  * @param expiration when did @a pid claim @a address will become invalid
6301  */
6302 static void
6303 start_address_validation (const struct GNUNET_PeerIdentity *pid,
6304                           const char *address,
6305                           struct GNUNET_TIME_Absolute expiration)
6306 {
6307   struct GNUNET_TIME_Absolute now;
6308   struct ValidationState *vs;
6309   struct CheckKnownAddressContext ckac = {
6310     .address = address,
6311     .vs = NULL
6312   };
6313
6314   if (0 == GNUNET_TIME_absolute_get_remaining (expiration).rel_value_us)
6315     return; /* expired */
6316   (void) GNUNET_CONTAINER_multipeermap_get_multiple (validation_map,
6317                                                      pid,
6318                                                      &check_known_address,
6319                                                      &ckac);
6320   if (NULL != (vs = ckac.vs))
6321   {
6322     /* if 'vs' is not currently valid, we need to speed up retrying the validation */
6323     if (vs->validated_until.abs_value_us < vs->next_challenge.abs_value_us)
6324     {
6325       /* reduce backoff as we got a fresh advertisement */
6326       vs->challenge_backoff = GNUNET_TIME_relative_min (FAST_VALIDATION_CHALLENGE_FREQ,
6327                                                         GNUNET_TIME_relative_divide (vs->challenge_backoff,
6328                                                                                      2));
6329       update_next_challenge_time (vs,
6330                                   GNUNET_TIME_relative_to_absolute (vs->challenge_backoff));
6331     }
6332     return;
6333   }
6334   now = GNUNET_TIME_absolute_get();
6335   vs = GNUNET_new (struct ValidationState);
6336   vs->pid = *pid;
6337   vs->valid_until = expiration;
6338   vs->first_challenge_use = now;
6339   vs->validation_rtt = GNUNET_TIME_UNIT_FOREVER_REL;
6340   GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
6341                               &vs->challenge,
6342                               sizeof (vs->challenge));
6343   vs->address = GNUNET_strdup (address);
6344   GNUNET_assert (GNUNET_YES ==
6345                  GNUNET_CONTAINER_multipeermap_put (validation_map,
6346                                                     &vs->pid,
6347                                                     vs,
6348                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
6349   update_next_challenge_time (vs,
6350                               now);
6351 }
6352
6353
6354 /**
6355  * Function called by PEERSTORE for each matching record.
6356  *
6357  * @param cls closure
6358  * @param record peerstore record information
6359  * @param emsg error message, or NULL if no errors
6360  */
6361 static void
6362 handle_hello (void *cls,
6363               const struct GNUNET_PEERSTORE_Record *record,
6364               const char *emsg)
6365 {
6366   struct PeerRequest *pr = cls;
6367   const char *val;
6368
6369   if (NULL != emsg)
6370   {
6371     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
6372                 "Got failure from PEERSTORE: %s\n",
6373                 emsg);
6374     return;
6375   }
6376   val = record->value;
6377   if ( (0 == record->value_size) ||
6378        ('\0' != val[record->value_size - 1]) )
6379   {
6380     GNUNET_break (0);
6381     return;
6382   }
6383   start_address_validation (&pr->pid,
6384                             (const char *) record->value,
6385                             record->expiry);
6386 }
6387
6388
6389 /**
6390  * We have received a `struct ExpressPreferenceMessage` from an application client.
6391  *
6392  * @param cls handle to the client
6393  * @param msg the start message
6394  */
6395 static void
6396 handle_suggest (void *cls,
6397                 const struct ExpressPreferenceMessage *msg)
6398 {
6399   struct TransportClient *tc = cls;
6400   struct PeerRequest *pr;
6401
6402   if (CT_NONE == tc->type)
6403   {
6404     tc->type = CT_APPLICATION;
6405     tc->details.application.requests
6406       = GNUNET_CONTAINER_multipeermap_create (16,
6407                                               GNUNET_YES);
6408   }
6409   if (CT_APPLICATION != tc->type)
6410   {
6411     GNUNET_break (0);
6412     GNUNET_SERVICE_client_drop (tc->client);
6413     return;
6414   }
6415   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
6416               "Client suggested we talk to %s with preference %d at rate %u\n",
6417               GNUNET_i2s (&msg->peer),
6418               (int) ntohl (msg->pk),
6419               (int) ntohl (msg->bw.value__));
6420   pr = GNUNET_new (struct PeerRequest);
6421   pr->tc = tc;
6422   pr->pid = msg->peer;
6423   pr->bw = msg->bw;
6424   pr->pk = (enum GNUNET_MQ_PreferenceKind) ntohl (msg->pk);
6425   if (GNUNET_YES !=
6426       GNUNET_CONTAINER_multipeermap_put (tc->details.application.requests,
6427                                          &pr->pid,
6428                                          pr,
6429                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY))
6430   {
6431     GNUNET_break (0);
6432     GNUNET_free (pr);
6433     GNUNET_SERVICE_client_drop (tc->client);
6434     return;
6435   }
6436   pr->wc = GNUNET_PEERSTORE_watch (peerstore,
6437                                    "transport",
6438                                    &pr->pid,
6439                                    GNUNET_PEERSTORE_TRANSPORT_URLADDRESS_KEY,
6440                                    &handle_hello,
6441                                    pr);
6442   GNUNET_SERVICE_client_continue (tc->client);
6443 }
6444
6445
6446 /**
6447  * Given another peers address, consider checking it for validity
6448  * and then adding it to the Peerstore.
6449  *
6450  * @param cls a `struct TransportClient`
6451  * @param hdr message containing the raw address data and
6452  *        signature in the body, see #GNUNET_HELLO_extract_address()
6453  */
6454 static void
6455 handle_address_consider_verify (void *cls,
6456                                 const struct GNUNET_TRANSPORT_AddressToVerify *hdr)
6457 {
6458   struct TransportClient *tc = cls;
6459   char *address;
6460   enum GNUNET_NetworkType nt;
6461   struct GNUNET_TIME_Absolute expiration;
6462
6463   (void) cls;
6464   // FIXME: checking that we know this address already should
6465   //        be done BEFORE checking the signature => HELLO API change!
6466   // FIXME: pre-check: rate-limit signature verification / validation?!
6467   address = GNUNET_HELLO_extract_address (&hdr[1],
6468                                           ntohs (hdr->header.size) - sizeof (*hdr),
6469                                           &hdr->peer,
6470                                           &nt,
6471                                           &expiration);
6472   if (NULL == address)
6473   {
6474     GNUNET_break_op (0);
6475     return;
6476   }
6477   start_address_validation (&hdr->peer,
6478                             address,
6479                             expiration);
6480   GNUNET_free (address);
6481   GNUNET_SERVICE_client_continue (tc->client);
6482 }
6483
6484
6485 /**
6486  * Check #GNUNET_MESSAGE_TYPE_TRANSPORT_REQUEST_HELLO_VALIDATION
6487  * messages.
6488  *
6489  * @param cls a `struct TransportClient *`
6490  * @param m message to verify
6491  * @return #GNUNET_OK on success
6492  */
6493 static int
6494 check_request_hello_validation (void *cls,
6495                                 const struct RequestHelloValidationMessage *m)
6496 {
6497   (void) cls;
6498   GNUNET_MQ_check_zero_termination (m);
6499   return GNUNET_OK;
6500 }
6501
6502
6503 /**
6504  * A client encountered an address of another peer. Consider validating it,
6505  * and if validation succeeds, persist it to PEERSTORE.
6506  *
6507  * @param cls a `struct TransportClient *`
6508  * @param m message to verify
6509  */
6510 static void
6511 handle_request_hello_validation (void *cls,
6512                                  const struct RequestHelloValidationMessage *m)
6513 {
6514   struct TransportClient *tc = cls;
6515
6516   start_address_validation (&m->peer,
6517                             (const char *) &m[1],
6518                             GNUNET_TIME_absolute_ntoh (m->expiration));
6519   GNUNET_SERVICE_client_continue (tc->client);
6520 }
6521
6522
6523 /**
6524  * Free neighbour entry.
6525  *
6526  * @param cls NULL
6527  * @param pid unused
6528  * @param value a `struct Neighbour`
6529  * @return #GNUNET_OK (always)
6530  */
6531 static int
6532 free_neighbour_cb (void *cls,
6533                    const struct GNUNET_PeerIdentity *pid,
6534                    void *value)
6535 {
6536   struct Neighbour *neighbour = value;
6537
6538   (void) cls;
6539   (void) pid;
6540   GNUNET_break (0); // should this ever happen?
6541   free_neighbour (neighbour);
6542
6543   return GNUNET_OK;
6544 }
6545
6546
6547 /**
6548  * Free DV route entry.
6549  *
6550  * @param cls NULL
6551  * @param pid unused
6552  * @param value a `struct DistanceVector`
6553  * @return #GNUNET_OK (always)
6554  */
6555 static int
6556 free_dv_routes_cb (void *cls,
6557                    const struct GNUNET_PeerIdentity *pid,
6558                    void *value)
6559 {
6560   struct DistanceVector *dv = value;
6561
6562   (void) cls;
6563   (void) pid;
6564   free_dv_route (dv);
6565
6566   return GNUNET_OK;
6567 }
6568
6569
6570 /**
6571  * Free ephemeral entry.
6572  *
6573  * @param cls NULL
6574  * @param pid unused
6575  * @param value a `struct EphemeralCacheEntry`
6576  * @return #GNUNET_OK (always)
6577  */
6578 static int
6579 free_ephemeral_cb (void *cls,
6580                    const struct GNUNET_PeerIdentity *pid,
6581                    void *value)
6582 {
6583   struct EphemeralCacheEntry *ece = value;
6584
6585   (void) cls;
6586   (void) pid;
6587   free_ephemeral (ece);
6588   return GNUNET_OK;
6589 }
6590
6591
6592 /**
6593  * Free validation state.
6594  *
6595  * @param cls NULL
6596  * @param pid unused
6597  * @param value a `struct ValidationState`
6598  * @return #GNUNET_OK (always)
6599  */
6600 static int
6601 free_validation_state_cb (void *cls,
6602                           const struct GNUNET_PeerIdentity *pid,
6603                           void *value)
6604 {
6605   struct ValidationState *vs = value;
6606
6607   (void) cls;
6608   (void) pid;
6609   free_validation_state (vs);
6610   return GNUNET_OK;
6611 }
6612
6613
6614 /**
6615  * Function called when the service shuts down.  Unloads our plugins
6616  * and cancels pending validations.
6617  *
6618  * @param cls closure, unused
6619  */
6620 static void
6621 do_shutdown (void *cls)
6622 {
6623   struct LearnLaunchEntry *lle;
6624   (void) cls;
6625
6626   if (NULL != ephemeral_task)
6627   {
6628     GNUNET_SCHEDULER_cancel (ephemeral_task);
6629     ephemeral_task = NULL;
6630   }
6631   GNUNET_CONTAINER_multipeermap_iterate (neighbours,
6632                                          &free_neighbour_cb,
6633                                          NULL);
6634   if (NULL != peerstore)
6635   {
6636     GNUNET_PEERSTORE_disconnect (peerstore,
6637                                  GNUNET_NO);
6638     peerstore = NULL;
6639   }
6640   if (NULL != GST_stats)
6641   {
6642     GNUNET_STATISTICS_destroy (GST_stats,
6643                                GNUNET_NO);
6644     GST_stats = NULL;
6645   }
6646   if (NULL != GST_my_private_key)
6647   {
6648     GNUNET_free (GST_my_private_key);
6649     GST_my_private_key = NULL;
6650   }
6651   GNUNET_CONTAINER_multipeermap_destroy (neighbours);
6652   neighbours = NULL;
6653   GNUNET_CONTAINER_multipeermap_iterate (validation_map,
6654                                          &free_validation_state_cb,
6655                                          NULL);
6656   GNUNET_CONTAINER_multipeermap_destroy (validation_map);
6657   validation_map = NULL;
6658   while (NULL != (lle = lle_head))
6659   {
6660     GNUNET_CONTAINER_DLL_remove (lle_head,
6661                                  lle_tail,
6662                                  lle);
6663     GNUNET_free (lle);
6664   }
6665   GNUNET_CONTAINER_multishortmap_destroy (dvlearn_map);
6666   dvlearn_map = NULL;
6667   GNUNET_CONTAINER_heap_destroy (validation_heap);
6668   validation_heap = NULL;
6669   GNUNET_CONTAINER_multipeermap_iterate (dv_routes,
6670                                          &free_dv_routes_cb,
6671                                          NULL);
6672   GNUNET_CONTAINER_multipeermap_destroy (dv_routes);
6673   dv_routes = NULL;
6674   GNUNET_CONTAINER_multipeermap_iterate (ephemeral_map,
6675                                          &free_ephemeral_cb,
6676                                          NULL);
6677   GNUNET_CONTAINER_multipeermap_destroy (ephemeral_map);
6678   ephemeral_map = NULL;
6679   GNUNET_CONTAINER_heap_destroy (ephemeral_heap);
6680   ephemeral_heap = NULL;
6681 }
6682
6683
6684 /**
6685  * Initiate transport service.
6686  *
6687  * @param cls closure
6688  * @param c configuration to use
6689  * @param service the initialized service
6690  */
6691 static void
6692 run (void *cls,
6693      const struct GNUNET_CONFIGURATION_Handle *c,
6694      struct GNUNET_SERVICE_Handle *service)
6695 {
6696   (void) cls;
6697   (void) service;
6698   /* setup globals */
6699   GST_cfg = c;
6700   neighbours = GNUNET_CONTAINER_multipeermap_create (1024,
6701                                                      GNUNET_YES);
6702   dv_routes = GNUNET_CONTAINER_multipeermap_create (1024,
6703                                                     GNUNET_YES);
6704   ephemeral_map = GNUNET_CONTAINER_multipeermap_create (32,
6705                                                         GNUNET_YES);
6706   ephemeral_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
6707   dvlearn_map = GNUNET_CONTAINER_multishortmap_create (2 * MAX_DV_LEARN_PENDING,
6708                                                        GNUNET_YES);
6709   validation_map = GNUNET_CONTAINER_multipeermap_create (1024,
6710                                                          GNUNET_YES);
6711   validation_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
6712   GST_my_private_key = GNUNET_CRYPTO_eddsa_key_create_from_configuration (GST_cfg);
6713   if (NULL == GST_my_private_key)
6714   {
6715     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
6716                 _("Transport service is lacking key configuration settings. Exiting.\n"));
6717     GNUNET_SCHEDULER_shutdown ();
6718     return;
6719   }
6720   GNUNET_CRYPTO_eddsa_key_get_public (GST_my_private_key,
6721                                       &GST_my_identity.public_key);
6722   GNUNET_log(GNUNET_ERROR_TYPE_INFO,
6723              "My identity is `%s'\n",
6724              GNUNET_i2s_full (&GST_my_identity));
6725   GST_stats = GNUNET_STATISTICS_create ("transport",
6726                                         GST_cfg);
6727   GNUNET_SCHEDULER_add_shutdown (&do_shutdown,
6728                                  NULL);
6729   peerstore = GNUNET_PEERSTORE_connect (GST_cfg);
6730   if (NULL == peerstore)
6731   {
6732     GNUNET_break (0);
6733     GNUNET_SCHEDULER_shutdown ();
6734     return;
6735   }
6736 }
6737
6738
6739 /**
6740  * Define "main" method using service macro.
6741  */
6742 GNUNET_SERVICE_MAIN
6743 ("transport",
6744  GNUNET_SERVICE_OPTION_SOFT_SHUTDOWN,
6745  &run,
6746  &client_connect_cb,
6747  &client_disconnect_cb,
6748  NULL,
6749  /* communication with applications */
6750  GNUNET_MQ_hd_fixed_size (suggest,
6751                           GNUNET_MESSAGE_TYPE_TRANSPORT_SUGGEST,
6752                           struct ExpressPreferenceMessage,
6753                           NULL),
6754  GNUNET_MQ_hd_fixed_size (suggest_cancel,
6755                           GNUNET_MESSAGE_TYPE_TRANSPORT_SUGGEST_CANCEL,
6756                           struct ExpressPreferenceMessage,
6757                           NULL),
6758  GNUNET_MQ_hd_var_size (request_hello_validation,
6759                         GNUNET_MESSAGE_TYPE_TRANSPORT_REQUEST_HELLO_VALIDATION,
6760                         struct RequestHelloValidationMessage,
6761                         NULL),
6762  /* communication with core */
6763  GNUNET_MQ_hd_fixed_size (client_start,
6764                           GNUNET_MESSAGE_TYPE_TRANSPORT_START,
6765                           struct StartMessage,
6766                           NULL),
6767  GNUNET_MQ_hd_var_size (client_send,
6768                         GNUNET_MESSAGE_TYPE_TRANSPORT_SEND,
6769                         struct OutboundMessage,
6770                         NULL),
6771  /* communication with communicators */
6772  GNUNET_MQ_hd_var_size (communicator_available,
6773                         GNUNET_MESSAGE_TYPE_TRANSPORT_NEW_COMMUNICATOR,
6774                         struct GNUNET_TRANSPORT_CommunicatorAvailableMessage,
6775                         NULL),
6776  GNUNET_MQ_hd_var_size (communicator_backchannel,
6777                         GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL,
6778                         struct GNUNET_TRANSPORT_CommunicatorBackchannel,
6779                         NULL),
6780  GNUNET_MQ_hd_var_size (add_address,
6781                         GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS,
6782                         struct GNUNET_TRANSPORT_AddAddressMessage,
6783                         NULL),
6784  GNUNET_MQ_hd_fixed_size (del_address,
6785                           GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_ADDRESS,
6786                           struct GNUNET_TRANSPORT_DelAddressMessage,
6787                           NULL),
6788  GNUNET_MQ_hd_var_size (incoming_msg,
6789                         GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG,
6790                         struct GNUNET_TRANSPORT_IncomingMessage,
6791                         NULL),
6792  GNUNET_MQ_hd_fixed_size (queue_create_ok,
6793                           GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_OK,
6794                           struct GNUNET_TRANSPORT_CreateQueueResponse,
6795                           NULL),
6796  GNUNET_MQ_hd_fixed_size (queue_create_fail,
6797                           GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_FAIL,
6798                           struct GNUNET_TRANSPORT_CreateQueueResponse,
6799                           NULL),
6800  GNUNET_MQ_hd_var_size (add_queue_message,
6801                         GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP,
6802                         struct GNUNET_TRANSPORT_AddQueueMessage,
6803                         NULL),
6804  GNUNET_MQ_hd_var_size (address_consider_verify,
6805                         GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_CONSIDER_VERIFY,
6806                         struct GNUNET_TRANSPORT_AddressToVerify,
6807                         NULL),
6808  GNUNET_MQ_hd_fixed_size (del_queue_message,
6809                           GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN,
6810                           struct GNUNET_TRANSPORT_DelQueueMessage,
6811                           NULL),
6812  GNUNET_MQ_hd_fixed_size (send_message_ack,
6813                           GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK,
6814                           struct GNUNET_TRANSPORT_SendMessageToAck,
6815                           NULL),
6816  /* communication with monitors */
6817  GNUNET_MQ_hd_fixed_size (monitor_start,
6818                           GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_START,
6819                           struct GNUNET_TRANSPORT_MonitorStart,
6820                           NULL),
6821  GNUNET_MQ_handler_end ());
6822
6823
6824 /* end of file gnunet-service-transport.c */