Merge branch 'master' of git+ssh://gnunet.org/gnunet
[oweals/gnunet.git] / src / transport / gnunet-service-tng.c
1 /*
2  This file is part of GNUnet.
3  Copyright (C) 2010-2016, 2018, 2019 GNUnet e.V.
4
5  GNUnet is free software: you can redistribute it and/or modify it
6  under the terms of the GNU Affero General Public License as published
7  by the Free Software Foundation, either version 3 of the License,
8  or (at your option) any later version.
9
10  GNUnet is distributed in the hope that it will be useful, but
11  WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  Affero General Public License for more details.
14
15  You should have received a copy of the GNU Affero General Public License
16  along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19  */
20 /**
21  * @file transport/gnunet-service-tng.c
22  * @brief main for gnunet-service-tng
23  * @author Christian Grothoff
24  *
25  * TODO:
26  * - figure out how to transmit (selective) ACKs in case of uni-directional
27  *   communicators (with/without core? DV-only?) When do we use ACKs?
28  *   => communicators use selective ACKs for flow control
29  *   => transport uses message-level ACKs for RTT, fragment confirmation
30  *   => integrate DV into transport, use neither core nor communicators
31  *      but rather give communicators transport-encapsulated messages
32  *      (which could be core-data, background-channel traffic, or
33  *       transport-to-transport traffic)
34  *
35  * Implement next:
36  * - DV data structures:
37  *   + initiation of DV learn (incl. RTT measurement logic!)
38  *     - security considerations? add signatures to routes? initiator signature?
39  *   + using DV routes!
40  *     - handling of DV-boxed messages that need to be forwarded
41  *     - route_message implementation, including using DV data structures
42  *       (but not when routing certain message types, like DV learn,
43  *        MUST pay attention to content here -- or pass extra flags?)
44  * - ACK handling / retransmission
45  * - track RTT, distance, loss, etc.
46  * - backchannel message encryption & decryption
47  *
48  * Later:
49  * - change transport-core API to provide proper flow control in both
50  *   directions, allow multiple messages per peer simultaneously (tag
51  *   confirmations with unique message ID), and replace quota-out with
52  *   proper flow control;
53  * - if messages are below MTU, consider adding ACKs and other stuff
54  *   (requires planning at receiver, and additional MST-style demultiplex
55  *    at receiver!)
56  * - could avoid copying body of message into each fragment and keep
57  *   fragments as just pointers into the original message and only
58  *   fully build fragments just before transmission (optimization, should
59  *   reduce CPU and memory use)
60  *
61  * Design realizations / discussion:
62  * - communicators do flow control by calling MQ "notify sent"
63  *   when 'ready'. They determine flow implicitly (i.e. TCP blocking)
64  *   or explicitly via backchannel FC ACKs.  As long as the
65  *   channel is not full, they may 'notify sent' even if the other
66  *   peer has not yet confirmed receipt. The other peer confirming
67  *   is _only_ for FC, not for more reliable transmission; reliable
68  *   transmission (i.e. of fragments) is left to _transport_.
69  * - ACKs sent back in uni-directional communicators are done via
70  *   the background channel API; here transport _may_ initially
71  *   broadcast (with bounded # hops) if no path is known;
72  * - transport should _integrate_ DV-routing and build a view of
73  *   the network; then background channel traffic can be
74  *   routed via DV as well as explicit "DV" traffic.
75  * - background channel is also used for ACKs and NAT traversal support
76  * - transport service is responsible for AEAD'ing the background
77  *   channel, timestamps and monotonic time are used against replay
78  *   of old messages -> peerstore needs to be supplied with
79  *   "latest timestamps seen" data
80  * - if transport implements DV, we likely need a 3rd peermap
81  *   in addition to ephemerals and (direct) neighbours
82  *   ==> check if stuff needs to be moved out of "Neighbour"
83  * - transport should encapsualte core-level messages and do its
84  *   own ACKing for RTT/goodput/loss measurements _and_ fragment
85  *   for retransmission
86  */
87 #include "platform.h"
88 #include "gnunet_util_lib.h"
89 #include "gnunet_statistics_service.h"
90 #include "gnunet_transport_monitor_service.h"
91 #include "gnunet_peerstore_service.h"
92 #include "gnunet_hello_lib.h"
93 #include "gnunet_signatures.h"
94 #include "transport.h"
95
96
97 /**
98  * What is the size we assume for a read operation in the
99  * absence of an MTU for the purpose of flow control?
100  */
101 #define IN_PACKET_SIZE_WITHOUT_MTU 128
102
103 /**
104  * Minimum number of hops we should forward DV learn messages
105  * even if they are NOT useful for us in hope of looping
106  * back to the initiator?
107  *
108  * FIXME: allow initiator some control here instead?
109  */
110 #define MIN_DV_PATH_LENGTH_FOR_INITIATOR 3
111
112 /**
113  * Maximum DV distance allowed ever.
114  */
115 #define MAX_DV_HOPS_ALLOWED 16
116
117 /**
118  * Maximum number of DV learning activities we may
119  * have pending at the same time.
120  */
121 #define MAX_DV_LEARN_PENDING 64
122
123 /**
124  * Maximum number of DV paths we keep simultaneously to the same target.
125  */
126 #define MAX_DV_PATHS_TO_TARGET 3
127
128 /**
129  * If a queue delays the next message by more than this number
130  * of seconds we log a warning. Note: this is for testing,
131  * the value chosen here might be too aggressively low!
132  */
133 #define DELAY_WARN_THRESHOLD GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
134
135 /**
136  * We only consider queues as "quality" connections when      
137  * suppressing the generation of DV initiation messages if 
138  * the latency of the queue is below this threshold.
139  */
140 #define DV_QUALITY_RTT_THRESHOLD GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
141
142 /**
143  * How long do we consider a DV path valid if we see no
144  * further updates on it? Note: the value chosen here might be too low!
145  */
146 #define DV_PATH_VALIDITY_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
147
148 /**
149  * How long before paths expire would we like to (re)discover DV paths? Should
150  * be below #DV_PATH_VALIDITY_TIMEOUT.
151  */
152 #define DV_PATH_DISCOVERY_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 4)
153
154 /**
155  * How long are ephemeral keys valid?
156  */
157 #define EPHEMERAL_VALIDITY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 4)
158
159 /**
160  * How long do we keep partially reassembled messages around before giving up?
161  */
162 #define REASSEMBLY_EXPIRATION GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 4)
163
164 /**
165  * What is the fastest rate at which we send challenges *if* we keep learning
166  * an address (gossip, DHT, etc.)?
167  */
168 #define FAST_VALIDATION_CHALLENGE_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 1)
169
170 /**
171  * What is the slowest rate at which we send challenges?
172  */
173 #define MAX_VALIDATION_CHALLENGE_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_DAYS, 1)
174
175 /**
176  * What is the non-randomized base frequency at which we
177  * would initiate DV learn messages?
178  */
179 #define DV_LEARN_BASE_FREQUENCY GNUNET_TIME_UNIT_MINUTES
180
181 /**
182  * How many good connections (confirmed, bi-directional, not DV)
183  * do we need to have to suppress initiating DV learn messages?
184  */
185 #define DV_LEARN_QUALITY_THRESHOLD 100
186   
187 /**
188  * When do we forget an invalid address for sure?
189  */
190 #define MAX_ADDRESS_VALID_UNTIL GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MONTHS, 1)
191 /**
192  * How long do we consider an address valid if we just checked?
193  */
194 #define ADDRESS_VALIDATION_LIFETIME GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 4)
195
196 /**
197  * What is the maximum frequency at which we do address validation?
198  * A random value between 0 and this value is added when scheduling
199  * the #validation_task (both to ensure we do not validate too often,
200  * and to randomize a bit).
201  */
202 #define MIN_DELAY_ADDRESS_VALIDATION GNUNET_TIME_UNIT_MILLISECONDS
203
204 /**
205  * How many network RTTs before an address validation expires should we begin
206  * trying to revalidate? (Note that the RTT used here is the one that we
207  * experienced during the last validation, not necessarily the latest RTT
208  * observed).
209  */
210 #define VALIDATION_RTT_BUFFER_FACTOR 3
211
212 /**
213  * How many messages can we have pending for a given communicator
214  * process before we start to throttle that communicator?
215  *
216  * Used if a communicator might be CPU-bound and cannot handle the traffic.
217  */
218 #define COMMUNICATOR_TOTAL_QUEUE_LIMIT 512
219
220 /**
221  * How many messages can we have pending for a given queue (queue to
222  * a particular peer via a communicator) process before we start to
223  * throttle that queue?
224  */
225 #define QUEUE_LENGTH_LIMIT 32
226
227
228 GNUNET_NETWORK_STRUCT_BEGIN
229
230 /**
231  * Outer layer of an encapsulated backchannel message.
232  */
233 struct TransportBackchannelEncapsulationMessage
234 {
235   /**
236    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION.
237    */
238   struct GNUNET_MessageHeader header;
239
240   /**
241    * Distance the backchannel message has traveled, to be updated at
242    * each hop.  Used to bound the number of hops in case a backchannel
243    * message is broadcast and thus travels without routing
244    * information (during initial backchannel discovery).
245    */
246   uint32_t distance;
247
248   /**
249    * Target's peer identity (as backchannels may be transmitted
250    * indirectly, or even be broadcast).
251    */
252   struct GNUNET_PeerIdentity target;
253
254   /**
255    * Ephemeral key setup by the sender for @e target, used
256    * to encrypt the payload.
257    */
258   struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;
259
260   // FIXME: probably should add random IV here as well,
261   // especially if we re-use ephemeral keys!
262
263   /**
264    * HMAC over the ciphertext of the encrypted, variable-size
265    * body that follows.  Verified via DH of @e target and
266    * @e ephemeral_key
267    */
268   struct GNUNET_HashCode hmac;
269
270   /* Followed by encrypted, variable-size payload */
271 };
272
273
274 /**
275  * Body by which a peer confirms that it is using an ephemeral key.
276  */
277 struct EphemeralConfirmation
278 {
279
280   /**
281    * Purpose is #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL
282    */
283   struct GNUNET_CRYPTO_EccSignaturePurpose purpose;
284
285   /**
286    * How long is this signature over the ephemeral key valid?
287    * Note that the receiver MUST IGNORE the absolute time, and
288    * only interpret the value as a mononic time and reject
289    * "older" values than the last one observed.  Even with this,
290    * there is no real guarantee against replay achieved here,
291    * as the latest timestamp is not persisted.  This is
292    * necessary as we do not want to require synchronized
293    * clocks and may not have a bidirectional communication
294    * channel.  Communicators must protect against replay
295    * attacks when using backchannel communication!
296    */
297   struct GNUNET_TIME_AbsoluteNBO ephemeral_validity;
298
299   /**
300    * Target's peer identity.
301    */
302   struct GNUNET_PeerIdentity target;
303
304   /**
305    * Ephemeral key setup by the sender for @e target, used
306    * to encrypt the payload.
307    */
308   struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;
309
310 };
311
312
313 /**
314  * Plaintext of the variable-size payload that is encrypted
315  * within a `struct TransportBackchannelEncapsulationMessage`
316  */
317 struct TransportBackchannelRequestPayload
318 {
319
320   /**
321    * Sender's peer identity.
322    */
323   struct GNUNET_PeerIdentity sender;
324
325   /**
326    * Signature of the sender over an
327    * #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL.
328    */
329   struct GNUNET_CRYPTO_EddsaSignature sender_sig;
330
331   /**
332    * How long is this signature over the ephemeral key
333    * valid?
334    */
335   struct GNUNET_TIME_AbsoluteNBO ephemeral_validity;
336
337   /**
338    * Current monotonic time of the sending transport service.  Used to
339    * detect replayed messages.  Note that the receiver should remember
340    * a list of the recently seen timestamps and only reject messages
341    * if the timestamp is in the list, or the list is "full" and the
342    * timestamp is smaller than the lowest in the list.  This list of
343    * timestamps per peer should be persisted to guard against replays
344    * after restarts.
345    */
346   struct GNUNET_TIME_AbsoluteNBO monotonic_time;
347
348   /* Followed by a `struct GNUNET_MessageHeader` with a message
349      for a communicator */
350
351   /* Followed by a 0-termianted string specifying the name of
352      the communicator which is to receive the message */
353
354 };
355
356
357 /**
358  * Outer layer of an encapsulated unfragmented application message sent
359  * over an unreliable channel.
360  */
361 struct TransportReliabilityBox
362 {
363   /**
364    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX
365    */
366   struct GNUNET_MessageHeader header;
367
368   /**
369    * Number of messages still to be sent before a commulative
370    * ACK is requested.  Zero if an ACK is requested immediately.
371    * In NBO.  Note that the receiver may send the ACK faster
372    * if it believes that is reasonable.
373    */
374   uint32_t ack_countdown GNUNET_PACKED;
375
376   /**
377    * Unique ID of the message used for signalling receipt of
378    * messages sent over possibly unreliable channels.  Should
379    * be a random.
380    */
381   struct GNUNET_ShortHashCode msg_uuid;
382 };
383
384
385 /**
386  * Confirmation that the receiver got a
387  * #GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX. Note that the
388  * confirmation may be transmitted over a completely different queue,
389  * so ACKs are identified by a combination of PID of sender and
390  * message UUID, without the queue playing any role!
391  */
392 struct TransportReliabilityAckMessage
393 {
394   /**
395    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK
396    */
397   struct GNUNET_MessageHeader header;
398
399   /**
400    * Reserved. Zero.
401    */
402   uint32_t reserved GNUNET_PACKED;
403
404   /**
405    * How long was the ACK delayed relative to the average time of
406    * receipt of the messages being acknowledged?  Used to calculate
407    * the average RTT by taking the receipt time of the ack minus the
408    * average transmission time of the sender minus this value.
409    */
410   struct GNUNET_TIME_RelativeNBO avg_ack_delay;
411
412   /* followed by any number of `struct GNUNET_ShortHashCode`
413      messages providing ACKs */
414 };
415
416
417 /**
418  * Outer layer of an encapsulated fragmented application message.
419  */
420 struct TransportFragmentBox
421 {
422   /**
423    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT
424    */
425   struct GNUNET_MessageHeader header;
426
427   /**
428    * Unique ID of this fragment (and fragment transmission!). Will
429    * change even if a fragement is retransmitted to make each
430    * transmission attempt unique! Should be incremented by one for
431    * each fragment transmission. If a client receives a duplicate
432    * fragment (same @e frag_off), it must send
433    * #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK immediately.
434    */
435   uint32_t frag_uuid GNUNET_PACKED;
436
437   /**
438    * Original message ID for of the message that all the1
439    * fragments belong to.  Must be the same for all fragments.
440    */
441   struct GNUNET_ShortHashCode msg_uuid;
442
443   /**
444    * Offset of this fragment in the overall message.
445    */
446   uint16_t frag_off GNUNET_PACKED;
447
448   /**
449    * Total size of the message that is being fragmented.
450    */
451   uint16_t msg_size GNUNET_PACKED;
452
453 };
454
455
456 /**
457  * Outer layer of an fragmented application message sent over a queue
458  * with finite MTU.  When a #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT is
459  * received, the receiver has two RTTs or 64 further fragments with
460  * the same basic message time to send an acknowledgement, possibly
461  * acknowledging up to 65 fragments in one ACK.  ACKs must also be
462  * sent immediately once all fragments were sent.
463  */
464 struct TransportFragmentAckMessage
465 {
466   /**
467    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK
468    */
469   struct GNUNET_MessageHeader header;
470
471   /**
472    * Unique ID of the lowest fragment UUID being acknowledged.
473    */
474   uint32_t frag_uuid GNUNET_PACKED;
475
476   /**
477    * Bitfield of up to 64 additional fragments following the
478    * @e msg_uuid being acknowledged by this message.
479    */
480   uint64_t extra_acks GNUNET_PACKED;
481
482   /**
483    * Original message ID for of the message that all the
484    * fragments belong to.
485    */
486   struct GNUNET_ShortHashCode msg_uuid;
487
488   /**
489    * How long was the ACK delayed relative to the average time of
490    * receipt of the fragments being acknowledged?  Used to calculate
491    * the average RTT by taking the receipt time of the ack minus the
492    * average transmission time of the sender minus this value.
493    */
494   struct GNUNET_TIME_RelativeNBO avg_ack_delay;
495
496   /**
497    * How long until the receiver will stop trying reassembly
498    * of this message?
499    */
500   struct GNUNET_TIME_RelativeNBO reassembly_timeout;
501 };
502
503
504 /**
505  * Content signed by each peer during DV learning.
506  */
507 struct DvInitPS
508 {
509   /**
510    * Purpose is #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_INITIATOR
511    */
512   struct GNUNET_CRYPTO_EccSignaturePurpose purpose;
513
514   /**
515    * Challenge value used by the initiator to re-identify the path.
516    */
517   struct GNUNET_ShortHashCode challenge;
518
519 };
520
521
522 /**
523  * Content signed by each peer during DV learning.
524  */
525 struct DvHopPS
526 {
527   /**
528    * Purpose is #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_HOP
529    */
530   struct GNUNET_CRYPTO_EccSignaturePurpose purpose;
531
532   /**
533    * Identity of the previous peer on the path.
534    */
535   struct GNUNET_PeerIdentity pred;
536
537   /**
538    * Identity of the next peer on the path.
539    */
540   struct GNUNET_PeerIdentity succ;
541
542   /**
543    * Challenge value used by the initiator to re-identify the path.
544    */
545   struct GNUNET_ShortHashCode challenge;
546
547 };
548
549
550 /**
551  * An entry describing a peer on a path in a
552  * `struct TransportDVLearn` message.
553  */
554 struct DVPathEntryP
555 {
556   /**
557    * Identity of a peer on the path.
558    */
559   struct GNUNET_PeerIdentity hop;
560
561   /**
562    * Signature of this hop over the path, of purpose
563    * #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_HOP
564    */
565   struct GNUNET_CRYPTO_EddsaSignature hop_sig;
566
567 };
568
569
570 /**
571  * Internal message used by transport for distance vector learning.
572  * If @e num_hops does not exceed the threshold, peers should append
573  * themselves to the peer list and flood the message (possibly only
574  * to a subset of their neighbours to limit discoverability of the
575  * network topology).  To the extend that the @e bidirectional bits
576  * are set, peers may learn the inverse paths even if they did not
577  * initiate.
578  *
579  * Unless received on a bidirectional queue and @e num_hops just
580  * zero, peers that can forward to the initator should always try to
581  * forward to the initiator.
582  */
583 struct TransportDVLearn
584 {
585   /**
586    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN
587    */
588   struct GNUNET_MessageHeader header;
589
590   /**
591    * Number of hops this messages has travelled, in NBO. Zero if
592    * sent by initiator.
593    */
594   uint16_t num_hops GNUNET_PACKED;
595
596   /**
597    * Bitmask of the last 16 hops indicating whether they are confirmed
598    * available (without DV) in both directions or not, in NBO.  Used
599    * to possibly instantly learn a path in both directions.  Each peer
600    * should shift this value by one to the left, and then set the
601    * lowest bit IF the current sender can be reached from it (without
602    * DV routing).
603    */
604   uint16_t bidirectional GNUNET_PACKED;
605
606   /**
607    * Peers receiving this message and delaying forwarding to other
608    * peers for any reason should increment this value by the non-network
609    * delay created by the peer.
610    */
611   struct GNUNET_TIME_RelativeNBO non_network_delay;
612
613   /**
614    * Signature of this hop over the path, of purpose
615    * #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_INITIATOR
616    */
617   struct GNUNET_CRYPTO_EddsaSignature init_sig;
618
619   /**
620    * Identity of the peer that started this learning activity.
621    */
622   struct GNUNET_PeerIdentity initiator;
623
624   /**
625    * Challenge value used by the initiator to re-identify the path.
626    */
627   struct GNUNET_ShortHashCode challenge;
628
629   /* Followed by @e num_hops `struct DVPathEntryP` values,
630      excluding the initiator of the DV trace; the last entry is the
631      current sender; the current peer must not be included. */
632
633 };
634
635
636 /**
637  * Outer layer of an encapsulated message send over multiple hops.
638  * The path given only includes the identities of the subsequent
639  * peers, i.e. it will be empty if we are the receiver. Each
640  * forwarding peer should scan the list from the end, and if it can,
641  * forward to the respective peer. The list should then be shortened
642  * by all the entries up to and including that peer.  Each hop should
643  * also increment @e total_hops to allow the receiver to get a precise
644  * estimate on the number of hops the message travelled.  Senders must
645  * provide a learned path that thus should work, but intermediaries
646  * know of a shortcut, they are allowed to send the message via that
647  * shortcut.
648  *
649  * If a peer finds itself still on the list, it must drop the message.
650  */
651 struct TransportDVBox
652 {
653   /**
654    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX
655    */
656   struct GNUNET_MessageHeader header;
657
658   /**
659    * Number of total hops this messages travelled. In NBO.
660    * @e origin sets this to zero, to be incremented at
661    * each hop.
662    */
663   uint16_t total_hops GNUNET_PACKED;
664
665   /**
666    * Number of hops this messages includes. In NBO.
667    */
668   uint16_t num_hops GNUNET_PACKED;
669
670   /**
671    * Identity of the peer that originated the message.
672    */
673   struct GNUNET_PeerIdentity origin;
674
675   /* Followed by @e num_hops `struct GNUNET_PeerIdentity` values;
676      excluding the @e origin and the current peer, the last must be
677      the ultimate target; if @e num_hops is zero, the receiver of this
678      message is the ultimate target. */
679
680   /* Followed by the actual message, which itself may be
681      another box, but not a DV_LEARN or DV_BOX message! */
682 };
683
684
685 /**
686  * Message send to another peer to validate that it can indeed
687  * receive messages at a particular address.
688  */
689 struct TransportValidationChallenge
690 {
691
692   /**
693    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_CHALLENGE
694    */
695   struct GNUNET_MessageHeader header;
696
697   /**
698    * Zero.
699    */
700   uint32_t reserved GNUNET_PACKED;
701
702   /**
703    * Challenge to be signed by the receiving peer.
704    */
705   struct GNUNET_ShortHashCode challenge;
706
707   /**
708    * Timestamp of the sender, to be copied into the reply
709    * to allow sender to calculate RTT.
710    */
711   struct GNUNET_TIME_AbsoluteNBO sender_time;
712 };
713
714
715 /**
716  * Message signed by a peer to confirm that it can indeed
717  * receive messages at a particular address.
718  */
719 struct TransportValidationPS
720 {
721
722   /**
723    * Purpose is #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_CHALLENGE
724    */
725   struct GNUNET_CRYPTO_EccSignaturePurpose purpose;
726
727   /**
728    * How long does the sender believe the address on
729    * which the challenge was received to remain valid?
730    */
731   struct GNUNET_TIME_RelativeNBO validity_duration;
732
733   /**
734    * Challenge signed by the receiving peer.
735    */
736   struct GNUNET_ShortHashCode challenge;
737
738 };
739
740
741 /**
742  * Message send to a peer to respond to a
743  * #GNUNET_MESSAGE_TYPE_ADDRESS_VALIDATION_CHALLENGE
744  */
745 struct TransportValidationResponse
746 {
747
748   /**
749    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_RESPONSE
750    */
751   struct GNUNET_MessageHeader header;
752
753   /**
754    * Zero.
755    */
756   uint32_t reserved GNUNET_PACKED;
757
758   /**
759    * The peer's signature matching the
760    * #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_CHALLENGE purpose.
761    */
762   struct GNUNET_CRYPTO_EddsaSignature signature;
763
764   /**
765    * The challenge that was signed by the receiving peer.
766    */
767   struct GNUNET_ShortHashCode challenge;
768
769   /**
770    * Original timestamp of the sender (was @code{sender_time}),
771    * copied into the reply to allow sender to calculate RTT.
772    */
773   struct GNUNET_TIME_AbsoluteNBO origin_time;
774
775   /**
776    * How long does the sender believe this address to remain
777    * valid?
778    */
779   struct GNUNET_TIME_RelativeNBO validity_duration;
780 };
781
782
783
784 GNUNET_NETWORK_STRUCT_END
785
786
787 /**
788  * What type of client is the `struct TransportClient` about?
789  */
790 enum ClientType
791 {
792   /**
793    * We do not know yet (client is fresh).
794    */
795   CT_NONE = 0,
796
797   /**
798    * Is the CORE service, we need to forward traffic to it.
799    */
800   CT_CORE = 1,
801
802   /**
803    * It is a monitor, forward monitor data.
804    */
805   CT_MONITOR = 2,
806
807   /**
808    * It is a communicator, use for communication.
809    */
810   CT_COMMUNICATOR = 3,
811
812   /**
813    * "Application" telling us where to connect (i.e. TOPOLOGY, DHT or CADET).
814    */
815   CT_APPLICATION = 4
816 };
817
818
819 /**
820  * When did we launch this DV learning activity? 
821  */
822 struct LearnLaunchEntry
823 {
824
825   /**
826    * Kept (also) in a DLL sorted by launch time.
827    */
828   struct LearnLaunchEntry *prev;
829
830   /**
831    * Kept (also) in a DLL sorted by launch time.
832    */
833   struct LearnLaunchEntry *next;
834
835   /**
836    * Challenge that uniquely identifies this activity.
837    */
838   struct GNUNET_ShortHashCode challenge;
839
840   /**
841    * When did we transmit the DV learn message (used to
842    * calculate RTT).
843    */
844   struct GNUNET_TIME_Absolute launch_time;
845
846 };
847
848
849 /**
850  * Entry in our cache of ephemeral keys we currently use.
851  * This way, we only sign an ephemeral once per @e target,
852  * and then can re-use it over multiple
853  * #GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION
854  * messages (as signing is expensive).
855  */
856 struct EphemeralCacheEntry
857 {
858
859   /**
860    * Target's peer identity (we don't re-use ephemerals
861    * to limit linkability of messages).
862    */
863   struct GNUNET_PeerIdentity target;
864
865   /**
866    * Signature affirming @e ephemeral_key of type
867    * #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL
868    */
869   struct GNUNET_CRYPTO_EddsaSignature sender_sig;
870
871   /**
872    * How long is @e sender_sig valid
873    */
874   struct GNUNET_TIME_Absolute ephemeral_validity;
875
876   /**
877    * Our ephemeral key.
878    */
879   struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;
880
881   /**
882    * Our private ephemeral key.
883    */
884   struct GNUNET_CRYPTO_EcdhePrivateKey private_key;
885
886   /**
887    * Node in the ephemeral cache for this entry.
888    * Used for expiration.
889    */
890   struct GNUNET_CONTAINER_HeapNode *hn;
891 };
892
893
894 /**
895  * Client connected to the transport service.
896  */
897 struct TransportClient;
898
899
900 /**
901  * A neighbour that at least one communicator is connected to.
902  */
903 struct Neighbour;
904
905
906 /**
907  * Entry in our #dv_routes table, representing a (set of) distance
908  * vector routes to a particular peer.
909  */
910 struct DistanceVector;
911
912 /**
913  * One possible hop towards a DV target.
914  */
915 struct DistanceVectorHop
916 {
917
918   /**
919    * Kept in a MDLL, sorted by @e timeout.
920    */
921   struct DistanceVectorHop *next_dv;
922
923   /**
924    * Kept in a MDLL, sorted by @e timeout.
925    */
926   struct DistanceVectorHop *prev_dv;
927
928   /**
929    * Kept in a MDLL.
930    */
931   struct DistanceVectorHop *next_neighbour;
932
933   /**
934    * Kept in a MDLL.
935    */
936   struct DistanceVectorHop *prev_neighbour;
937
938   /**
939    * What would be the next hop to @e target?
940    */
941   struct Neighbour *next_hop;
942
943   /**
944    * Distance vector entry this hop belongs with.
945    */
946   struct DistanceVector *dv;
947
948   /**
949    * Array of @e distance hops to the target, excluding @e next_hop.
950    * NULL if the entire path is us to @e next_hop to `target`. Allocated
951    * at the end of this struct.
952    */
953   const struct GNUNET_PeerIdentity *path;
954
955   /**
956    * At what time do we forget about this path unless we see it again
957    * while learning?
958    */
959   struct GNUNET_TIME_Absolute timeout;
960
961   /**
962    * How many hops in total to the `target` (excluding @e next_hop and `target` itself),
963    * thus 0 still means a distance of 2 hops (to @e next_hop and then to `target`)?
964    */
965   unsigned int distance;
966 };
967
968
969 /**
970  * Entry in our #dv_routes table, representing a (set of) distance
971  * vector routes to a particular peer.
972  */
973 struct DistanceVector
974 {
975
976   /**
977    * To which peer is this a route?
978    */
979   struct GNUNET_PeerIdentity target;
980
981   /**
982    * Known paths to @e target.
983    */
984   struct DistanceVectorHop *dv_head;
985
986   /**
987    * Known paths to @e target.
988    */
989   struct DistanceVectorHop *dv_tail;
990
991   /**
992    * Task scheduled to purge expired paths from @e dv_head MDLL.
993    */
994   struct GNUNET_SCHEDULER_Task *timeout_task;
995 };
996
997
998 /**
999  * A queue is a message queue provided by a communicator
1000  * via which we can reach a particular neighbour.
1001  */
1002 struct Queue;
1003
1004
1005 /**
1006  * Entry identifying transmission in one of our `struct
1007  * Queue` which still awaits an ACK.  This is used to
1008  * ensure we do not overwhelm a communicator and limit the number of
1009  * messages outstanding per communicator (say in case communicator is
1010  * CPU bound) and per queue (in case bandwidth allocation exceeds
1011  * what the communicator can actually provide towards a particular
1012  * peer/target).
1013  */
1014 struct QueueEntry
1015 {
1016
1017   /**
1018    * Kept as a DLL.
1019    */
1020   struct QueueEntry *next;
1021
1022   /**
1023    * Kept as a DLL.
1024    */
1025   struct QueueEntry *prev;
1026
1027   /**
1028    * Queue this entry is queued with.
1029    */
1030   struct Queue *queue;
1031
1032   /**
1033    * Message ID used for this message with the queue used for transmission.
1034    */
1035   uint64_t mid;
1036 };
1037
1038
1039 /**
1040  * A queue is a message queue provided by a communicator
1041  * via which we can reach a particular neighbour.
1042  */
1043 struct Queue
1044 {
1045   /**
1046    * Kept in a MDLL.
1047    */
1048   struct Queue *next_neighbour;
1049
1050   /**
1051    * Kept in a MDLL.
1052    */
1053   struct Queue *prev_neighbour;
1054
1055   /**
1056    * Kept in a MDLL.
1057    */
1058   struct Queue *prev_client;
1059
1060   /**
1061    * Kept in a MDLL.
1062    */
1063   struct Queue *next_client;
1064
1065   /**
1066    * Head of DLL of unacked transmission requests.
1067    */
1068   struct QueueEntry *queue_head;
1069
1070   /**
1071    * End of DLL of unacked transmission requests.
1072    */
1073   struct QueueEntry *queue_tail;
1074
1075   /**
1076    * Which neighbour is this queue for?
1077    */
1078   struct Neighbour *neighbour;
1079
1080   /**
1081    * Which communicator offers this queue?
1082    */
1083   struct TransportClient *tc;
1084
1085   /**
1086    * Address served by the queue.
1087    */
1088   const char *address;
1089
1090   /**
1091    * Task scheduled for the time when this queue can (likely) transmit the
1092    * next message. Still needs to check with the @e tracker_out to be sure.
1093    */
1094   struct GNUNET_SCHEDULER_Task *transmit_task;
1095
1096   /**
1097    * Our current RTT estimate for this queue.
1098    */
1099   struct GNUNET_TIME_Relative rtt;
1100
1101   /**
1102    * Message ID generator for transmissions on this queue.
1103    */
1104   uint64_t mid_gen;
1105
1106   /**
1107    * Unique identifier of this queue with the communicator.
1108    */
1109   uint32_t qid;
1110
1111   /**
1112    * Maximum transmission unit supported by this queue.
1113    */
1114   uint32_t mtu;
1115
1116   /**
1117    * Distance to the target of this queue.
1118    * FIXME: needed? DV is done differently these days...
1119    */
1120   uint32_t distance;
1121
1122   /**
1123    * Messages pending.
1124    */
1125   uint32_t num_msg_pending;
1126
1127   /**
1128    * Bytes pending.
1129    */
1130   uint32_t num_bytes_pending;
1131
1132   /**
1133    * Length of the DLL starting at @e queue_head.
1134    */
1135   unsigned int queue_length;
1136
1137   /**
1138    * Network type offered by this queue.
1139    */
1140   enum GNUNET_NetworkType nt;
1141
1142   /**
1143    * Connection status for this queue.
1144    */
1145   enum GNUNET_TRANSPORT_ConnectionStatus cs;
1146
1147   /**
1148    * How much outbound bandwidth do we have available for this queue?
1149    */
1150   struct GNUNET_BANDWIDTH_Tracker tracker_out;
1151
1152   /**
1153    * How much inbound bandwidth do we have available for this queue?
1154    */
1155   struct GNUNET_BANDWIDTH_Tracker tracker_in;
1156 };
1157
1158
1159 /**
1160  * Information we keep for a message that we are reassembling.
1161  */
1162 struct ReassemblyContext
1163 {
1164
1165   /**
1166    * Original message ID for of the message that all the
1167    * fragments belong to.
1168    */
1169   struct GNUNET_ShortHashCode msg_uuid;
1170
1171   /**
1172    * Which neighbour is this context for?
1173    */
1174   struct Neighbour *neighbour;
1175
1176   /**
1177    * Entry in the reassembly heap (sorted by expiration).
1178    */
1179   struct GNUNET_CONTAINER_HeapNode *hn;
1180
1181   /**
1182    * Bitfield with @e msg_size bits representing the positions
1183    * where we have received fragments.  When we receive a fragment,
1184    * we check the bits in @e bitfield before incrementing @e msg_missing.
1185    *
1186    * Allocated after the reassembled message.
1187    */
1188   uint8_t *bitfield;
1189
1190   /**
1191    * Task for sending ACK. We may send ACKs either because of hitting
1192    * the @e extra_acks limit, or based on time and @e num_acks.  This
1193    * task is for the latter case.
1194    */
1195   struct GNUNET_SCHEDULER_Task *ack_task;
1196
1197   /**
1198    * At what time will we give up reassembly of this message?
1199    */
1200   struct GNUNET_TIME_Absolute reassembly_timeout;
1201
1202   /**
1203    * Average delay of all acks in @e extra_acks and @e frag_uuid.
1204    * Should be reset to zero when @e num_acks is set to 0.
1205    */
1206   struct GNUNET_TIME_Relative avg_ack_delay;
1207
1208   /**
1209    * Time we received the last fragment.  @e avg_ack_delay must be
1210    * incremented by now - @e last_frag multiplied by @e num_acks.
1211    */
1212   struct GNUNET_TIME_Absolute last_frag;
1213
1214   /**
1215    * Bitfield of up to 64 additional fragments following @e frag_uuid
1216    * to be acknowledged in the next cummulative ACK.
1217    */
1218   uint64_t extra_acks;
1219
1220   /**
1221    * Unique ID of the lowest fragment UUID to be acknowledged in the
1222    * next cummulative ACK.  Only valid if @e num_acks > 0.
1223    */
1224   uint32_t frag_uuid;
1225
1226   /**
1227    * Number of ACKs we have accumulated so far.  Reset to 0
1228    * whenever we send a #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK.
1229    */
1230   unsigned int num_acks;
1231
1232   /**
1233    * How big is the message we are reassembling in total?
1234    */
1235   uint16_t msg_size;
1236
1237   /**
1238    * How many bytes of the message are still missing?  Defragmentation
1239    * is complete when @e msg_missing == 0.
1240    */
1241   uint16_t msg_missing;
1242
1243   /* Followed by @e msg_size bytes of the (partially) defragmented original message */
1244
1245   /* Followed by @e bitfield data */
1246 };
1247
1248
1249 /**
1250  * A neighbour that at least one communicator is connected to.
1251  */
1252 struct Neighbour
1253 {
1254
1255   /**
1256    * Which peer is this about?
1257    */
1258   struct GNUNET_PeerIdentity pid;
1259
1260   /**
1261    * Map with `struct ReassemblyContext` structs for fragments under
1262    * reassembly. May be NULL if we currently have no fragments from
1263    * this @e pid (lazy initialization).
1264    */
1265   struct GNUNET_CONTAINER_MultiShortmap *reassembly_map;
1266
1267   /**
1268    * Heap with `struct ReassemblyContext` structs for fragments under
1269    * reassembly. May be NULL if we currently have no fragments from
1270    * this @e pid (lazy initialization).
1271    */
1272   struct GNUNET_CONTAINER_Heap *reassembly_heap;
1273
1274   /**
1275    * Task to free old entries from the @e reassembly_heap and @e reassembly_map.
1276    */
1277   struct GNUNET_SCHEDULER_Task *reassembly_timeout_task;
1278
1279   /**
1280    * Head of list of messages pending for this neighbour.
1281    */
1282   struct PendingMessage *pending_msg_head;
1283
1284   /**
1285    * Tail of list of messages pending for this neighbour.
1286    */
1287   struct PendingMessage *pending_msg_tail;
1288
1289   /**
1290    * Head of MDLL of DV hops that have this neighbour as next hop. Must be
1291    * purged if this neighbour goes down.
1292    */
1293   struct DistanceVectorHop *dv_head;
1294
1295   /**
1296    * Tail of MDLL of DV hops that have this neighbour as next hop. Must be
1297    * purged if this neighbour goes down.
1298    */
1299   struct DistanceVectorHop *dv_tail;
1300
1301   /**
1302    * Head of DLL of queues to this peer.
1303    */
1304   struct Queue *queue_head;
1305
1306   /**
1307    * Tail of DLL of queues to this peer.
1308    */
1309   struct Queue *queue_tail;
1310
1311   /**
1312    * Task run to cleanup pending messages that have exceeded their timeout.
1313    */
1314   struct GNUNET_SCHEDULER_Task *timeout_task;
1315
1316   /**
1317    * Quota at which CORE is allowed to transmit to this peer.
1318    *
1319    * FIXME: not yet used, tricky to get right given multiple queues!
1320    *        (=> Idea: measure???)
1321    * FIXME: how do we set this value initially when we tell CORE?
1322    *    Options: start at a minimum value or at literally zero?
1323    *         (=> Current thought: clean would be zero!)
1324    */
1325   struct GNUNET_BANDWIDTH_Value32NBO quota_out;
1326
1327   /**
1328    * What is the earliest timeout of any message in @e pending_msg_tail?
1329    */
1330   struct GNUNET_TIME_Absolute earliest_timeout;
1331
1332 };
1333
1334
1335 /**
1336  * A peer that an application (client) would like us to talk to directly.
1337  */
1338 struct PeerRequest
1339 {
1340
1341   /**
1342    * Which peer is this about?
1343    */
1344   struct GNUNET_PeerIdentity pid;
1345
1346   /**
1347    * Client responsible for the request.
1348    */
1349   struct TransportClient *tc;
1350
1351   /**
1352    * Handle for watching the peerstore for HELLOs for this peer.
1353    */
1354   struct GNUNET_PEERSTORE_WatchContext *wc;
1355
1356   /**
1357    * What kind of performance preference does this @e tc have?
1358    */
1359   enum GNUNET_MQ_PreferenceKind pk;
1360
1361   /**
1362    * How much bandwidth would this @e tc like to see?
1363    */
1364   struct GNUNET_BANDWIDTH_Value32NBO bw;
1365
1366 };
1367
1368
1369 /**
1370  * Types of different pending messages.
1371  */
1372 enum PendingMessageType
1373 {
1374
1375   /**
1376    * Ordinary message received from the CORE service.
1377    */
1378   PMT_CORE = 0,
1379
1380   /**
1381    * Fragment box.
1382    */
1383   PMT_FRAGMENT_BOX = 1,
1384
1385   /**
1386    * Reliability box.
1387    */
1388   PMT_RELIABILITY_BOX = 2,
1389
1390   /**
1391    * Any type of acknowledgement.
1392    */
1393   PMT_ACKNOWLEDGEMENT = 3
1394
1395 };
1396
1397
1398 /**
1399  * Transmission request that is awaiting delivery.  The original
1400  * transmission requests from CORE may be too big for some queues.
1401  * In this case, a *tree* of fragments is created.  At each
1402  * level of the tree, fragments are kept in a DLL ordered by which
1403  * fragment should be sent next (at the head).  The tree is searched
1404  * top-down, with the original message at the root.
1405  *
1406  * To select a node for transmission, first it is checked if the
1407  * current node's message fits with the MTU.  If it does not, we
1408  * either calculate the next fragment (based on @e frag_off) from the
1409  * current node, or, if all fragments have already been created,
1410  * descend to the @e head_frag.  Even though the node was already
1411  * fragmented, the fragment may be too big if the fragment was
1412  * generated for a queue with a larger MTU. In this case, the node
1413  * may be fragmented again, thus creating a tree.
1414  *
1415  * When acknowledgements for fragments are received, the tree
1416  * must be pruned, removing those parts that were already
1417  * acknowledged.  When fragments are sent over a reliable
1418  * channel, they can be immediately removed.
1419  *
1420  * If a message is ever fragmented, then the original "full" message
1421  * is never again transmitted (even if it fits below the MTU), and
1422  * only (remaining) fragments are sent.
1423  */
1424 struct PendingMessage
1425 {
1426   /**
1427    * Kept in a MDLL of messages for this @a target.
1428    */
1429   struct PendingMessage *next_neighbour;
1430
1431   /**
1432    * Kept in a MDLL of messages for this @a target.
1433    */
1434   struct PendingMessage *prev_neighbour;
1435
1436   /**
1437    * Kept in a MDLL of messages from this @a client (if @e pmt is #PMT_CORE)
1438    */
1439   struct PendingMessage *next_client;
1440
1441   /**
1442    * Kept in a MDLL of messages from this @a client  (if @e pmt is #PMT_CORE)
1443    */
1444   struct PendingMessage *prev_client;
1445
1446   /**
1447    * Kept in a MDLL of messages from this @a cpm (if @e pmt is #PMT_FRAGMENT_BOx)
1448    */
1449   struct PendingMessage *next_frag;
1450
1451   /**
1452    * Kept in a MDLL of messages from this @a cpm  (if @e pmt is #PMT_FRAGMENT_BOX)
1453    */
1454   struct PendingMessage *prev_frag;
1455
1456   /**
1457    * This message, reliability boxed. Only possibly available if @e pmt is #PMT_CORE.
1458    */
1459   struct PendingMessage *bpm;
1460
1461   /**
1462    * Target of the request.
1463    */
1464   struct Neighbour *target;
1465
1466   /**
1467    * Client that issued the transmission request, if @e pmt is #PMT_CORE.
1468    */
1469   struct TransportClient *client;
1470
1471   /**
1472    * Head of a MDLL of fragments created for this core message.
1473    */
1474   struct PendingMessage *head_frag;
1475
1476   /**
1477    * Tail of a MDLL of fragments created for this core message.
1478    */
1479   struct PendingMessage *tail_frag;
1480
1481   /**
1482    * Our parent in the fragmentation tree.
1483    */
1484   struct PendingMessage *frag_parent;
1485
1486   /**
1487    * At what time should we give up on the transmission (and no longer retry)?
1488    */
1489   struct GNUNET_TIME_Absolute timeout;
1490
1491   /**
1492    * What is the earliest time for us to retry transmission of this message?
1493    */
1494   struct GNUNET_TIME_Absolute next_attempt;
1495
1496   /**
1497    * UUID to use for this message (used for reassembly of fragments, only
1498    * initialized if @e msg_uuid_set is #GNUNET_YES).
1499    */
1500   struct GNUNET_ShortHashCode msg_uuid;
1501
1502   /**
1503    * Counter incremented per generated fragment.
1504    */
1505   uint32_t frag_uuidgen;
1506
1507   /**
1508    * Type of the pending message.
1509    */
1510   enum PendingMessageType pmt;
1511
1512   /**
1513    * Size of the original message.
1514    */
1515   uint16_t bytes_msg;
1516
1517   /**
1518    * Offset at which we should generate the next fragment.
1519    */
1520   uint16_t frag_off;
1521
1522   /**
1523    * #GNUNET_YES once @e msg_uuid was initialized
1524    */
1525   int16_t msg_uuid_set;
1526
1527   /* Followed by @e bytes_msg to transmit */
1528 };
1529
1530
1531 /**
1532  * One of the addresses of this peer.
1533  */
1534 struct AddressListEntry
1535 {
1536
1537   /**
1538    * Kept in a DLL.
1539    */
1540   struct AddressListEntry *next;
1541
1542   /**
1543    * Kept in a DLL.
1544    */
1545   struct AddressListEntry *prev;
1546
1547   /**
1548    * Which communicator provides this address?
1549    */
1550   struct TransportClient *tc;
1551
1552   /**
1553    * The actual address.
1554    */
1555   const char *address;
1556
1557   /**
1558    * Current context for storing this address in the peerstore.
1559    */
1560   struct GNUNET_PEERSTORE_StoreContext *sc;
1561
1562   /**
1563    * Task to periodically do @e st operation.
1564    */
1565   struct GNUNET_SCHEDULER_Task *st;
1566
1567   /**
1568    * What is a typical lifetime the communicator expects this
1569    * address to have? (Always from now.)
1570    */
1571   struct GNUNET_TIME_Relative expiration;
1572
1573   /**
1574    * Address identifier used by the communicator.
1575    */
1576   uint32_t aid;
1577
1578   /**
1579    * Network type offered by this address.
1580    */
1581   enum GNUNET_NetworkType nt;
1582
1583 };
1584
1585
1586 /**
1587  * Client connected to the transport service.
1588  */
1589 struct TransportClient
1590 {
1591
1592   /**
1593    * Kept in a DLL.
1594    */
1595   struct TransportClient *next;
1596
1597   /**
1598    * Kept in a DLL.
1599    */
1600   struct TransportClient *prev;
1601
1602   /**
1603    * Handle to the client.
1604    */
1605   struct GNUNET_SERVICE_Client *client;
1606
1607   /**
1608    * Message queue to the client.
1609    */
1610   struct GNUNET_MQ_Handle *mq;
1611
1612   /**
1613    * What type of client is this?
1614    */
1615   enum ClientType type;
1616
1617   union
1618   {
1619
1620     /**
1621      * Information for @e type #CT_CORE.
1622      */
1623     struct {
1624
1625       /**
1626        * Head of list of messages pending for this client, sorted by
1627        * transmission time ("next_attempt" + possibly internal prioritization).
1628        */
1629       struct PendingMessage *pending_msg_head;
1630
1631       /**
1632        * Tail of list of messages pending for this client.
1633        */
1634       struct PendingMessage *pending_msg_tail;
1635
1636     } core;
1637
1638     /**
1639      * Information for @e type #CT_MONITOR.
1640      */
1641     struct {
1642
1643       /**
1644        * Peer identity to monitor the addresses of.
1645        * Zero to monitor all neighbours.  Valid if
1646        * @e type is #CT_MONITOR.
1647        */
1648       struct GNUNET_PeerIdentity peer;
1649
1650       /**
1651        * Is this a one-shot monitor?
1652        */
1653       int one_shot;
1654
1655     } monitor;
1656
1657
1658     /**
1659      * Information for @e type #CT_COMMUNICATOR.
1660      */
1661     struct {
1662       /**
1663        * If @e type is #CT_COMMUNICATOR, this communicator
1664        * supports communicating using these addresses.
1665        */
1666       char *address_prefix;
1667
1668       /**
1669        * Head of DLL of queues offered by this communicator.
1670        */
1671       struct Queue *queue_head;
1672
1673       /**
1674        * Tail of DLL of queues offered by this communicator.
1675        */
1676       struct Queue *queue_tail;
1677
1678       /**
1679        * Head of list of the addresses of this peer offered by this communicator.
1680        */
1681       struct AddressListEntry *addr_head;
1682
1683       /**
1684        * Tail of list of the addresses of this peer offered by this communicator.
1685        */
1686       struct AddressListEntry *addr_tail;
1687
1688       /**
1689        * Number of queue entries in all queues to this communicator. Used
1690        * throttle sending to a communicator if we see that the communicator
1691        * is globally unable to keep up.
1692        */
1693       unsigned int total_queue_length;
1694
1695       /**
1696        * Characteristics of this communicator.
1697        */
1698       enum GNUNET_TRANSPORT_CommunicatorCharacteristics cc;
1699
1700     } communicator;
1701
1702     /**
1703      * Information for @e type #CT_APPLICATION
1704      */
1705     struct {
1706
1707       /**
1708        * Map of requests for peers the given client application would like to
1709        * see connections for.  Maps from PIDs to `struct PeerRequest`.
1710        */
1711       struct GNUNET_CONTAINER_MultiPeerMap *requests;
1712
1713     } application;
1714
1715   } details;
1716
1717 };
1718
1719
1720 /**
1721  * State we keep for validation activities.  Each of these
1722  * is both in the #validation_heap and the #validation_map.
1723  */
1724 struct ValidationState
1725 {
1726
1727   /**
1728    * For which peer is @a address to be validated (or possibly valid)?
1729    * Serves as key in the #validation_map.
1730    */
1731   struct GNUNET_PeerIdentity pid;
1732
1733   /**
1734    * How long did the peer claim this @e address to be valid? Capped at
1735    * minimum of #MAX_ADDRESS_VALID_UNTIL relative to the time where we last
1736    * were told about the address and the value claimed by the other peer at
1737    * that time.  May be updated similarly when validation succeeds.
1738    */
1739   struct GNUNET_TIME_Absolute valid_until;
1740
1741   /**
1742    * How long do *we* consider this @e address to be valid?
1743    * In the past or zero if we have not yet validated it.
1744    */
1745   struct GNUNET_TIME_Absolute validated_until;
1746
1747   /**
1748    * When did we FIRST use the current @e challenge in a message?
1749    * Used to sanity-check @code{origin_time} in the response when
1750    * calculating the RTT. If the @code{origin_time} is not in
1751    * the expected range, the response is discarded as malicious.
1752    */
1753   struct GNUNET_TIME_Absolute first_challenge_use;
1754
1755   /**
1756    * When did we LAST use the current @e challenge in a message?
1757    * Used to sanity-check @code{origin_time} in the response when
1758    * calculating the RTT.  If the @code{origin_time} is not in
1759    * the expected range, the response is discarded as malicious.
1760    */
1761   struct GNUNET_TIME_Absolute last_challenge_use;
1762
1763   /**
1764    * Next time we will send the @e challenge to the peer, if this time is past
1765    * @e valid_until, this validation state is released at this time.  If the
1766    * address is valid, @e next_challenge is set to @e validated_until MINUS @e
1767    * validation_delay * #VALIDATION_RTT_BUFFER_FACTOR, such that we will try
1768    * to re-validate before the validity actually expires.
1769    */
1770   struct GNUNET_TIME_Absolute next_challenge;
1771
1772   /**
1773    * Current backoff factor we're applying for sending the @a challenge.
1774    * Reset to 0 if the @a challenge is confirmed upon validation.
1775    * Reduced to minimum of #FAST_VALIDATION_CHALLENGE_FREQ and half of the
1776    * existing value if we receive an unvalidated address again over
1777    * another channel (and thus should consider the information "fresh").
1778    * Maximum is #MAX_VALIDATION_CHALLENGE_FREQ.
1779    */
1780   struct GNUNET_TIME_Relative challenge_backoff;
1781
1782   /**
1783    * Initially set to "forever". Once @e validated_until is set, this value is
1784    * set to the RTT that tells us how long it took to receive the validation.
1785    */
1786   struct GNUNET_TIME_Relative validation_rtt;
1787
1788   /**
1789    * The challenge we sent to the peer to get it to validate the address. Note
1790    * that we rotate the challenge whenever we update @e validated_until to
1791    * avoid attacks where a peer simply replays an old challenge in the future.
1792    * (We must not rotate more often as otherwise we may discard valid answers
1793    * due to packet losses, latency and reorderings on the network).
1794    */
1795   struct GNUNET_ShortHashCode challenge;
1796
1797   /**
1798    * Claimed address of the peer.
1799    */
1800   char *address;
1801
1802   /**
1803    * Entry in the #validation_heap, which is sorted by @e next_challenge. The
1804    * heap is used to figure out when the next validation activity should be
1805    * run.
1806    */
1807   struct GNUNET_CONTAINER_HeapNode *hn;
1808
1809   /**
1810    * Handle to a PEERSTORE store operation for this @e address.  NULL if
1811    * no PEERSTORE operation is pending.
1812    */
1813   struct GNUNET_PEERSTORE_StoreContext *sc;
1814
1815   /**
1816    * We are technically ready to send the challenge, but we are waiting for
1817    * the respective queue to become available for transmission.
1818    */
1819   int awaiting_queue;
1820
1821 };
1822
1823
1824 /**
1825  * Head of linked list of all clients to this service.
1826  */
1827 static struct TransportClient *clients_head;
1828
1829 /**
1830  * Tail of linked list of all clients to this service.
1831  */
1832 static struct TransportClient *clients_tail;
1833
1834 /**
1835  * Statistics handle.
1836  */
1837 static struct GNUNET_STATISTICS_Handle *GST_stats;
1838
1839 /**
1840  * Configuration handle.
1841  */
1842 static const struct GNUNET_CONFIGURATION_Handle *GST_cfg;
1843
1844 /**
1845  * Our public key.
1846  */
1847 static struct GNUNET_PeerIdentity GST_my_identity;
1848
1849 /**
1850  * Our private key.
1851  */
1852 static struct GNUNET_CRYPTO_EddsaPrivateKey *GST_my_private_key;
1853
1854 /**
1855  * Map from PIDs to `struct Neighbour` entries.  A peer is
1856  * a neighbour if we have an MQ to it from some communicator.
1857  */
1858 static struct GNUNET_CONTAINER_MultiPeerMap *neighbours;
1859
1860 /**
1861  * Map from PIDs to `struct DistanceVector` entries describing
1862  * known paths to the peer.
1863  */
1864 static struct GNUNET_CONTAINER_MultiPeerMap *dv_routes;
1865
1866 /**
1867  * Map from PIDs to `struct ValidationState` entries describing
1868  * addresses we are aware of and their validity state.
1869  */
1870 static struct GNUNET_CONTAINER_MultiPeerMap *validation_map;
1871
1872 /**
1873  * Map from challenges to `struct LearnLaunchEntry` values.
1874  */
1875 static struct GNUNET_CONTAINER_MultiShortmap *dvlearn_map;
1876
1877 /**
1878  * Head of a DLL sorted by launch time.
1879  */
1880 static struct LearnLaunchEntry *lle_head;
1881
1882 /**
1883  * Tail of a DLL sorted by launch time.
1884  */
1885 static struct LearnLaunchEntry *lle_tail;
1886
1887 /**
1888  * MIN Heap sorted by "next_challenge" to `struct ValidationState` entries
1889  * sorting addresses we are aware of by when we should next try to (re)validate
1890  * (or expire) them.
1891  */
1892 static struct GNUNET_CONTAINER_Heap *validation_heap;
1893
1894 /**
1895  * Database for peer's HELLOs.
1896  */
1897 static struct GNUNET_PEERSTORE_Handle *peerstore;
1898
1899 /**
1900  * Heap sorting `struct EphemeralCacheEntry` by their
1901  * key/signature validity.
1902  */
1903 static struct GNUNET_CONTAINER_Heap *ephemeral_heap;
1904
1905 /**
1906  * Hash map for looking up `struct EphemeralCacheEntry`s
1907  * by peer identity. (We may have ephemerals in our
1908  * cache for which we do not have a neighbour entry,
1909  * and similar many neighbours may not need ephemerals,
1910  * so we use a second map.)
1911  */
1912 static struct GNUNET_CONTAINER_MultiPeerMap *ephemeral_map;
1913
1914 /**
1915  * Task to free expired ephemerals.
1916  */
1917 static struct GNUNET_SCHEDULER_Task *ephemeral_task;
1918
1919 /**
1920  * Task run to initiate DV learning.
1921  */
1922 static struct GNUNET_SCHEDULER_Task *dvlearn_task;
1923
1924 /**
1925  * Task to run address validation.
1926  */
1927 static struct GNUNET_SCHEDULER_Task *validation_task;
1928
1929
1930 /**
1931  * Free cached ephemeral key.
1932  *
1933  * @param ece cached signature to free
1934  */
1935 static void
1936 free_ephemeral (struct EphemeralCacheEntry *ece)
1937 {
1938   GNUNET_CONTAINER_multipeermap_remove (ephemeral_map,
1939                                         &ece->target,
1940                                         ece);
1941   GNUNET_CONTAINER_heap_remove_node (ece->hn);
1942   GNUNET_free (ece);
1943 }
1944
1945
1946 /**
1947  * Free validation state.
1948  *
1949  * @param vs validation state to free
1950  */
1951 static void
1952 free_validation_state (struct ValidationState *vs)
1953 {
1954   GNUNET_CONTAINER_multipeermap_remove (validation_map,
1955                                         &vs->pid,
1956                                         vs);
1957   GNUNET_CONTAINER_heap_remove_node (vs->hn);
1958   vs->hn = NULL;
1959   if (NULL != vs->sc)
1960   {
1961     GNUNET_PEERSTORE_store_cancel (vs->sc);
1962     vs->sc = NULL;
1963   }
1964   GNUNET_free (vs->address);
1965   GNUNET_free (vs);
1966 }
1967
1968
1969 /**
1970  * Lookup neighbour record for peer @a pid.
1971  *
1972  * @param pid neighbour to look for
1973  * @return NULL if we do not have this peer as a neighbour
1974  */
1975 static struct Neighbour *
1976 lookup_neighbour (const struct GNUNET_PeerIdentity *pid)
1977 {
1978   return GNUNET_CONTAINER_multipeermap_get (neighbours,
1979                                             pid);
1980 }
1981
1982
1983 /**
1984  * Details about what to notify monitors about.
1985  */
1986 struct MonitorEvent
1987 {
1988   /**
1989    * @deprecated To be discussed if we keep these...
1990    */
1991   struct GNUNET_TIME_Absolute last_validation;
1992   struct GNUNET_TIME_Absolute valid_until;
1993   struct GNUNET_TIME_Absolute next_validation;
1994
1995   /**
1996    * Current round-trip time estimate.
1997    */
1998   struct GNUNET_TIME_Relative rtt;
1999
2000   /**
2001    * Connection status.
2002    */
2003   enum GNUNET_TRANSPORT_ConnectionStatus cs;
2004
2005   /**
2006    * Messages pending.
2007    */
2008   uint32_t num_msg_pending;
2009
2010   /**
2011    * Bytes pending.
2012    */
2013   uint32_t num_bytes_pending;
2014
2015
2016 };
2017
2018
2019 /**
2020  * Free a @dvh. Callers MAY want to check if this was the last path to the
2021  * `target`, and if so call #free_dv_route to also free the associated DV
2022  * entry in #dv_routes (if not, the associated scheduler job should eventually
2023  * take care of it).
2024  *
2025  * @param dvh hop to free
2026  */
2027 static void
2028 free_distance_vector_hop (struct DistanceVectorHop *dvh)
2029 {
2030   struct Neighbour *n = dvh->next_hop;
2031   struct DistanceVector *dv = dvh->dv;
2032
2033   GNUNET_CONTAINER_MDLL_remove (neighbour,
2034                                 n->dv_head,
2035                                 n->dv_tail,
2036                                 dvh);
2037   GNUNET_CONTAINER_MDLL_remove (dv,
2038                                 dv->dv_head,
2039                                 dv->dv_tail,
2040                                 dvh);
2041   GNUNET_free (dvh);
2042 }
2043
2044
2045 /**
2046  * Free entry in #dv_routes.  First frees all hops to the target, and
2047  * if there are no entries left, frees @a dv as well.
2048  *
2049  * @param dv route to free
2050  */
2051 static void
2052 free_dv_route (struct DistanceVector *dv)
2053 {
2054   struct DistanceVectorHop *dvh;
2055
2056   while (NULL != (dvh = dv->dv_head))
2057     free_distance_vector_hop (dvh);
2058   if (NULL == dv->dv_head)
2059   {
2060     GNUNET_assert (GNUNET_YES ==
2061                    GNUNET_CONTAINER_multipeermap_remove (dv_routes,
2062                                                          &dv->target,
2063                                                          dv));
2064     if (NULL != dv->timeout_task)
2065       GNUNET_SCHEDULER_cancel (dv->timeout_task);
2066     GNUNET_free (dv);
2067   }
2068 }
2069
2070
2071 /**
2072  * Notify monitor @a tc about an event.  That @a tc
2073  * cares about the event has already been checked.
2074  *
2075  * Send @a tc information in @a me about a @a peer's status with
2076  * respect to some @a address to all monitors that care.
2077  *
2078  * @param tc monitor to inform
2079  * @param peer peer the information is about
2080  * @param address address the information is about
2081  * @param nt network type associated with @a address
2082  * @param me detailed information to transmit
2083  */
2084 static void
2085 notify_monitor (struct TransportClient *tc,
2086                 const struct GNUNET_PeerIdentity *peer,
2087                 const char *address,
2088                 enum GNUNET_NetworkType nt,
2089                 const struct MonitorEvent *me)
2090 {
2091   struct GNUNET_MQ_Envelope *env;
2092   struct GNUNET_TRANSPORT_MonitorData *md;
2093   size_t addr_len = strlen (address) + 1;
2094
2095   env = GNUNET_MQ_msg_extra (md,
2096                              addr_len,
2097                              GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_DATA);
2098   md->nt = htonl ((uint32_t) nt);
2099   md->peer = *peer;
2100   md->last_validation = GNUNET_TIME_absolute_hton (me->last_validation);
2101   md->valid_until = GNUNET_TIME_absolute_hton (me->valid_until);
2102   md->next_validation = GNUNET_TIME_absolute_hton (me->next_validation);
2103   md->rtt = GNUNET_TIME_relative_hton (me->rtt);
2104   md->cs = htonl ((uint32_t) me->cs);
2105   md->num_msg_pending = htonl (me->num_msg_pending);
2106   md->num_bytes_pending = htonl (me->num_bytes_pending);
2107   memcpy (&md[1],
2108           address,
2109           addr_len);
2110   GNUNET_MQ_send (tc->mq,
2111                   env);
2112 }
2113
2114
2115 /**
2116  * Send information in @a me about a @a peer's status with respect
2117  * to some @a address to all monitors that care.
2118  *
2119  * @param peer peer the information is about
2120  * @param address address the information is about
2121  * @param nt network type associated with @a address
2122  * @param me detailed information to transmit
2123  */
2124 static void
2125 notify_monitors (const struct GNUNET_PeerIdentity *peer,
2126                  const char *address,
2127                  enum GNUNET_NetworkType nt,
2128                  const struct MonitorEvent *me)
2129 {
2130   for (struct TransportClient *tc = clients_head;
2131        NULL != tc;
2132        tc = tc->next)
2133   {
2134     if (CT_MONITOR != tc->type)
2135       continue;
2136     if (tc->details.monitor.one_shot)
2137       continue;
2138     if ( (0 != GNUNET_is_zero (&tc->details.monitor.peer)) &&
2139          (0 != GNUNET_memcmp (&tc->details.monitor.peer,
2140                               peer)) )
2141       continue;
2142     notify_monitor (tc,
2143                     peer,
2144                     address,
2145                     nt,
2146                     me);
2147   }
2148 }
2149
2150
2151 /**
2152  * Called whenever a client connects.  Allocates our
2153  * data structures associated with that client.
2154  *
2155  * @param cls closure, NULL
2156  * @param client identification of the client
2157  * @param mq message queue for the client
2158  * @return our `struct TransportClient`
2159  */
2160 static void *
2161 client_connect_cb (void *cls,
2162                    struct GNUNET_SERVICE_Client *client,
2163                    struct GNUNET_MQ_Handle *mq)
2164 {
2165   struct TransportClient *tc;
2166
2167   (void) cls;
2168   tc = GNUNET_new (struct TransportClient);
2169   tc->client = client;
2170   tc->mq = mq;
2171   GNUNET_CONTAINER_DLL_insert (clients_head,
2172                                clients_tail,
2173                                tc);
2174   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2175               "Client %p connected\n",
2176               tc);
2177   return tc;
2178 }
2179
2180
2181 /**
2182  * Free @a rc
2183  *
2184  * @param rc data structure to free
2185  */
2186 static void
2187 free_reassembly_context (struct ReassemblyContext *rc)
2188 {
2189   struct Neighbour *n = rc->neighbour;
2190
2191   GNUNET_assert (rc ==
2192                  GNUNET_CONTAINER_heap_remove_node (rc->hn));
2193   GNUNET_assert (GNUNET_OK ==
2194                  GNUNET_CONTAINER_multishortmap_remove (n->reassembly_map,
2195                                                         &rc->msg_uuid,
2196                                                         rc));
2197   GNUNET_free (rc);
2198 }
2199
2200
2201 /**
2202  * Task run to clean up reassembly context of a neighbour that have expired.
2203  *
2204  * @param cls a `struct Neighbour`
2205  */
2206 static void
2207 reassembly_cleanup_task (void *cls)
2208 {
2209   struct Neighbour *n = cls;
2210   struct ReassemblyContext *rc;
2211
2212   n->reassembly_timeout_task = NULL;
2213   while (NULL != (rc = GNUNET_CONTAINER_heap_peek (n->reassembly_heap)))
2214   {
2215     if (0 == GNUNET_TIME_absolute_get_remaining (rc->reassembly_timeout).rel_value_us)
2216     {
2217       free_reassembly_context (rc);
2218       continue;
2219     }
2220     GNUNET_assert (NULL == n->reassembly_timeout_task);
2221     n->reassembly_timeout_task = GNUNET_SCHEDULER_add_at (rc->reassembly_timeout,
2222                                                           &reassembly_cleanup_task,
2223                                                           n);
2224     return;
2225   }
2226 }
2227
2228
2229 /**
2230  * function called to #free_reassembly_context().
2231  *
2232  * @param cls NULL
2233  * @param key unused
2234  * @param value a `struct ReassemblyContext` to free
2235  * @return #GNUNET_OK (continue iteration)
2236  */
2237 static int
2238 free_reassembly_cb (void *cls,
2239                     const struct GNUNET_ShortHashCode *key,
2240                     void *value)
2241 {
2242   struct ReassemblyContext *rc = value;
2243   (void) cls;
2244   (void) key;
2245
2246   free_reassembly_context (rc);
2247   return GNUNET_OK;
2248 }
2249
2250
2251 /**
2252  * Release memory used by @a neighbour.
2253  *
2254  * @param neighbour neighbour entry to free
2255  */
2256 static void
2257 free_neighbour (struct Neighbour *neighbour)
2258 {
2259   struct DistanceVectorHop *dvh;
2260
2261   GNUNET_assert (NULL == neighbour->queue_head);
2262   GNUNET_assert (GNUNET_YES ==
2263                  GNUNET_CONTAINER_multipeermap_remove (neighbours,
2264                                                        &neighbour->pid,
2265                                                        neighbour));
2266   if (NULL != neighbour->timeout_task)
2267     GNUNET_SCHEDULER_cancel (neighbour->timeout_task);
2268   if (NULL != neighbour->reassembly_map)
2269   {
2270     GNUNET_CONTAINER_multishortmap_iterate (neighbour->reassembly_map,
2271                                             &free_reassembly_cb,
2272                                             NULL);
2273     GNUNET_CONTAINER_multishortmap_destroy (neighbour->reassembly_map);
2274     neighbour->reassembly_map = NULL;
2275     GNUNET_CONTAINER_heap_destroy (neighbour->reassembly_heap);
2276     neighbour->reassembly_heap = NULL;
2277   }
2278   while (NULL != (dvh = neighbour->dv_head))
2279   {
2280     struct DistanceVector *dv = dvh->dv;
2281
2282     free_distance_vector_hop (dvh);
2283     if (NULL == dv->dv_head)
2284       free_dv_route (dv);
2285   }
2286   if (NULL != neighbour->reassembly_timeout_task)
2287     GNUNET_SCHEDULER_cancel (neighbour->reassembly_timeout_task);
2288   GNUNET_free (neighbour);
2289 }
2290
2291
2292 /**
2293  * Send message to CORE clients that we lost a connection.
2294  *
2295  * @param tc client to inform (must be CORE client)
2296  * @param pid peer the connection is for
2297  * @param quota_out current quota for the peer
2298  */
2299 static void
2300 core_send_connect_info (struct TransportClient *tc,
2301                         const struct GNUNET_PeerIdentity *pid,
2302                         struct GNUNET_BANDWIDTH_Value32NBO quota_out)
2303 {
2304   struct GNUNET_MQ_Envelope *env;
2305   struct ConnectInfoMessage *cim;
2306
2307   GNUNET_assert (CT_CORE == tc->type);
2308   env = GNUNET_MQ_msg (cim,
2309                        GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
2310   cim->quota_out = quota_out;
2311   cim->id = *pid;
2312   GNUNET_MQ_send (tc->mq,
2313                   env);
2314 }
2315
2316
2317 /**
2318  * Send message to CORE clients that we gained a connection
2319  *
2320  * @param pid peer the queue was for
2321  * @param quota_out current quota for the peer
2322  */
2323 static void
2324 cores_send_connect_info (const struct GNUNET_PeerIdentity *pid,
2325                          struct GNUNET_BANDWIDTH_Value32NBO quota_out)
2326 {
2327   for (struct TransportClient *tc = clients_head;
2328        NULL != tc;
2329        tc = tc->next)
2330   {
2331     if (CT_CORE != tc->type)
2332       continue;
2333     core_send_connect_info (tc,
2334                             pid,
2335                             quota_out);
2336   }
2337 }
2338
2339
2340 /**
2341  * Send message to CORE clients that we lost a connection.
2342  *
2343  * @param pid peer the connection was for
2344  */
2345 static void
2346 cores_send_disconnect_info (const struct GNUNET_PeerIdentity *pid)
2347 {
2348   for (struct TransportClient *tc = clients_head;
2349        NULL != tc;
2350        tc = tc->next)
2351   {
2352     struct GNUNET_MQ_Envelope *env;
2353     struct DisconnectInfoMessage *dim;
2354
2355     if (CT_CORE != tc->type)
2356       continue;
2357     env = GNUNET_MQ_msg (dim,
2358                          GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT);
2359     dim->peer = *pid;
2360     GNUNET_MQ_send (tc->mq,
2361                     env);
2362   }
2363 }
2364
2365
2366 /**
2367  * We believe we are ready to transmit a message on a queue. Double-checks
2368  * with the queue's "tracker_out" and then gives the message to the
2369  * communicator for transmission (updating the tracker, and re-scheduling
2370  * itself if applicable).
2371  *
2372  * @param cls the `struct Queue` to process transmissions for
2373  */
2374 static void
2375 transmit_on_queue (void *cls);
2376
2377
2378 /**
2379  * Schedule next run of #transmit_on_queue().  Does NOTHING if
2380  * we should run immediately or if the message queue is empty.
2381  * Test for no task being added AND queue not being empty to
2382  * transmit immediately afterwards!  This function must only
2383  * be called if the message queue is non-empty!
2384  *
2385  * @param queue the queue to do scheduling for
2386  */
2387 static void
2388 schedule_transmit_on_queue (struct Queue *queue)
2389 {
2390   struct Neighbour *n = queue->neighbour;
2391   struct PendingMessage *pm = n->pending_msg_head;
2392   struct GNUNET_TIME_Relative out_delay;
2393   unsigned int wsize;
2394
2395   GNUNET_assert (NULL != pm);
2396   if (queue->tc->details.communicator.total_queue_length >=
2397       COMMUNICATOR_TOTAL_QUEUE_LIMIT)
2398   {
2399     GNUNET_STATISTICS_update (GST_stats,
2400                               "# Transmission throttled due to communicator queue limit",
2401                               1,
2402                               GNUNET_NO);
2403     return;
2404   }
2405   if (queue->queue_length >= QUEUE_LENGTH_LIMIT)
2406   {
2407     GNUNET_STATISTICS_update (GST_stats,
2408                               "# Transmission throttled due to queue queue limit",
2409                               1,
2410                               GNUNET_NO);
2411     return;
2412   }
2413
2414   wsize = (0 == queue->mtu)
2415     ? pm->bytes_msg /* FIXME: add overheads? */
2416     : queue->mtu;
2417   out_delay = GNUNET_BANDWIDTH_tracker_get_delay (&queue->tracker_out,
2418                                                   wsize);
2419   out_delay = GNUNET_TIME_relative_max (GNUNET_TIME_absolute_get_remaining (pm->next_attempt),
2420                                         out_delay);
2421   if (0 == out_delay.rel_value_us)
2422     return; /* we should run immediately! */
2423   /* queue has changed since we were scheduled, reschedule again */
2424   queue->transmit_task
2425     = GNUNET_SCHEDULER_add_delayed (out_delay,
2426                                     &transmit_on_queue,
2427                                     queue);
2428   if (out_delay.rel_value_us > DELAY_WARN_THRESHOLD.rel_value_us)
2429     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2430                 "Next transmission on queue `%s' in %s (high delay)\n",
2431                 queue->address,
2432                 GNUNET_STRINGS_relative_time_to_string (out_delay,
2433                                                         GNUNET_YES));
2434   else
2435     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2436                 "Next transmission on queue `%s' in %s\n",
2437                 queue->address,
2438                 GNUNET_STRINGS_relative_time_to_string (out_delay,
2439                                                         GNUNET_YES));
2440 }
2441
2442
2443 /**
2444  * Free @a queue.
2445  *
2446  * @param queue the queue to free
2447  */
2448 static void
2449 free_queue (struct Queue *queue)
2450 {
2451   struct Neighbour *neighbour = queue->neighbour;
2452   struct TransportClient *tc = queue->tc;
2453   struct MonitorEvent me = {
2454     .cs = GNUNET_TRANSPORT_CS_DOWN,
2455     .rtt = GNUNET_TIME_UNIT_FOREVER_REL
2456   };
2457   struct QueueEntry *qe;
2458   int maxxed;
2459
2460   if (NULL != queue->transmit_task)
2461   {
2462     GNUNET_SCHEDULER_cancel (queue->transmit_task);
2463     queue->transmit_task = NULL;
2464   }
2465   GNUNET_CONTAINER_MDLL_remove (neighbour,
2466                                 neighbour->queue_head,
2467                                 neighbour->queue_tail,
2468                                 queue);
2469   GNUNET_CONTAINER_MDLL_remove (client,
2470                                 tc->details.communicator.queue_head,
2471                                 tc->details.communicator.queue_tail,
2472                                 queue);
2473   maxxed = (COMMUNICATOR_TOTAL_QUEUE_LIMIT >= tc->details.communicator.total_queue_length);
2474   while (NULL != (qe = queue->queue_head))
2475   {
2476     GNUNET_CONTAINER_DLL_remove (queue->queue_head,
2477                                  queue->queue_tail,
2478                                  qe);
2479     queue->queue_length--;
2480     tc->details.communicator.total_queue_length--;
2481     GNUNET_free (qe);
2482   }
2483   GNUNET_assert (0 == queue->queue_length);
2484   if ( (maxxed) &&
2485        (COMMUNICATOR_TOTAL_QUEUE_LIMIT < tc->details.communicator.total_queue_length) )
2486   {
2487     /* Communicator dropped below threshold, resume all queues */
2488     GNUNET_STATISTICS_update (GST_stats,
2489                               "# Transmission throttled due to communicator queue limit",
2490                               -1,
2491                               GNUNET_NO);
2492     for (struct Queue *s = tc->details.communicator.queue_head;
2493          NULL != s;
2494          s = s->next_client)
2495       schedule_transmit_on_queue (s);
2496   }
2497   notify_monitors (&neighbour->pid,
2498                    queue->address,
2499                    queue->nt,
2500                    &me);
2501   GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_in);
2502   GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_out);
2503   GNUNET_free (queue);
2504   if (NULL == neighbour->queue_head)
2505   {
2506     cores_send_disconnect_info (&neighbour->pid);
2507     free_neighbour (neighbour);
2508   }
2509 }
2510
2511
2512 /**
2513  * Free @a ale
2514  *
2515  * @param ale address list entry to free
2516  */
2517 static void
2518 free_address_list_entry (struct AddressListEntry *ale)
2519 {
2520   struct TransportClient *tc = ale->tc;
2521
2522   GNUNET_CONTAINER_DLL_remove (tc->details.communicator.addr_head,
2523                                tc->details.communicator.addr_tail,
2524                                ale);
2525   if (NULL != ale->sc)
2526   {
2527     GNUNET_PEERSTORE_store_cancel (ale->sc);
2528     ale->sc = NULL;
2529   }
2530   if (NULL != ale->st)
2531   {
2532     GNUNET_SCHEDULER_cancel (ale->st);
2533     ale->st = NULL;
2534   }
2535   GNUNET_free (ale);
2536 }
2537
2538
2539 /**
2540  * Stop the peer request in @a value.
2541  *
2542  * @param cls a `struct TransportClient` that no longer makes the request
2543  * @param pid the peer's identity
2544  * @param value a `struct PeerRequest`
2545  * @return #GNUNET_YES (always)
2546  */
2547 static int
2548 stop_peer_request (void *cls,
2549                    const struct GNUNET_PeerIdentity *pid,
2550                    void *value)
2551 {
2552   struct TransportClient *tc = cls;
2553   struct PeerRequest *pr = value;
2554
2555   GNUNET_PEERSTORE_watch_cancel (pr->wc);
2556   GNUNET_assert (GNUNET_YES ==
2557                  GNUNET_CONTAINER_multipeermap_remove (tc->details.application.requests,
2558                                                        pid,
2559                                                        pr));
2560   GNUNET_free (pr);
2561
2562   return GNUNET_OK;
2563 }
2564
2565
2566 /**
2567  * Called whenever a client is disconnected.  Frees our
2568  * resources associated with that client.
2569  *
2570  * @param cls closure, NULL
2571  * @param client identification of the client
2572  * @param app_ctx our `struct TransportClient`
2573  */
2574 static void
2575 client_disconnect_cb (void *cls,
2576                       struct GNUNET_SERVICE_Client *client,
2577                       void *app_ctx)
2578 {
2579   struct TransportClient *tc = app_ctx;
2580
2581   (void) cls;
2582   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2583               "Client %p disconnected, cleaning up.\n",
2584               tc);
2585   GNUNET_CONTAINER_DLL_remove (clients_head,
2586                                clients_tail,
2587                                tc);
2588   switch (tc->type)
2589   {
2590   case CT_NONE:
2591     break;
2592   case CT_CORE:
2593     {
2594       struct PendingMessage *pm;
2595
2596       while (NULL != (pm = tc->details.core.pending_msg_head))
2597       {
2598         GNUNET_CONTAINER_MDLL_remove (client,
2599                                       tc->details.core.pending_msg_head,
2600                                       tc->details.core.pending_msg_tail,
2601                                       pm);
2602         pm->client = NULL;
2603       }
2604     }
2605     break;
2606   case CT_MONITOR:
2607     break;
2608   case CT_COMMUNICATOR:
2609     {
2610       struct Queue *q;
2611       struct AddressListEntry *ale;
2612
2613       while (NULL != (q = tc->details.communicator.queue_head))
2614         free_queue (q);
2615       while (NULL != (ale = tc->details.communicator.addr_head))
2616         free_address_list_entry (ale);
2617       GNUNET_free (tc->details.communicator.address_prefix);
2618     }
2619     break;
2620   case CT_APPLICATION:
2621     GNUNET_CONTAINER_multipeermap_iterate (tc->details.application.requests,
2622                                            &stop_peer_request,
2623                                            tc);
2624     GNUNET_CONTAINER_multipeermap_destroy (tc->details.application.requests);
2625     break;
2626   }
2627   GNUNET_free (tc);
2628 }
2629
2630
2631 /**
2632  * Iterator telling new CORE client about all existing
2633  * connections to peers.
2634  *
2635  * @param cls the new `struct TransportClient`
2636  * @param pid a connected peer
2637  * @param value the `struct Neighbour` with more information
2638  * @return #GNUNET_OK (continue to iterate)
2639  */
2640 static int
2641 notify_client_connect_info (void *cls,
2642                             const struct GNUNET_PeerIdentity *pid,
2643                             void *value)
2644 {
2645   struct TransportClient *tc = cls;
2646   struct Neighbour *neighbour = value;
2647
2648   core_send_connect_info (tc,
2649                           pid,
2650                           neighbour->quota_out);
2651   return GNUNET_OK;
2652 }
2653
2654
2655 /**
2656  * Initialize a "CORE" client.  We got a start message from this
2657  * client, so add it to the list of clients for broadcasting of
2658  * inbound messages.
2659  *
2660  * @param cls the client
2661  * @param start the start message that was sent
2662  */
2663 static void
2664 handle_client_start (void *cls,
2665                      const struct StartMessage *start)
2666 {
2667   struct TransportClient *tc = cls;
2668   uint32_t options;
2669
2670   options = ntohl (start->options);
2671   if ( (0 != (1 & options)) &&
2672        (0 !=
2673         GNUNET_memcmp (&start->self,
2674                        &GST_my_identity)) )
2675   {
2676     /* client thinks this is a different peer, reject */
2677     GNUNET_break (0);
2678     GNUNET_SERVICE_client_drop (tc->client);
2679     return;
2680   }
2681   if (CT_NONE != tc->type)
2682   {
2683     GNUNET_break (0);
2684     GNUNET_SERVICE_client_drop (tc->client);
2685     return;
2686   }
2687   tc->type = CT_CORE;
2688   GNUNET_CONTAINER_multipeermap_iterate (neighbours,
2689                                          &notify_client_connect_info,
2690                                          tc);
2691   GNUNET_SERVICE_client_continue (tc->client);
2692 }
2693
2694
2695 /**
2696  * Client asked for transmission to a peer.  Process the request.
2697  *
2698  * @param cls the client
2699  * @param obm the send message that was sent
2700  */
2701 static int
2702 check_client_send (void *cls,
2703                    const struct OutboundMessage *obm)
2704 {
2705   struct TransportClient *tc = cls;
2706   uint16_t size;
2707   const struct GNUNET_MessageHeader *obmm;
2708
2709   if (CT_CORE != tc->type)
2710   {
2711     GNUNET_break (0);
2712     return GNUNET_SYSERR;
2713   }
2714   size = ntohs (obm->header.size) - sizeof (struct OutboundMessage);
2715   if (size < sizeof (struct GNUNET_MessageHeader))
2716   {
2717     GNUNET_break (0);
2718     return GNUNET_SYSERR;
2719   }
2720   obmm = (const struct GNUNET_MessageHeader *) &obm[1];
2721   if (size != ntohs (obmm->size))
2722   {
2723     GNUNET_break (0);
2724     return GNUNET_SYSERR;
2725   }
2726   return GNUNET_OK;
2727 }
2728
2729
2730 /**
2731  * Free fragment tree below @e root, excluding @e root itself.
2732  *
2733  * @param root root of the tree to free
2734  */
2735 static void
2736 free_fragment_tree (struct PendingMessage *root)
2737 {
2738   struct PendingMessage *frag;
2739
2740   while (NULL != (frag = root->head_frag))
2741   {
2742     free_fragment_tree (frag);
2743     GNUNET_CONTAINER_MDLL_remove (frag,
2744                                   root->head_frag,
2745                                   root->tail_frag,
2746                                   frag);
2747     GNUNET_free (frag);
2748   }
2749 }
2750
2751
2752 /**
2753  * Release memory associated with @a pm and remove @a pm from associated
2754  * data structures.  @a pm must be a top-level pending message and not
2755  * a fragment in the tree.  The entire tree is freed (if applicable).
2756  *
2757  * @param pm the pending message to free
2758  */
2759 static void
2760 free_pending_message (struct PendingMessage *pm)
2761 {
2762   struct TransportClient *tc = pm->client;
2763   struct Neighbour *target = pm->target;
2764
2765   if (NULL != tc)
2766   {
2767     GNUNET_CONTAINER_MDLL_remove (client,
2768                                   tc->details.core.pending_msg_head,
2769                                   tc->details.core.pending_msg_tail,
2770                                   pm);
2771   }
2772   GNUNET_CONTAINER_MDLL_remove (neighbour,
2773                                 target->pending_msg_head,
2774                                 target->pending_msg_tail,
2775                                 pm);
2776   free_fragment_tree (pm);
2777   GNUNET_free_non_null (pm->bpm);
2778   GNUNET_free (pm);
2779 }
2780
2781
2782 /**
2783  * Send a response to the @a pm that we have processed a
2784  * "send" request with status @a success. We
2785  * transmitted @a bytes_physical on the actual wire.
2786  * Sends a confirmation to the "core" client responsible
2787  * for the original request and free's @a pm.
2788  *
2789  * @param pm handle to the original pending message
2790  * @param success status code, #GNUNET_OK on success, #GNUNET_SYSERR
2791  *          for transmission failure
2792  * @param bytes_physical amount of bandwidth consumed
2793  */
2794 static void
2795 client_send_response (struct PendingMessage *pm,
2796                       int success,
2797                       uint32_t bytes_physical)
2798 {
2799   struct TransportClient *tc = pm->client;
2800   struct Neighbour *target = pm->target;
2801   struct GNUNET_MQ_Envelope *env;
2802   struct SendOkMessage *som;
2803
2804   if (NULL != tc)
2805   {
2806     env = GNUNET_MQ_msg (som,
2807                          GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
2808     som->success = htonl ((uint32_t) success);
2809     som->bytes_msg = htons (pm->bytes_msg);
2810     som->bytes_physical = htonl (bytes_physical);
2811     som->peer = target->pid;
2812     GNUNET_MQ_send (tc->mq,
2813                     env);
2814   }
2815   free_pending_message (pm);
2816 }
2817
2818
2819 /**
2820  * Checks the message queue for a neighbour for messages that have timed
2821  * out and purges them.
2822  *
2823  * @param cls a `struct Neighbour`
2824  */
2825 static void
2826 check_queue_timeouts (void *cls)
2827 {
2828   struct Neighbour *n = cls;
2829   struct PendingMessage *pm;
2830   struct GNUNET_TIME_Absolute now;
2831   struct GNUNET_TIME_Absolute earliest_timeout;
2832
2833   n->timeout_task = NULL;
2834   earliest_timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
2835   now = GNUNET_TIME_absolute_get ();
2836   for (struct PendingMessage *pos = n->pending_msg_head;
2837        NULL != pos;
2838        pos = pm)
2839   {
2840     pm = pos->next_neighbour;
2841     if (pos->timeout.abs_value_us <= now.abs_value_us)
2842     {
2843       GNUNET_STATISTICS_update (GST_stats,
2844                                 "# messages dropped (timeout before confirmation)",
2845                                 1,
2846                                 GNUNET_NO);
2847       client_send_response (pm,
2848                             GNUNET_NO,
2849                             0);
2850       continue;
2851     }
2852     earliest_timeout = GNUNET_TIME_absolute_min (earliest_timeout,
2853                                                  pos->timeout);
2854   }
2855   n->earliest_timeout = earliest_timeout;
2856   if (NULL != n->pending_msg_head)
2857     n->timeout_task = GNUNET_SCHEDULER_add_at (earliest_timeout,
2858                                                &check_queue_timeouts,
2859                                                n);
2860 }
2861
2862
2863 /**
2864  * Client asked for transmission to a peer.  Process the request.
2865  *
2866  * @param cls the client
2867  * @param obm the send message that was sent
2868  */
2869 static void
2870 handle_client_send (void *cls,
2871                     const struct OutboundMessage *obm)
2872 {
2873   struct TransportClient *tc = cls;
2874   struct PendingMessage *pm;
2875   const struct GNUNET_MessageHeader *obmm;
2876   struct Neighbour *target;
2877   uint32_t bytes_msg;
2878   int was_empty;
2879
2880   GNUNET_assert (CT_CORE == tc->type);
2881   obmm = (const struct GNUNET_MessageHeader *) &obm[1];
2882   bytes_msg = ntohs (obmm->size);
2883   target = lookup_neighbour (&obm->peer);
2884   if (NULL == target)
2885   {
2886     /* Failure: don't have this peer as a neighbour (anymore).
2887        Might have gone down asynchronously, so this is NOT
2888        a protocol violation by CORE. Still count the event,
2889        as this should be rare. */
2890     struct GNUNET_MQ_Envelope *env;
2891     struct SendOkMessage *som;
2892
2893     env = GNUNET_MQ_msg (som,
2894                          GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
2895     som->success = htonl (GNUNET_SYSERR);
2896     som->bytes_msg = htonl (bytes_msg);
2897     som->bytes_physical = htonl (0);
2898     som->peer = obm->peer;
2899     GNUNET_MQ_send (tc->mq,
2900                     env);
2901     GNUNET_SERVICE_client_continue (tc->client);
2902     GNUNET_STATISTICS_update (GST_stats,
2903                               "# messages dropped (neighbour unknown)",
2904                               1,
2905                               GNUNET_NO);
2906     return;
2907   }
2908   was_empty = (NULL == target->pending_msg_head);
2909   pm = GNUNET_malloc (sizeof (struct PendingMessage) + bytes_msg);
2910   pm->client = tc;
2911   pm->target = target;
2912   pm->bytes_msg = bytes_msg;
2913   pm->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (obm->timeout));
2914   memcpy (&pm[1],
2915           &obm[1],
2916           bytes_msg);
2917   GNUNET_CONTAINER_MDLL_insert (neighbour,
2918                                 target->pending_msg_head,
2919                                 target->pending_msg_tail,
2920                                 pm);
2921   GNUNET_CONTAINER_MDLL_insert (client,
2922                                 tc->details.core.pending_msg_head,
2923                                 tc->details.core.pending_msg_tail,
2924                                 pm);
2925   if (target->earliest_timeout.abs_value_us > pm->timeout.abs_value_us)
2926   {
2927     target->earliest_timeout.abs_value_us = pm->timeout.abs_value_us;
2928     if (NULL != target->timeout_task)
2929       GNUNET_SCHEDULER_cancel (target->timeout_task);
2930     target->timeout_task
2931       = GNUNET_SCHEDULER_add_at (target->earliest_timeout,
2932                                  &check_queue_timeouts,
2933                                  target);
2934   }
2935   if (! was_empty)
2936     return; /* all queues must already be busy */
2937   for (struct Queue *queue = target->queue_head;
2938        NULL != queue;
2939        queue = queue->next_neighbour)
2940   {
2941     /* try transmission on any queue that is idle */
2942     if (NULL == queue->transmit_task)
2943       queue->transmit_task = GNUNET_SCHEDULER_add_now (&transmit_on_queue,
2944                                                        queue);
2945   }
2946 }
2947
2948
2949 /**
2950  * Communicator started.  Test message is well-formed.
2951  *
2952  * @param cls the client
2953  * @param cam the send message that was sent
2954  */
2955 static int
2956 check_communicator_available (void *cls,
2957                               const struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam)
2958 {
2959   struct TransportClient *tc = cls;
2960   uint16_t size;
2961
2962   if (CT_NONE != tc->type)
2963   {
2964     GNUNET_break (0);
2965     return GNUNET_SYSERR;
2966   }
2967   tc->type = CT_COMMUNICATOR;
2968   size = ntohs (cam->header.size) - sizeof (*cam);
2969   if (0 == size)
2970     return GNUNET_OK; /* receive-only communicator */
2971   GNUNET_MQ_check_zero_termination (cam);
2972   return GNUNET_OK;
2973 }
2974
2975
2976 /**
2977  * Communicator started.  Process the request.
2978  *
2979  * @param cls the client
2980  * @param cam the send message that was sent
2981  */
2982 static void
2983 handle_communicator_available (void *cls,
2984                                const struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam)
2985 {
2986   struct TransportClient *tc = cls;
2987   uint16_t size;
2988
2989   size = ntohs (cam->header.size) - sizeof (*cam);
2990   if (0 == size)
2991     return; /* receive-only communicator */
2992   tc->details.communicator.address_prefix
2993     = GNUNET_strdup ((const char *) &cam[1]);
2994   tc->details.communicator.cc
2995     = (enum GNUNET_TRANSPORT_CommunicatorCharacteristics) ntohl (cam->cc);
2996   GNUNET_SERVICE_client_continue (tc->client);
2997 }
2998
2999
3000 /**
3001  * Communicator requests backchannel transmission.  Check the request.
3002  *
3003  * @param cls the client
3004  * @param cb the send message that was sent
3005  * @return #GNUNET_OK if message is well-formed
3006  */
3007 static int
3008 check_communicator_backchannel (void *cls,
3009                                 const struct GNUNET_TRANSPORT_CommunicatorBackchannel *cb)
3010 {
3011   const struct GNUNET_MessageHeader *inbox;
3012   const char *is;
3013   uint16_t msize;
3014   uint16_t isize;
3015
3016   (void) cls;
3017   msize = ntohs (cb->header.size) - sizeof (*cb);
3018   if (UINT16_MAX - msize >
3019       sizeof (struct TransportBackchannelEncapsulationMessage) +
3020       sizeof (struct TransportBackchannelRequestPayload) )
3021   {
3022     GNUNET_break (0);
3023     return GNUNET_SYSERR;
3024   }
3025   inbox = (const struct GNUNET_MessageHeader *) &cb[1];
3026   isize = ntohs (inbox->size);
3027   if (isize >= msize)
3028   {
3029     GNUNET_break (0);
3030     return GNUNET_SYSERR;
3031   }
3032   is = (const char *) inbox;
3033   is += isize;
3034   msize -= isize;
3035   GNUNET_assert (msize > 0);
3036   if ('\0' != is[msize-1])
3037   {
3038     GNUNET_break (0);
3039     return GNUNET_SYSERR;
3040   }
3041   return GNUNET_OK;
3042 }
3043
3044
3045 /**
3046  * Remove memory used by expired ephemeral keys.
3047  *
3048  * @param cls NULL
3049  */
3050 static void
3051 expire_ephemerals (void *cls)
3052 {
3053   struct EphemeralCacheEntry *ece;
3054
3055   (void) cls;
3056   ephemeral_task = NULL;
3057   while (NULL != (ece = GNUNET_CONTAINER_heap_peek (ephemeral_heap)))
3058   {
3059     if (0 == GNUNET_TIME_absolute_get_remaining (ece->ephemeral_validity).rel_value_us)
3060     {
3061       free_ephemeral (ece);
3062       continue;
3063     }
3064     ephemeral_task = GNUNET_SCHEDULER_add_at (ece->ephemeral_validity,
3065                                               &expire_ephemerals,
3066                                               NULL);
3067     return;
3068   }
3069 }
3070
3071
3072 /**
3073  * Lookup ephemeral key in our #ephemeral_map. If no valid one exists, generate
3074  * one, cache it and return it.
3075  *
3076  * @param pid peer to look up ephemeral for
3077  * @param private_key[out] set to the private key
3078  * @param ephemeral_key[out] set to the key
3079  * @param ephemeral_sender_sig[out] set to the signature
3080  * @param ephemeral_validity[out] set to the validity expiration time
3081  */
3082 static void
3083 lookup_ephemeral (const struct GNUNET_PeerIdentity *pid,
3084                   struct GNUNET_CRYPTO_EcdhePrivateKey *private_key,
3085                   struct GNUNET_CRYPTO_EcdhePublicKey *ephemeral_key,
3086                   struct GNUNET_CRYPTO_EddsaSignature *ephemeral_sender_sig,
3087                   struct GNUNET_TIME_Absolute *ephemeral_validity)
3088 {
3089   struct EphemeralCacheEntry *ece;
3090   struct EphemeralConfirmation ec;
3091
3092   ece = GNUNET_CONTAINER_multipeermap_get (ephemeral_map,
3093                                            pid);
3094   if ( (NULL != ece) &&
3095        (0 == GNUNET_TIME_absolute_get_remaining (ece->ephemeral_validity).rel_value_us) )
3096   {
3097     free_ephemeral (ece);
3098     ece = NULL;
3099   }
3100   if (NULL == ece)
3101   {
3102     ece = GNUNET_new (struct EphemeralCacheEntry);
3103     ece->target = *pid;
3104     ece->ephemeral_validity = GNUNET_TIME_absolute_add (GNUNET_TIME_absolute_get_monotonic (GST_cfg),
3105                                                         EPHEMERAL_VALIDITY);
3106     GNUNET_assert (GNUNET_OK ==
3107                    GNUNET_CRYPTO_ecdhe_key_create2 (&ece->private_key));
3108     GNUNET_CRYPTO_ecdhe_key_get_public (&ece->private_key,
3109                                         &ece->ephemeral_key);
3110     ec.purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL);
3111     ec.purpose.size = htonl (sizeof (ec));
3112     ec.target = *pid;
3113     ec.ephemeral_key = ece->ephemeral_key;
3114     GNUNET_assert (GNUNET_OK ==
3115                    GNUNET_CRYPTO_eddsa_sign (GST_my_private_key,
3116                                              &ec.purpose,
3117                                              &ece->sender_sig));
3118     ece->hn = GNUNET_CONTAINER_heap_insert (ephemeral_heap,
3119                                             ece,
3120                                             ece->ephemeral_validity.abs_value_us);
3121     GNUNET_assert (GNUNET_OK ==
3122                    GNUNET_CONTAINER_multipeermap_put (ephemeral_map,
3123                                                       &ece->target,
3124                                                       ece,
3125                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
3126     if (NULL == ephemeral_task)
3127       ephemeral_task = GNUNET_SCHEDULER_add_at (ece->ephemeral_validity,
3128                                                 &expire_ephemerals,
3129                                                 NULL);
3130   }
3131   *private_key = ece->private_key;
3132   *ephemeral_key = ece->ephemeral_key;
3133   *ephemeral_sender_sig = ece->sender_sig;
3134   *ephemeral_validity = ece->ephemeral_validity;
3135 }
3136
3137
3138 /**
3139  * We need to transmit @a hdr to @a target.  If necessary, this may
3140  * involve DV routing or even broadcasting and fragmentation.
3141  *
3142  * @param target peer to receive @a hdr
3143  * @param hdr header of the message to route
3144  */
3145 static void
3146 route_message (const struct GNUNET_PeerIdentity *target,
3147                struct GNUNET_MessageHeader *hdr)
3148 {
3149   // FIXME: this one is tricky:
3150   // - we could try a direct, reliable channel
3151   // - if that is unavailable / for load balancing, we may try:
3152   //   * multiple (?) direct unreliable channels - depending on loss rate?
3153   //   * some (?) DV channels - if above unavailable / too lossy?
3154   //   * _random_ other peers ("broadcasting") in hope of *discovering*
3155   //      a path back! - if all else fails
3156   // => need more on DV first!
3157
3158   // FIXME: send hdr to target, free hdr (possibly using DV, possibly broadcasting)
3159   GNUNET_free (hdr);
3160 }
3161
3162
3163 /**
3164  * Communicator requests backchannel transmission.  Process the request.
3165  *
3166  * @param cls the client
3167  * @param cb the send message that was sent
3168  */
3169 static void
3170 handle_communicator_backchannel (void *cls,
3171                                  const struct GNUNET_TRANSPORT_CommunicatorBackchannel *cb)
3172 {
3173   struct TransportClient *tc = cls;
3174   struct GNUNET_CRYPTO_EcdhePrivateKey private_key;
3175   struct GNUNET_TIME_Absolute ephemeral_validity;
3176   struct TransportBackchannelEncapsulationMessage *enc;
3177   struct TransportBackchannelRequestPayload ppay;
3178   char *mpos;
3179   uint16_t msize;
3180
3181   /* encapsulate and encrypt message */
3182   msize = ntohs (cb->header.size) - sizeof (*cb) + sizeof (struct TransportBackchannelRequestPayload);
3183   enc = GNUNET_malloc (sizeof (*enc) + msize);
3184   enc->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION);
3185   enc->header.size = htons (sizeof (*enc) + msize);
3186   enc->target = cb->pid;
3187   lookup_ephemeral (&cb->pid,
3188                     &private_key,
3189                     &enc->ephemeral_key,
3190                     &ppay.sender_sig,
3191                     &ephemeral_validity);
3192   // FIXME: setup 'iv'
3193 #if FIXME
3194   dh_key_derive (&private_key,
3195                  &cb->pid,
3196                  &enc->iv,
3197                  &key);
3198 #endif
3199   ppay.ephemeral_validity = GNUNET_TIME_absolute_hton (ephemeral_validity);
3200   ppay.monotonic_time = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get_monotonic (GST_cfg));
3201   mpos = (char *) &enc[1];
3202 #if FIXME
3203   encrypt (key,
3204            &ppay,
3205            &mpos,
3206            sizeof (ppay));
3207   encrypt (key,
3208            &cb[1],
3209            &mpos,
3210            ntohs (cb->header.size) - sizeof (*cb));
3211   hmac (key,
3212         &enc->hmac);
3213 #endif
3214   route_message (&cb->pid,
3215                  &enc->header);
3216   GNUNET_SERVICE_client_continue (tc->client);
3217 }
3218
3219
3220 /**
3221  * Address of our peer added.  Test message is well-formed.
3222  *
3223  * @param cls the client
3224  * @param aam the send message that was sent
3225  * @return #GNUNET_OK if message is well-formed
3226  */
3227 static int
3228 check_add_address (void *cls,
3229                    const struct GNUNET_TRANSPORT_AddAddressMessage *aam)
3230 {
3231   struct TransportClient *tc = cls;
3232
3233   if (CT_COMMUNICATOR != tc->type)
3234   {
3235     GNUNET_break (0);
3236     return GNUNET_SYSERR;
3237   }
3238   GNUNET_MQ_check_zero_termination (aam);
3239   return GNUNET_OK;
3240 }
3241
3242
3243 /**
3244  * Ask peerstore to store our address.
3245  *
3246  * @param cls an `struct AddressListEntry *`
3247  */
3248 static void
3249 store_pi (void *cls);
3250
3251
3252 /**
3253  * Function called when peerstore is done storing our address.
3254  *
3255  * @param cls a `struct AddressListEntry`
3256  * @param success #GNUNET_YES if peerstore was successful
3257  */
3258 static void
3259 peerstore_store_own_cb (void *cls,
3260                         int success)
3261 {
3262   struct AddressListEntry *ale = cls;
3263
3264   ale->sc = NULL;
3265   if (GNUNET_YES != success)
3266     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3267                 "Failed to store our own address `%s' in peerstore!\n",
3268                 ale->address);
3269   /* refresh period is 1/4 of expiration time, that should be plenty
3270      without being excessive. */
3271   ale->st = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_divide (ale->expiration,
3272                                                                        4ULL),
3273                                           &store_pi,
3274                                           ale);
3275 }
3276
3277
3278 /**
3279  * Ask peerstore to store our address.
3280  *
3281  * @param cls an `struct AddressListEntry *`
3282  */
3283 static void
3284 store_pi (void *cls)
3285 {
3286   struct AddressListEntry *ale = cls;
3287   void *addr;
3288   size_t addr_len;
3289   struct GNUNET_TIME_Absolute expiration;
3290
3291   ale->st = NULL;
3292   expiration = GNUNET_TIME_relative_to_absolute (ale->expiration);
3293   GNUNET_HELLO_sign_address (ale->address,
3294                              ale->nt,
3295                              expiration,
3296                              GST_my_private_key,
3297                              &addr,
3298                              &addr_len);
3299   ale->sc = GNUNET_PEERSTORE_store (peerstore,
3300                                     "transport",
3301                                     &GST_my_identity,
3302                                     GNUNET_PEERSTORE_TRANSPORT_HELLO_KEY,
3303                                     addr,
3304                                     addr_len,
3305                                     expiration,
3306                                     GNUNET_PEERSTORE_STOREOPTION_MULTIPLE,
3307                                     &peerstore_store_own_cb,
3308                                     ale);
3309   GNUNET_free (addr);
3310   if (NULL == ale->sc)
3311   {
3312     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
3313                 "Failed to store our address `%s' with peerstore\n",
3314                 ale->address);
3315     ale->st = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
3316                                             &store_pi,
3317                                             ale);
3318   }
3319 }
3320
3321
3322 /**
3323  * Address of our peer added.  Process the request.
3324  *
3325  * @param cls the client
3326  * @param aam the send message that was sent
3327  */
3328 static void
3329 handle_add_address (void *cls,
3330                     const struct GNUNET_TRANSPORT_AddAddressMessage *aam)
3331 {
3332   struct TransportClient *tc = cls;
3333   struct AddressListEntry *ale;
3334   size_t slen;
3335
3336   slen = ntohs (aam->header.size) - sizeof (*aam);
3337   ale = GNUNET_malloc (sizeof (struct AddressListEntry) + slen);
3338   ale->tc = tc;
3339   ale->address = (const char *) &ale[1];
3340   ale->expiration = GNUNET_TIME_relative_ntoh (aam->expiration);
3341   ale->aid = aam->aid;
3342   ale->nt = (enum GNUNET_NetworkType) ntohl (aam->nt);
3343   memcpy (&ale[1],
3344           &aam[1],
3345           slen);
3346   GNUNET_CONTAINER_DLL_insert (tc->details.communicator.addr_head,
3347                                tc->details.communicator.addr_tail,
3348                                ale);
3349   ale->st = GNUNET_SCHEDULER_add_now (&store_pi,
3350                                       ale);
3351   GNUNET_SERVICE_client_continue (tc->client);
3352 }
3353
3354
3355 /**
3356  * Address of our peer deleted.  Process the request.
3357  *
3358  * @param cls the client
3359  * @param dam the send message that was sent
3360  */
3361 static void
3362 handle_del_address (void *cls,
3363                     const struct GNUNET_TRANSPORT_DelAddressMessage *dam)
3364 {
3365   struct TransportClient *tc = cls;
3366
3367   if (CT_COMMUNICATOR != tc->type)
3368   {
3369     GNUNET_break (0);
3370     GNUNET_SERVICE_client_drop (tc->client);
3371     return;
3372   }
3373   for (struct AddressListEntry *ale = tc->details.communicator.addr_head;
3374        NULL != ale;
3375        ale = ale->next)
3376   {
3377     if (dam->aid != ale->aid)
3378       continue;
3379     GNUNET_assert (ale->tc == tc);
3380     free_address_list_entry (ale);
3381     GNUNET_SERVICE_client_continue (tc->client);
3382   }
3383   GNUNET_break (0);
3384   GNUNET_SERVICE_client_drop (tc->client);
3385 }
3386
3387
3388 /**
3389  * Context from #handle_incoming_msg().  Closure for many
3390  * message handlers below.
3391  */
3392 struct CommunicatorMessageContext
3393 {
3394   /**
3395    * Which communicator provided us with the message.
3396    */
3397   struct TransportClient *tc;
3398
3399   /**
3400    * Additional information for flow control and about the sender.
3401    */
3402   struct GNUNET_TRANSPORT_IncomingMessage im;
3403
3404   /**
3405    * Number of hops the message has travelled (if DV-routed).
3406    * FIXME: make use of this in ACK handling!
3407    */
3408   uint16_t total_hops;
3409 };
3410
3411
3412 /**
3413  * Given an inbound message @a msg from a communicator @a cmc,
3414  * demultiplex it based on the type calling the right handler.
3415  *
3416  * @param cmc context for demultiplexing
3417  * @param msg message to demultiplex
3418  */
3419 static void
3420 demultiplex_with_cmc (struct CommunicatorMessageContext *cmc,
3421                       const struct GNUNET_MessageHeader *msg);
3422
3423
3424 /**
3425  * Send ACK to communicator (if requested) and free @a cmc.
3426  *
3427  * @param cmc context for which we are done handling the message
3428  */
3429 static void
3430 finish_cmc_handling (struct CommunicatorMessageContext *cmc)
3431 {
3432   if (0 != ntohl (cmc->im.fc_on))
3433   {
3434     /* send ACK when done to communicator for flow control! */
3435     struct GNUNET_MQ_Envelope *env;
3436     struct GNUNET_TRANSPORT_IncomingMessageAck *ack;
3437
3438     env = GNUNET_MQ_msg (ack,
3439                          GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK);
3440     ack->reserved = htonl (0);
3441     ack->fc_id = cmc->im.fc_id;
3442     ack->sender = cmc->im.sender;
3443     GNUNET_MQ_send (cmc->tc->mq,
3444                     env);
3445   }
3446   GNUNET_SERVICE_client_continue (cmc->tc->client);
3447   GNUNET_free (cmc);
3448 }
3449
3450
3451 /**
3452  * Communicator gave us an unencapsulated message to pass as-is to
3453  * CORE.  Process the request.
3454  *
3455  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
3456  * @param mh the message that was received
3457  */
3458 static void
3459 handle_raw_message (void *cls,
3460                     const struct GNUNET_MessageHeader *mh)
3461 {
3462   struct CommunicatorMessageContext *cmc = cls;
3463   uint16_t size = ntohs (mh->size);
3464
3465   if ( (size > UINT16_MAX - sizeof (struct InboundMessage)) ||
3466        (size < sizeof (struct GNUNET_MessageHeader)) )
3467   {
3468     struct GNUNET_SERVICE_Client *client = cmc->tc->client;
3469
3470     GNUNET_break (0);
3471     finish_cmc_handling (cmc);
3472     GNUNET_SERVICE_client_drop (client);
3473     return;
3474   }
3475   /* Forward to all CORE clients */
3476   for (struct TransportClient *tc = clients_head;
3477        NULL != tc;
3478        tc = tc->next)
3479   {
3480     struct GNUNET_MQ_Envelope *env;
3481     struct InboundMessage *im;
3482
3483     if (CT_CORE != tc->type)
3484       continue;
3485     env = GNUNET_MQ_msg_extra (im,
3486                                size,
3487                                GNUNET_MESSAGE_TYPE_TRANSPORT_RECV);
3488     im->peer = cmc->im.sender;
3489     memcpy (&im[1],
3490             mh,
3491             size);
3492     GNUNET_MQ_send (tc->mq,
3493                     env);
3494   }
3495   /* FIXME: consider doing this _only_ once the message
3496      was drained from the CORE MQs to extend flow control to CORE!
3497      (basically, increment counter in cmc, decrement on MQ send continuation! */
3498   finish_cmc_handling (cmc);
3499 }
3500
3501
3502 /**
3503  * Communicator gave us a fragment box.  Check the message.
3504  *
3505  * @param cls a `struct CommunicatorMessageContext`
3506  * @param fb the send message that was sent
3507  * @return #GNUNET_YES if message is well-formed
3508  */
3509 static int
3510 check_fragment_box (void *cls,
3511                     const struct TransportFragmentBox *fb)
3512 {
3513   uint16_t size = ntohs (fb->header.size);
3514   uint16_t bsize = size - sizeof (*fb);
3515
3516   if (0 == bsize)
3517   {
3518     GNUNET_break_op (0);
3519     return GNUNET_SYSERR;
3520   }
3521   if (bsize + ntohs (fb->frag_off) > ntohs (fb->msg_size))
3522   {
3523     GNUNET_break_op (0);
3524     return GNUNET_SYSERR;
3525   }
3526   if (ntohs (fb->frag_off) >= ntohs (fb->msg_size))
3527   {
3528     GNUNET_break_op (0);
3529     return GNUNET_SYSERR;
3530   }
3531   return GNUNET_YES;
3532 }
3533
3534
3535 /**
3536  * Generate a fragment acknowledgement for an @a rc.
3537  *
3538  * @param rc context to generate ACK for, @a rc ACK state is reset
3539  */
3540 static void
3541 send_fragment_ack (struct ReassemblyContext *rc)
3542 {
3543   struct TransportFragmentAckMessage *ack;
3544
3545   ack = GNUNET_new (struct TransportFragmentAckMessage);
3546   ack->header.size = htons (sizeof (struct TransportFragmentAckMessage));
3547   ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK);
3548   ack->frag_uuid = htonl (rc->frag_uuid);
3549   ack->extra_acks = GNUNET_htonll (rc->extra_acks);
3550   ack->msg_uuid = rc->msg_uuid;
3551   ack->avg_ack_delay = GNUNET_TIME_relative_hton (rc->avg_ack_delay);
3552   if (0 == rc->msg_missing)
3553     ack->reassembly_timeout
3554       = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_FOREVER_REL); /* signal completion */
3555   else
3556     ack->reassembly_timeout
3557       = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (rc->reassembly_timeout));
3558   route_message (&rc->neighbour->pid,
3559                  &ack->header);
3560   rc->avg_ack_delay = GNUNET_TIME_UNIT_ZERO;
3561   rc->num_acks = 0;
3562   rc->extra_acks = 0LLU;
3563 }
3564
3565
3566 /**
3567  * Communicator gave us a fragment.  Process the request.
3568  *
3569  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
3570  * @param fb the message that was received
3571  */
3572 static void
3573 handle_fragment_box (void *cls,
3574                      const struct TransportFragmentBox *fb)
3575 {
3576   struct CommunicatorMessageContext *cmc = cls;
3577   struct Neighbour *n;
3578   struct ReassemblyContext *rc;
3579   const struct GNUNET_MessageHeader *msg;
3580   uint16_t msize;
3581   uint16_t fsize;
3582   uint16_t frag_off;
3583   uint32_t frag_uuid;
3584   char *target;
3585   struct GNUNET_TIME_Relative cdelay;
3586   int ack_now;
3587
3588   n = GNUNET_CONTAINER_multipeermap_get (neighbours,
3589                                          &cmc->im.sender);
3590   if (NULL == n)
3591   {
3592     struct GNUNET_SERVICE_Client *client = cmc->tc->client;
3593
3594     GNUNET_break (0);
3595     finish_cmc_handling (cmc);
3596     GNUNET_SERVICE_client_drop (client);
3597     return;
3598   }
3599   if (NULL == n->reassembly_map)
3600   {
3601     n->reassembly_map = GNUNET_CONTAINER_multishortmap_create (8,
3602                                                                GNUNET_YES);
3603     n->reassembly_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
3604     n->reassembly_timeout_task = GNUNET_SCHEDULER_add_delayed (REASSEMBLY_EXPIRATION,
3605                                                                &reassembly_cleanup_task,
3606                                                                n);
3607   }
3608   msize = ntohs (fb->msg_size);
3609   rc = GNUNET_CONTAINER_multishortmap_get (n->reassembly_map,
3610                                            &fb->msg_uuid);
3611   if (NULL == rc)
3612   {
3613     rc = GNUNET_malloc (sizeof (*rc) +
3614                         msize + /* reassembly payload buffer */
3615                         (msize + 7) / 8 * sizeof (uint8_t) /* bitfield */);
3616     rc->msg_uuid = fb->msg_uuid;
3617     rc->neighbour = n;
3618     rc->msg_size = msize;
3619     rc->reassembly_timeout = GNUNET_TIME_relative_to_absolute (REASSEMBLY_EXPIRATION);
3620     rc->last_frag = GNUNET_TIME_absolute_get ();
3621     rc->hn = GNUNET_CONTAINER_heap_insert (n->reassembly_heap,
3622                                            rc,
3623                                            rc->reassembly_timeout.abs_value_us);
3624     GNUNET_assert (GNUNET_OK ==
3625                    GNUNET_CONTAINER_multishortmap_put (n->reassembly_map,
3626                                                        &rc->msg_uuid,
3627                                                        rc,
3628                                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
3629     target = (char *) &rc[1];
3630     rc->bitfield = (uint8_t *) (target + rc->msg_size);
3631     rc->msg_missing = rc->msg_size;
3632   }
3633   else
3634   {
3635     target = (char *) &rc[1];
3636   }
3637   if (msize != rc->msg_size)
3638   {
3639     GNUNET_break (0);
3640     finish_cmc_handling (cmc);
3641     return;
3642   }
3643
3644   /* reassemble */
3645   fsize = ntohs (fb->header.size) - sizeof (*fb);
3646   frag_off = ntohs (fb->frag_off);
3647   memcpy (&target[frag_off],
3648           &fb[1],
3649           fsize);
3650   /* update bitfield and msg_missing */
3651   for (unsigned int i=frag_off;i<frag_off+fsize;i++)
3652   {
3653     if (0 == (rc->bitfield[i / 8] & (1 << (i % 8))))
3654     {
3655       rc->bitfield[i / 8] |= (1 << (i % 8));
3656       rc->msg_missing--;
3657     }
3658   }
3659
3660   /* Compute cummulative ACK */
3661   frag_uuid = ntohl (fb->frag_uuid);
3662   cdelay = GNUNET_TIME_absolute_get_duration (rc->last_frag);
3663   cdelay = GNUNET_TIME_relative_multiply (cdelay,
3664                                           rc->num_acks);
3665   rc->last_frag = GNUNET_TIME_absolute_get ();
3666   rc->avg_ack_delay = GNUNET_TIME_relative_add (rc->avg_ack_delay,
3667                                                 cdelay);
3668   ack_now = GNUNET_NO;
3669   if (0 == rc->num_acks)
3670   {
3671     /* case one: first ack */
3672     rc->frag_uuid = frag_uuid;
3673     rc->extra_acks = 0LLU;
3674     rc->num_acks = 1;
3675   }
3676   else if ( (frag_uuid >= rc->frag_uuid) &&
3677             (frag_uuid <= rc->frag_uuid + 64) )
3678   {
3679     /* case two: ack fits after existing min UUID */
3680     if ( (frag_uuid == rc->frag_uuid) ||
3681          (0 != (rc->extra_acks & (1LLU << (frag_uuid - rc->frag_uuid - 1)))) )
3682     {
3683       /* duplicate fragment, ack now! */
3684       ack_now = GNUNET_YES;
3685     }
3686     else
3687     {
3688       rc->extra_acks |= (1LLU << (frag_uuid - rc->frag_uuid - 1));
3689       rc->num_acks++;
3690     }
3691   }
3692   else if ( (rc->frag_uuid > frag_uuid) &&
3693             ( ( (rc->frag_uuid == frag_uuid + 64) &&
3694                 (0 == rc->extra_acks) ) ||
3695               ( (rc->frag_uuid < frag_uuid + 64) &&
3696                 (rc->extra_acks == (rc->extra_acks & ~ ((1LLU << (64 - (rc->frag_uuid - frag_uuid))) - 1LLU))) ) ) )
3697   {
3698     /* can fit ack by shifting extra acks and starting at
3699        frag_uid, test above esured that the bits we will
3700        shift 'extra_acks' by are all zero. */
3701     rc->extra_acks <<= (rc->frag_uuid - frag_uuid);
3702     rc->extra_acks |= (1LLU << (rc->frag_uuid - frag_uuid - 1));
3703     rc->frag_uuid = frag_uuid;
3704     rc->num_acks++;
3705   }
3706   if (65 == rc->num_acks) /* FIXME: maybe use smaller threshold? This is very aggressive. */
3707     ack_now = GNUNET_YES; /* maximum acks received */
3708   // FIXME: possibly also ACK based on RTT (but for that we'd need to
3709   // determine the queue used for the ACK first!)
3710
3711   /* is reassembly complete? */
3712   if (0 != rc->msg_missing)
3713   {
3714     if (ack_now)
3715       send_fragment_ack (rc);
3716     finish_cmc_handling (cmc);
3717     return;
3718   }
3719   /* reassembly is complete, verify result */
3720   msg = (const struct GNUNET_MessageHeader *) &rc[1];
3721   if (ntohs (msg->size) != rc->msg_size)
3722   {
3723     GNUNET_break (0);
3724     free_reassembly_context (rc);
3725     finish_cmc_handling (cmc);
3726     return;
3727   }
3728   /* successful reassembly */
3729   send_fragment_ack (rc);
3730   demultiplex_with_cmc (cmc,
3731                         msg);
3732   /* FIXME: really free here? Might be bad if fragments are still
3733      en-route and we forget that we finished this reassembly immediately!
3734      -> keep around until timeout?
3735      -> shorten timeout based on ACK? */
3736   free_reassembly_context (rc);
3737 }
3738
3739
3740 /**
3741  * Communicator gave us a fragment acknowledgement.  Process the request.
3742  *
3743  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
3744  * @param fa the message that was received
3745  */
3746 static void
3747 handle_fragment_ack (void *cls,
3748                      const struct TransportFragmentAckMessage *fa)
3749 {
3750   struct CommunicatorMessageContext *cmc = cls;
3751
3752   // FIXME: do work: identify original message; then identify fragments being acked;
3753   // remove those from the tree to prevent retransmission;
3754   // compute RTT
3755   // if entire message is ACKed, handle that as well.
3756   finish_cmc_handling (cmc);
3757 }
3758
3759
3760 /**
3761  * Communicator gave us a reliability box.  Check the message.
3762  *
3763  * @param cls a `struct CommunicatorMessageContext`
3764  * @param rb the send message that was sent
3765  * @return #GNUNET_YES if message is well-formed
3766  */
3767 static int
3768 check_reliability_box (void *cls,
3769                        const struct TransportReliabilityBox *rb)
3770 {
3771   GNUNET_MQ_check_boxed_message (rb);
3772   return GNUNET_YES;
3773 }
3774
3775
3776 /**
3777  * Communicator gave us a reliability box.  Process the request.
3778  *
3779  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
3780  * @param rb the message that was received
3781  */
3782 static void
3783 handle_reliability_box (void *cls,
3784                         const struct TransportReliabilityBox *rb)
3785 {
3786   struct CommunicatorMessageContext *cmc = cls;
3787   const struct GNUNET_MessageHeader *inbox = (const struct GNUNET_MessageHeader *) &rb[1];
3788
3789   if (0 == ntohl (rb->ack_countdown))
3790   {
3791     struct TransportReliabilityAckMessage *ack;
3792
3793     /* FIXME: implement cummulative ACKs and ack_countdown,
3794        then setting the avg_ack_delay field below: */
3795     ack = GNUNET_malloc (sizeof (*ack) +
3796                          sizeof (struct GNUNET_ShortHashCode));
3797     ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK);
3798     ack->header.size = htons (sizeof (*ack) +
3799                               sizeof (struct GNUNET_ShortHashCode));
3800     memcpy (&ack[1],
3801             &rb->msg_uuid,
3802             sizeof (struct GNUNET_ShortHashCode));
3803     route_message (&cmc->im.sender,
3804                    &ack->header);
3805   }
3806   /* continue with inner message */
3807   demultiplex_with_cmc (cmc,
3808                         inbox);
3809 }
3810
3811
3812 /**
3813  * Communicator gave us a reliability ack.  Process the request.
3814  *
3815  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
3816  * @param ra the message that was received
3817  */
3818 static void
3819 handle_reliability_ack (void *cls,
3820                         const struct TransportReliabilityAckMessage *ra)
3821 {
3822   struct CommunicatorMessageContext *cmc = cls;
3823
3824   // FIXME: do work: find message that was acknowledged, and
3825   // remove from transmission queue; update RTT.
3826   finish_cmc_handling (cmc);
3827 }
3828
3829
3830 /**
3831  * Communicator gave us a backchannel encapsulation.  Check the message.
3832  *
3833  * @param cls a `struct CommunicatorMessageContext`
3834  * @param be the send message that was sent
3835  * @return #GNUNET_YES if message is well-formed
3836  */
3837 static int
3838 check_backchannel_encapsulation (void *cls,
3839                                  const struct TransportBackchannelEncapsulationMessage *be)
3840 {
3841   uint16_t size = ntohs (be->header.size);
3842
3843   (void) cls;
3844   if (size - sizeof (*be) < sizeof (struct GNUNET_MessageHeader))
3845   {
3846     GNUNET_break_op (0);
3847     return GNUNET_SYSERR;
3848   }
3849   return GNUNET_YES;
3850 }
3851
3852
3853 /**
3854  * Communicator gave us a backchannel encapsulation.  Process the request.
3855  *
3856  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
3857  * @param be the message that was received
3858  */
3859 static void
3860 handle_backchannel_encapsulation (void *cls,
3861                                   const struct TransportBackchannelEncapsulationMessage *be)
3862 {
3863   struct CommunicatorMessageContext *cmc = cls;
3864
3865   if (0 != GNUNET_memcmp (&be->target,
3866                           &GST_my_identity))
3867   {
3868     /* not for me, try to route to target */
3869     route_message (&be->target,
3870                    GNUNET_copy_message (&be->header));
3871     finish_cmc_handling (cmc);
3872     return;
3873   }
3874   // FIXME: compute shared secret
3875   // FIXME: check HMAC
3876   // FIXME: decrypt payload
3877   // FIXME: forward to specified communicator!
3878   // (using GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL_INCOMING)
3879   finish_cmc_handling (cmc);
3880 }
3881
3882
3883 /**
3884  * Task called when we should check if any of the DV paths
3885  * we have learned to a target are due for garbage collection.
3886  *
3887  * Collects stale paths, and possibly frees the entire DV
3888  * entry if no paths are left. Otherwise re-schedules itself.
3889  *
3890  * @param cls a `struct DistanceVector`
3891  */
3892 static void
3893 path_cleanup_cb (void *cls)
3894 {
3895   struct DistanceVector *dv = cls;
3896   struct DistanceVectorHop *pos;
3897
3898   dv->timeout_task = NULL;
3899   while (NULL != (pos = dv->dv_head))
3900   {
3901     GNUNET_assert (dv == pos->dv);
3902     if (GNUNET_TIME_absolute_get_remaining (pos->timeout).rel_value_us > 0)
3903       break;
3904     free_distance_vector_hop (pos);
3905   }
3906   if (NULL == pos)
3907   {
3908     free_dv_route (dv);
3909     return;
3910   }
3911   dv->timeout_task = GNUNET_SCHEDULER_add_at (pos->timeout,
3912                                               &path_cleanup_cb,
3913                                               dv);
3914 }
3915
3916
3917 /**
3918  * We have learned a @a path through the network to some other peer, add it to
3919  * our DV data structure (returning #GNUNET_YES on success).
3920  *
3921  * We do not add paths if we have a sufficient number of shorter
3922  * paths to this target already (returning #GNUNET_NO).
3923  *
3924  * We also do not add problematic paths, like those where we lack the first
3925  * hop in our neighbour list (i.e. due to a topology change) or where some
3926  * non-first hop is in our neighbour list (returning #GNUNET_SYSERR).
3927  *
3928  * @param path the path we learned, path[0] should be us,
3929  *             and then path contains a valid path from us to `path[path_len-1]`
3930  *             path[1] should be a direct neighbour (we should check!)
3931  * @param path_len number of entries on the @a path, at least three!
3932  * @param network_latency how long does the message take from us to `path[path_len-1]`?
3933  *          set to "forever" if unknown
3934  * @return #GNUNET_YES on success,
3935  *         #GNUNET_NO if we have better path(s) to the target
3936  *         #GNUNET_SYSERR if the path is useless and/or invalid
3937  *                         (i.e. path[1] not a direct neighbour
3938  *                        or path[i+1] is a direct neighbour for i>0)
3939  */
3940 static int
3941 learn_dv_path (const struct GNUNET_PeerIdentity *path,
3942                unsigned int path_len,
3943                struct GNUNET_TIME_Relative network_latency)
3944 {
3945   struct DistanceVectorHop *hop;
3946   struct DistanceVector *dv;
3947   struct Neighbour *next_hop;
3948   unsigned int shorter_distance;
3949
3950   if (path_len < 3)
3951   {
3952     /* what a boring path! not allowed! */
3953     GNUNET_break (0);
3954     return GNUNET_SYSERR;
3955   }
3956   GNUNET_assert (0 ==
3957                  GNUNET_memcmp (&GST_my_identity,
3958                                 &path[0]));
3959   next_hop = GNUNET_CONTAINER_multipeermap_get (neighbours,
3960                                                 &path[1]);
3961   if (NULL == next_hop)
3962   {
3963     /* next hop must be a neighbour, otherwise this whole thing is useless! */
3964     GNUNET_break (0);
3965     return GNUNET_SYSERR;
3966   }
3967   for (unsigned int i=2;i<path_len;i++)
3968     if (NULL !=
3969         GNUNET_CONTAINER_multipeermap_get (neighbours,
3970                                            &path[i]))
3971     {
3972       /* Useless path, we have a direct connection to some hop
3973          in the middle of the path, so this one doesn't even
3974          seem terribly useful for redundancy */
3975       return GNUNET_SYSERR;
3976     }
3977   dv = GNUNET_CONTAINER_multipeermap_get (dv_routes,
3978                                           &path[path_len - 1]);
3979   if (NULL == dv)
3980   {
3981     dv = GNUNET_new (struct DistanceVector);
3982     dv->target = path[path_len - 1];
3983     dv->timeout_task = GNUNET_SCHEDULER_add_delayed (DV_PATH_VALIDITY_TIMEOUT,
3984                                                      &path_cleanup_cb,
3985                                                      dv);
3986     GNUNET_assert (GNUNET_OK ==
3987                    GNUNET_CONTAINER_multipeermap_put (dv_routes,
3988                                                       &dv->target,
3989                                                       dv,
3990                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
3991   }
3992   /* Check if we have this path already! */
3993   shorter_distance = 0;
3994   for (struct DistanceVectorHop *pos = dv->dv_head;
3995        NULL != pos;
3996        pos = pos->next_dv)
3997   {
3998     if (pos->distance < path_len - 2)
3999       shorter_distance++;
4000     /* Note that the distances in 'pos' excludes us (path[0]) and
4001        the next_hop (path[1]), so we need to subtract two
4002        and check next_hop explicitly */
4003     if ( (pos->distance == path_len - 2) &&
4004          (pos->next_hop == next_hop) )
4005     {
4006       int match = GNUNET_YES;
4007
4008       for (unsigned int i=0;i<pos->distance;i++)
4009       {
4010         if (0 !=
4011             GNUNET_memcmp (&pos->path[i],
4012                            &path[i+2]))
4013         {
4014           match = GNUNET_NO;
4015           break;
4016         }
4017       }
4018       if (GNUNET_YES == match)
4019       {
4020         struct GNUNET_TIME_Relative last_timeout;
4021
4022         /* Re-discovered known path, update timeout */
4023         GNUNET_STATISTICS_update (GST_stats,
4024                                   "# Known DV path refreshed",
4025                                   1,
4026                                   GNUNET_NO);
4027         last_timeout = GNUNET_TIME_absolute_get_remaining (pos->timeout);
4028         pos->timeout
4029           = GNUNET_TIME_relative_to_absolute (DV_PATH_VALIDITY_TIMEOUT);
4030         GNUNET_CONTAINER_MDLL_remove (dv,
4031                                       dv->dv_head,
4032                                       dv->dv_tail,
4033                                       pos);
4034         GNUNET_CONTAINER_MDLL_insert (dv,
4035                                       dv->dv_head,
4036                                       dv->dv_tail,
4037                                       pos);
4038         if (last_timeout.rel_value_us <
4039             GNUNET_TIME_relative_subtract (DV_PATH_VALIDITY_TIMEOUT,
4040                                            DV_PATH_DISCOVERY_FREQUENCY).rel_value_us)
4041         {
4042           /* Some peer send DV learn messages too often, we are learning
4043              the same path faster than it would be useful; do not forward! */
4044           return GNUNET_NO;
4045         }
4046         return GNUNET_YES;
4047       }
4048     }
4049   }
4050   /* Count how many shorter paths we have (incl. direct
4051      neighbours) before simply giving up on this one! */
4052   if (shorter_distance >= MAX_DV_PATHS_TO_TARGET)
4053   {
4054     /* We have a shorter path already! */
4055     return GNUNET_NO;
4056   }
4057   /* create new DV path entry */
4058   hop = GNUNET_malloc (sizeof (struct DistanceVectorHop) +
4059                        sizeof (struct GNUNET_PeerIdentity) * (path_len - 2));
4060   hop->next_hop = next_hop;
4061   hop->dv = dv;
4062   hop->path = (const struct GNUNET_PeerIdentity *) &hop[1];
4063   memcpy (&hop[1],
4064           &path[2],
4065           sizeof (struct GNUNET_PeerIdentity) * (path_len - 2));
4066   hop->timeout = GNUNET_TIME_relative_to_absolute (DV_PATH_VALIDITY_TIMEOUT);
4067   hop->distance = path_len - 2;
4068   GNUNET_CONTAINER_MDLL_insert (dv,
4069                                 dv->dv_head,
4070                                 dv->dv_tail,
4071                                 hop);
4072   GNUNET_CONTAINER_MDLL_insert (neighbour,
4073                                 next_hop->dv_head,
4074                                 next_hop->dv_tail,
4075                                 hop);
4076   return GNUNET_YES;
4077 }
4078
4079
4080 /**
4081  * Communicator gave us a DV learn message.  Check the message.
4082  *
4083  * @param cls a `struct CommunicatorMessageContext`
4084  * @param dvl the send message that was sent
4085  * @return #GNUNET_YES if message is well-formed
4086  */
4087 static int
4088 check_dv_learn (void *cls,
4089                 const struct TransportDVLearn *dvl)
4090 {
4091   uint16_t size = ntohs (dvl->header.size);
4092   uint16_t num_hops = ntohs (dvl->num_hops);
4093   const struct DVPathEntryP *hops = (const struct DVPathEntryP *) &dvl[1];
4094
4095   (void) cls;
4096   if (size != sizeof (*dvl) + num_hops * sizeof (struct DVPathEntryP))
4097   {
4098     GNUNET_break_op (0);
4099     return GNUNET_SYSERR;
4100   }
4101   if (num_hops > MAX_DV_HOPS_ALLOWED)
4102   {
4103     GNUNET_break_op (0);
4104     return GNUNET_SYSERR;
4105   }
4106   for (unsigned int i=0;i<num_hops;i++)
4107   {
4108     if (0 == GNUNET_memcmp (&dvl->initiator,
4109                             &hops[i].hop))
4110     {
4111       GNUNET_break_op (0);
4112       return GNUNET_SYSERR;
4113     }
4114     if (0 == GNUNET_memcmp (&GST_my_identity,
4115                             &hops[i].hop))
4116     {
4117       GNUNET_break_op (0);
4118       return GNUNET_SYSERR;
4119     }
4120   }
4121   return GNUNET_YES;
4122 }
4123
4124
4125 /**
4126  * Build and forward a DV learn message to @a next_hop.
4127  *
4128  * @param next_hop peer to send the message to
4129  * @param msg message received
4130  * @param bi_history bitmask specifying hops on path that were bidirectional
4131  * @param nhops length of the @a hops array
4132  * @param hops path the message traversed so far
4133  * @param in_time when did we receive the message, used to calculate network delay
4134  */
4135 static void
4136 forward_dv_learn (const struct GNUNET_PeerIdentity *next_hop,
4137                   const struct TransportDVLearn *msg,
4138                   uint16_t bi_history,
4139                   uint16_t nhops,
4140                   const struct DVPathEntryP *hops,
4141                   struct GNUNET_TIME_Absolute in_time)
4142 {
4143   struct DVPathEntryP *dhops;
4144   struct TransportDVLearn *fwd;
4145   struct GNUNET_TIME_Relative nnd;
4146
4147   /* compute message for forwarding */
4148   GNUNET_assert (nhops < MAX_DV_HOPS_ALLOWED);
4149   fwd = GNUNET_malloc (sizeof (struct TransportDVLearn) +
4150                        (nhops + 1) * sizeof (struct DVPathEntryP));
4151   fwd->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN);
4152   fwd->header.size = htons (sizeof (struct TransportDVLearn) +
4153                             (nhops + 1) * sizeof (struct DVPathEntryP));
4154   fwd->num_hops = htons (nhops + 1);
4155   fwd->bidirectional = htons (bi_history);
4156   nnd = GNUNET_TIME_relative_add (GNUNET_TIME_absolute_get_duration (in_time),
4157                                   GNUNET_TIME_relative_ntoh (msg->non_network_delay));
4158   fwd->non_network_delay = GNUNET_TIME_relative_hton (nnd);
4159   fwd->init_sig = msg->init_sig;
4160   fwd->initiator = msg->initiator;
4161   fwd->challenge = msg->challenge;
4162   dhops = (struct DVPathEntryP *) &fwd[1];
4163   GNUNET_memcpy (dhops,
4164                  hops,
4165                  sizeof (struct DVPathEntryP) * nhops);
4166   dhops[nhops].hop = GST_my_identity;
4167   {
4168     struct DvHopPS dhp = {
4169       .purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_HOP),
4170       .purpose.size = htonl (sizeof (dhp)),
4171       .pred = dhops[nhops-1].hop,
4172       .succ = *next_hop,
4173       .challenge = msg->challenge
4174     };
4175
4176     GNUNET_assert (GNUNET_OK ==
4177                    GNUNET_CRYPTO_eddsa_sign (GST_my_private_key,
4178                                              &dhp.purpose,
4179                                              &dhops[nhops].hop_sig));
4180   }
4181   route_message (next_hop,
4182                  &fwd->header);
4183 }
4184
4185
4186 /**
4187  * Check signature of type #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_INITIATOR
4188  *
4189  * @param init the signer
4190  * @param challenge the challenge that was signed
4191  * @param init_sig signature presumably by @a init
4192  * @return #GNUNET_OK if the signature is valid
4193  */
4194 static int
4195 validate_dv_initiator_signature (const struct GNUNET_PeerIdentity *init,
4196                                  const struct GNUNET_ShortHashCode *challenge,
4197                                  const struct GNUNET_CRYPTO_EddsaSignature *init_sig)
4198 {
4199   struct DvInitPS ip = {
4200     .purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_INITIATOR),
4201     .purpose.size = htonl (sizeof (ip)),
4202     .challenge = *challenge
4203   };
4204
4205   if (GNUNET_OK !=
4206       GNUNET_CRYPTO_eddsa_verify (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_INITIATOR,
4207                                   &ip.purpose,
4208                                   init_sig,
4209                                   &init->public_key))
4210   {
4211     GNUNET_break_op (0);
4212     return GNUNET_SYSERR;
4213   }
4214   return GNUNET_OK;
4215 }
4216
4217
4218 /**
4219  * Communicator gave us a DV learn message.  Process the request.
4220  *
4221  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
4222  * @param dvl the message that was received
4223  */
4224 static void
4225 handle_dv_learn (void *cls,
4226                  const struct TransportDVLearn *dvl)
4227 {
4228   struct CommunicatorMessageContext *cmc = cls;
4229   enum GNUNET_TRANSPORT_CommunicatorCharacteristics cc;
4230   int bi_hop;
4231   uint16_t nhops;
4232   uint16_t bi_history;
4233   const struct DVPathEntryP *hops;
4234   int do_fwd;
4235   int did_initiator;
4236   struct GNUNET_TIME_Absolute in_time;
4237
4238   nhops = ntohs (dvl->bidirectional); /* 0 = sender is initiator */
4239   bi_history = ntohs (dvl->bidirectional);
4240   hops = (const struct DVPathEntryP *) &dvl[1];
4241   if (0 == nhops)
4242   {
4243     /* sanity check */
4244     if (0 != GNUNET_memcmp (&dvl->initiator,
4245                             &cmc->im.sender))
4246     {
4247       GNUNET_break (0);
4248       finish_cmc_handling (cmc);
4249       return;
4250     }
4251   }
4252   else
4253   {
4254     /* sanity check */
4255     if (0 != GNUNET_memcmp (&hops[nhops - 1].hop,
4256                             &cmc->im.sender))
4257     {
4258       GNUNET_break (0);
4259       finish_cmc_handling (cmc);
4260       return;
4261     }
4262   }
4263
4264   GNUNET_assert (CT_COMMUNICATOR == cmc->tc->type);
4265   cc = cmc->tc->details.communicator.cc;
4266   bi_hop = (GNUNET_TRANSPORT_CC_RELIABLE == cc); // FIXME: add bi-directional flag to cc?
4267   in_time = GNUNET_TIME_absolute_get ();
4268
4269   /* continue communicator here, everything else can happen asynchronous! */
4270   finish_cmc_handling (cmc);
4271
4272   // FIXME: should we bother to verify _every_ DV initiator signature?
4273   if (GNUNET_OK !=
4274       validate_dv_initiator_signature (&dvl->initiator,
4275                                        &dvl->challenge,
4276                                        &dvl->init_sig))
4277   {
4278     GNUNET_break_op (0);
4279     return;
4280   }
4281   // FIXME: asynchronously (!) verify hop-by-hop signatures!
4282   // => if signature verification load too high, implement random drop strategy!
4283
4284   do_fwd = GNUNET_YES;
4285   if (0 == GNUNET_memcmp (&GST_my_identity,
4286                           &dvl->initiator))
4287   {
4288     struct GNUNET_PeerIdentity path[nhops + 1];
4289     struct GNUNET_TIME_Relative host_latency_sum;
4290     struct GNUNET_TIME_Relative latency;
4291     struct GNUNET_TIME_Relative network_latency;
4292
4293     /* We initiated this, learn the forward path! */
4294     path[0] = GST_my_identity;
4295     path[1] = hops[0].hop;
4296     host_latency_sum = GNUNET_TIME_relative_ntoh (dvl->non_network_delay);
4297
4298     // Need also something to lookup initiation time
4299     // to compute RTT! -> add RTT argument here?
4300     latency = GNUNET_TIME_UNIT_FOREVER_REL; // FIXME: initialize properly
4301     // (based on dvl->challenge, we can identify time of origin!)
4302
4303     network_latency = GNUNET_TIME_relative_subtract (latency,
4304                                                      host_latency_sum);
4305     /* assumption: latency on all links is the same */
4306     network_latency = GNUNET_TIME_relative_divide (network_latency,
4307                                                    nhops);
4308
4309     for (unsigned int i=2;i<=nhops;i++)
4310     {
4311       struct GNUNET_TIME_Relative ilat;
4312
4313       /* assumption: linear latency increase per hop */
4314       ilat = GNUNET_TIME_relative_multiply (network_latency,
4315                                             i);
4316       path[i] = hops[i-1].hop;
4317       learn_dv_path (path,
4318                      i,
4319                      ilat);
4320     }
4321     /* as we initiated, do not forward again (would be circular!) */
4322     do_fwd = GNUNET_NO;
4323     return;
4324   }
4325   else if (bi_hop)
4326   {
4327     /* last hop was bi-directional, we could learn something here! */
4328     struct GNUNET_PeerIdentity path[nhops + 2];
4329
4330     path[0] = GST_my_identity;
4331     path[1] = hops[nhops - 1].hop; /* direct neighbour == predecessor! */
4332     for (unsigned int i=0;i<nhops;i++)
4333     {
4334       int iret;
4335
4336       if (0 == (bi_history & (1 << i)))
4337         break; /* i-th hop not bi-directional, stop learning! */
4338       if (i == nhops)
4339       {
4340         path[i + 2] = dvl->initiator;
4341       }
4342       else
4343       {
4344         path[i + 2] = hops[nhops - i - 2].hop;
4345       }
4346
4347       iret = learn_dv_path (path,
4348                             i + 2,
4349                             GNUNET_TIME_UNIT_FOREVER_REL);
4350       if (GNUNET_SYSERR == iret)
4351       {
4352         /* path invalid or too long to be interesting for US, thus should also
4353            not be interesting to our neighbours, cut path when forwarding to
4354            'i' hops, except of course for the one that goes back to the
4355            initiator */
4356         GNUNET_STATISTICS_update (GST_stats,
4357                                   "# DV learn not forwarded due invalidity of path",
4358                                   1,
4359                                   GNUNET_NO);
4360         do_fwd = GNUNET_NO;
4361         break;
4362       }
4363       if ( (GNUNET_NO == iret) &&
4364            (nhops == i + 1) )
4365       {
4366         /* we have better paths, and this is the longest target,
4367            so there cannot be anything interesting later */
4368         GNUNET_STATISTICS_update (GST_stats,
4369                                   "# DV learn not forwarded, got better paths",
4370                                   1,
4371                                   GNUNET_NO);
4372         do_fwd = GNUNET_NO;
4373         break;
4374       }
4375     }
4376   }
4377
4378   if (MAX_DV_HOPS_ALLOWED == nhops)
4379   {
4380     /* At limit, we're out of here! */
4381     finish_cmc_handling (cmc);
4382     return;
4383   }
4384
4385   /* Forward to initiator, if path non-trivial and possible */
4386   bi_history = (bi_history << 1) | (bi_hop ? 1 : 0);
4387   did_initiator = GNUNET_NO;
4388   if ( (1 < nhops) &&
4389        (GNUNET_YES ==
4390         GNUNET_CONTAINER_multipeermap_contains (neighbours,
4391                                                 &dvl->initiator)) )
4392   {
4393     /* send back to origin! */
4394     forward_dv_learn (&dvl->initiator,
4395                       dvl,
4396                       bi_history,
4397                       nhops,
4398                       hops,
4399                       in_time);
4400     did_initiator = GNUNET_YES;
4401   }
4402   /* We forward under two conditions: either we still learned something
4403      ourselves (do_fwd), or the path was darn short and thus the initiator is
4404      likely to still be very interested in this (and we did NOT already
4405      send it back to the initiator) */
4406   if ( (do_fwd) ||
4407        ( (nhops < MIN_DV_PATH_LENGTH_FOR_INITIATOR) &&
4408          (GNUNET_NO == did_initiator) ) )
4409   {
4410     /* FIXME: loop over all neighbours, pick those with low
4411        queues AND that are not yet on the path; possibly
4412        adapt threshold to nhops! */
4413 #if FIXME
4414     forward_dv_learn (NULL, // fill in peer from iterator here!
4415                       dvl,
4416                       bi_history,
4417                       nhops,
4418                       hops,
4419                       in_time);
4420 #endif
4421   }
4422 }
4423
4424
4425 /**
4426  * Communicator gave us a DV box.  Check the message.
4427  *
4428  * @param cls a `struct CommunicatorMessageContext`
4429  * @param dvb the send message that was sent
4430  * @return #GNUNET_YES if message is well-formed
4431  */
4432 static int
4433 check_dv_box (void *cls,
4434               const struct TransportDVBox *dvb)
4435 {
4436   uint16_t size = ntohs (dvb->header.size);
4437   uint16_t num_hops = ntohs (dvb->num_hops);
4438   const struct GNUNET_PeerIdentity *hops = (const struct GNUNET_PeerIdentity *) &dvb[1];
4439   const struct GNUNET_MessageHeader *inbox = (const struct GNUNET_MessageHeader *) &hops[num_hops];
4440   uint16_t isize;
4441   uint16_t itype;
4442
4443   (void) cls;
4444   if (size < sizeof (*dvb) + num_hops * sizeof (struct GNUNET_PeerIdentity) + sizeof (struct GNUNET_MessageHeader))
4445   {
4446     GNUNET_break_op (0);
4447     return GNUNET_SYSERR;
4448   }
4449   isize = ntohs (inbox->size);
4450   if (size != sizeof (*dvb) + num_hops * sizeof (struct GNUNET_PeerIdentity) + isize)
4451   {
4452     GNUNET_break_op (0);
4453     return GNUNET_SYSERR;
4454   }
4455   itype = ntohs (inbox->type);
4456   if ( (GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX == itype) ||
4457        (GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN == itype) )
4458   {
4459     GNUNET_break_op (0);
4460     return GNUNET_SYSERR;
4461   }
4462   return GNUNET_YES;
4463 }
4464
4465
4466 /**
4467  * Communicator gave us a DV box.  Process the request.
4468  *
4469  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
4470  * @param dvb the message that was received
4471  */
4472 static void
4473 handle_dv_box (void *cls,
4474                const struct TransportDVBox *dvb)
4475 {
4476   struct CommunicatorMessageContext *cmc = cls;
4477   uint16_t size = ntohs (dvb->header.size) - sizeof (*dvb);
4478   uint16_t num_hops = ntohs (dvb->num_hops);
4479   const struct GNUNET_PeerIdentity *hops = (const struct GNUNET_PeerIdentity *) &dvb[1];
4480   const struct GNUNET_MessageHeader *inbox = (const struct GNUNET_MessageHeader *) &hops[num_hops];
4481
4482   if (num_hops > 0)
4483   {
4484     // FIXME: if we are not the target, shorten path and forward along.
4485     // Try from the _end_ of hops array if we know the given
4486     // neighbour (shortening the path!).
4487     // NOTE: increment total_hops!
4488     finish_cmc_handling (cmc);
4489     return;
4490   }
4491   /* We are the target. Unbox and handle message. */
4492   cmc->im.sender = dvb->origin;
4493   cmc->total_hops = ntohs (dvb->total_hops);
4494   demultiplex_with_cmc (cmc,
4495                         inbox);
4496 }
4497
4498
4499 /**
4500  * Client notified us about transmission from a peer.  Process the request.
4501  *
4502  * @param cls a `struct TransportClient` which sent us the message
4503  * @param obm the send message that was sent
4504  * @return #GNUNET_YES if message is well-formed
4505  */
4506 static int
4507 check_incoming_msg (void *cls,
4508                     const struct GNUNET_TRANSPORT_IncomingMessage *im)
4509 {
4510   struct TransportClient *tc = cls;
4511
4512   if (CT_COMMUNICATOR != tc->type)
4513   {
4514     GNUNET_break (0);
4515     return GNUNET_SYSERR;
4516   }
4517   GNUNET_MQ_check_boxed_message (im);
4518   return GNUNET_OK;
4519 }
4520
4521
4522 /**
4523  * Communicator gave us a transport address validation challenge.  Process the request.
4524  *
4525  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
4526  * @param tvc the message that was received
4527  */
4528 static void
4529 handle_validation_challenge (void *cls,
4530                              const struct TransportValidationChallenge *tvc)
4531 {
4532   struct CommunicatorMessageContext *cmc = cls;
4533   struct TransportValidationResponse *tvr;
4534
4535   if (cmc->total_hops > 0)
4536   {
4537     /* DV routing is not allowed for validation challenges! */
4538     GNUNET_break_op (0);
4539     finish_cmc_handling (cmc);
4540     return;
4541   }
4542   tvr = GNUNET_new (struct TransportValidationResponse);
4543   tvr->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_RESPONSE);
4544   tvr->header.size = htons (sizeof (*tvr));
4545   tvr->challenge = tvc->challenge;
4546   tvr->origin_time = tvc->sender_time;
4547   tvr->validity_duration = cmc->im.expected_address_validity;
4548   {
4549     /* create signature */
4550     struct TransportValidationPS tvp = {
4551       .purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_CHALLENGE),
4552       .purpose.size = htonl (sizeof (tvp)),
4553       .validity_duration = tvr->validity_duration,
4554       .challenge = tvc->challenge
4555     };
4556
4557     GNUNET_assert (GNUNET_OK ==
4558                    GNUNET_CRYPTO_eddsa_sign (GST_my_private_key,
4559                                              &tvp.purpose,
4560                                              &tvr->signature));
4561   }
4562   route_message (&cmc->im.sender,
4563                  &tvr->header);
4564   finish_cmc_handling (cmc);
4565 }
4566
4567
4568 /**
4569  * Closure for #check_known_challenge.
4570  */
4571 struct CheckKnownChallengeContext
4572 {
4573   /**
4574    * Set to the challenge we are looking for.
4575    */
4576   const struct GNUNET_ShortHashCode *challenge;
4577
4578   /**
4579    * Set to a matching validation state, if one was found.
4580    */
4581   struct ValidationState *vs;
4582 };
4583
4584
4585 /**
4586  * Test if the validation state in @a value matches the
4587  * challenge from @a cls.
4588  *
4589  * @param cls a `struct CheckKnownChallengeContext`
4590  * @param pid unused (must match though)
4591  * @param value a `struct ValidationState`
4592  * @return #GNUNET_OK if not matching, #GNUNET_NO if match found
4593  */
4594 static int
4595 check_known_challenge (void *cls,
4596                        const struct GNUNET_PeerIdentity *pid,
4597                        void *value)
4598 {
4599   struct CheckKnownChallengeContext *ckac = cls;
4600   struct ValidationState *vs = value;
4601
4602   (void) pid;
4603   if (0 != GNUNET_memcmp (&vs->challenge,
4604                           ckac->challenge))
4605     return GNUNET_OK;
4606   ckac->vs = vs;
4607   return GNUNET_NO;
4608 }
4609
4610
4611 /**
4612  * Function called when peerstore is done storing a
4613  * validated address.
4614  *
4615  * @param cls a `struct ValidationState`
4616  * @param success #GNUNET_YES on success
4617  */
4618 static void
4619 peerstore_store_validation_cb (void *cls,
4620                                int success)
4621 {
4622   struct ValidationState *vs = cls;
4623
4624   vs->sc = NULL;
4625   if (GNUNET_YES == success)
4626     return;
4627   GNUNET_STATISTICS_update (GST_stats,
4628                             "# Peerstore failed to store foreign address",
4629                             1,
4630                             GNUNET_NO);
4631 }
4632
4633
4634 /**
4635  * Task run periodically to validate some address based on #validation_heap.
4636  *
4637  * @param cls NULL
4638  */
4639 static void
4640 validation_start_cb (void *cls);
4641
4642
4643 /**
4644  * Set the time for next_challenge of @a vs to @a new_time.
4645  * Updates the heap and if necessary reschedules the job.
4646  *
4647  * @param vs validation state to update
4648  * @param new_time new time for revalidation
4649  */
4650 static void
4651 update_next_challenge_time (struct ValidationState *vs,
4652                             struct GNUNET_TIME_Absolute new_time)
4653 {
4654   struct GNUNET_TIME_Relative delta;
4655
4656   if (new_time.abs_value_us == vs->next_challenge.abs_value_us)
4657     return; /* be lazy */
4658   vs->next_challenge = new_time;
4659   if (NULL == vs->hn)
4660     vs->hn = GNUNET_CONTAINER_heap_insert (validation_heap,
4661                                            vs,
4662                                            new_time.abs_value_us);
4663   else
4664     GNUNET_CONTAINER_heap_update_cost (vs->hn,
4665                                        new_time.abs_value_us);
4666   if ( (vs != GNUNET_CONTAINER_heap_peek (validation_heap)) &&
4667        (NULL != validation_task) )
4668     return;
4669   if (NULL != validation_task)
4670     GNUNET_SCHEDULER_cancel (validation_task);
4671   /* randomize a bit */
4672   delta.rel_value_us = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
4673                                                  MIN_DELAY_ADDRESS_VALIDATION.rel_value_us);
4674   new_time = GNUNET_TIME_absolute_add (new_time,
4675                                        delta);
4676   validation_task = GNUNET_SCHEDULER_add_at (new_time,
4677                                              &validation_start_cb,
4678                                              NULL);
4679 }
4680
4681
4682 /**
4683  * Communicator gave us a transport address validation response.  Process the request.
4684  *
4685  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
4686  * @param tvr the message that was received
4687  */
4688 static void
4689 handle_validation_response (void *cls,
4690                             const struct TransportValidationResponse *tvr)
4691 {
4692   struct CommunicatorMessageContext *cmc = cls;
4693   struct ValidationState *vs;
4694   struct CheckKnownChallengeContext ckac = {
4695     .challenge = &tvr->challenge,
4696     .vs = NULL
4697   };
4698   struct GNUNET_TIME_Absolute origin_time;
4699
4700   /* check this is one of our challenges */
4701   (void) GNUNET_CONTAINER_multipeermap_get_multiple (validation_map,
4702                                                      &cmc->im.sender,
4703                                                      &check_known_challenge,
4704                                                      &ckac);
4705   if (NULL == (vs = ckac.vs))
4706   {
4707     /* This can happen simply if we 'forgot' the challenge by now,
4708        i.e. because we received the validation response twice */
4709     GNUNET_STATISTICS_update (GST_stats,
4710                               "# Validations dropped, challenge unknown",
4711                               1,
4712                               GNUNET_NO);
4713     finish_cmc_handling (cmc);
4714     return;
4715   }
4716
4717   /* sanity check on origin time */
4718   origin_time = GNUNET_TIME_absolute_ntoh (tvr->origin_time);
4719   if ( (origin_time.abs_value_us < vs->first_challenge_use.abs_value_us) ||
4720        (origin_time.abs_value_us > vs->last_challenge_use.abs_value_us) )
4721   {
4722     GNUNET_break_op (0);
4723     finish_cmc_handling (cmc);
4724     return;
4725   }
4726
4727   {
4728     /* check signature */
4729     struct TransportValidationPS tvp = {
4730       .purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_CHALLENGE),
4731       .purpose.size = htonl (sizeof (tvp)),
4732       .validity_duration = tvr->validity_duration,
4733       .challenge = tvr->challenge
4734     };
4735
4736     if (GNUNET_OK !=
4737         GNUNET_CRYPTO_eddsa_verify (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_CHALLENGE,
4738                                     &tvp.purpose,
4739                                     &tvr->signature,
4740                                     &cmc->im.sender.public_key))
4741     {
4742       GNUNET_break_op (0);
4743       finish_cmc_handling (cmc);
4744       return;
4745     }
4746   }
4747
4748   /* validity is capped by our willingness to keep track of the
4749      validation entry and the maximum the other peer allows */
4750   vs->valid_until
4751     = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_min (GNUNET_TIME_relative_ntoh (tvr->validity_duration),
4752                                                                   MAX_ADDRESS_VALID_UNTIL));
4753   vs->validated_until
4754     = GNUNET_TIME_absolute_min (vs->valid_until,
4755                                 GNUNET_TIME_relative_to_absolute (ADDRESS_VALIDATION_LIFETIME));
4756   vs->validation_rtt = GNUNET_TIME_absolute_get_duration (origin_time);
4757   vs->challenge_backoff = GNUNET_TIME_UNIT_ZERO;
4758   GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
4759                               &vs->challenge,
4760                               sizeof (vs->challenge));
4761   vs->first_challenge_use = GNUNET_TIME_absolute_subtract (vs->validated_until,
4762                                                            GNUNET_TIME_relative_multiply (vs->validation_rtt,
4763                                                                                           VALIDATION_RTT_BUFFER_FACTOR));
4764   vs->last_challenge_use = GNUNET_TIME_UNIT_ZERO_ABS; /* challenge was not yet used */
4765   update_next_challenge_time (vs,
4766                               vs->first_challenge_use);
4767   vs->sc = GNUNET_PEERSTORE_store (peerstore,
4768                                    "transport",
4769                                    &cmc->im.sender,
4770                                    GNUNET_PEERSTORE_TRANSPORT_URLADDRESS_KEY,
4771                                    vs->address,
4772                                    strlen (vs->address) + 1,
4773                                    vs->valid_until,
4774                                    GNUNET_PEERSTORE_STOREOPTION_MULTIPLE,
4775                                    &peerstore_store_validation_cb,
4776                                    vs);
4777   // FIXME: should we find the matching queue and update the RTT?
4778   finish_cmc_handling (cmc);
4779 }
4780
4781
4782 /**
4783  * Incoming meessage.  Process the request.
4784  *
4785  * @param im the send message that was received
4786  */
4787 static void
4788 handle_incoming_msg (void *cls,
4789                      const struct GNUNET_TRANSPORT_IncomingMessage *im)
4790 {
4791   struct TransportClient *tc = cls;
4792   struct CommunicatorMessageContext *cmc = GNUNET_new (struct CommunicatorMessageContext);
4793
4794   cmc->tc = tc;
4795   cmc->im = *im;
4796   demultiplex_with_cmc (cmc,
4797                         (const struct GNUNET_MessageHeader *) &im[1]);
4798 }
4799
4800
4801 /**
4802  * Given an inbound message @a msg from a communicator @a cmc,
4803  * demultiplex it based on the type calling the right handler.
4804  *
4805  * @param cmc context for demultiplexing
4806  * @param msg message to demultiplex
4807  */
4808 static void
4809 demultiplex_with_cmc (struct CommunicatorMessageContext *cmc,
4810                       const struct GNUNET_MessageHeader *msg)
4811 {
4812   struct GNUNET_MQ_MessageHandler handlers[] = {
4813     GNUNET_MQ_hd_var_size (fragment_box,
4814                            GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT,
4815                            struct TransportFragmentBox,
4816                            &cmc),
4817     GNUNET_MQ_hd_fixed_size (fragment_ack,
4818                              GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK,
4819                              struct TransportFragmentAckMessage,
4820                              &cmc),
4821     GNUNET_MQ_hd_var_size (reliability_box,
4822                            GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX,
4823                            struct TransportReliabilityBox,
4824                            &cmc),
4825     GNUNET_MQ_hd_fixed_size (reliability_ack,
4826                              GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK,
4827                              struct TransportReliabilityAckMessage,
4828                              &cmc),
4829     GNUNET_MQ_hd_var_size (backchannel_encapsulation,
4830                            GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION,
4831                            struct TransportBackchannelEncapsulationMessage,
4832                            &cmc),
4833     GNUNET_MQ_hd_var_size (dv_learn,
4834                            GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN,
4835                            struct TransportDVLearn,
4836                            &cmc),
4837     GNUNET_MQ_hd_var_size (dv_box,
4838                            GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX,
4839                            struct TransportDVBox,
4840                            &cmc),
4841     GNUNET_MQ_hd_fixed_size (validation_challenge,
4842                              GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_CHALLENGE,
4843                              struct TransportValidationChallenge,
4844                              &cmc),
4845     GNUNET_MQ_hd_fixed_size (validation_response,
4846                              GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_RESPONSE,
4847                              struct TransportValidationResponse,
4848                              &cmc),
4849     GNUNET_MQ_handler_end()
4850   };
4851   int ret;
4852
4853   ret = GNUNET_MQ_handle_message (handlers,
4854                                   msg);
4855   if (GNUNET_SYSERR == ret)
4856   {
4857     GNUNET_break (0);
4858     GNUNET_SERVICE_client_drop (cmc->tc->client);
4859     GNUNET_free (cmc);
4860     return;
4861   }
4862   if (GNUNET_NO == ret)
4863   {
4864     /* unencapsulated 'raw' message */
4865     handle_raw_message (&cmc,
4866                         msg);
4867   }
4868 }
4869
4870
4871 /**
4872  * New queue became available.  Check message.
4873  *
4874  * @param cls the client
4875  * @param aqm the send message that was sent
4876  */
4877 static int
4878 check_add_queue_message (void *cls,
4879                          const struct GNUNET_TRANSPORT_AddQueueMessage *aqm)
4880 {
4881   struct TransportClient *tc = cls;
4882
4883   if (CT_COMMUNICATOR != tc->type)
4884   {
4885     GNUNET_break (0);
4886     return GNUNET_SYSERR;
4887   }
4888   GNUNET_MQ_check_zero_termination (aqm);
4889   return GNUNET_OK;
4890 }
4891
4892
4893 /**
4894  * Bandwidth tracker informs us that the delay until we should receive
4895  * more has changed.
4896  *
4897  * @param cls a `struct Queue` for which the delay changed
4898  */
4899 static void
4900 tracker_update_in_cb (void *cls)
4901 {
4902   struct Queue *queue = cls;
4903   struct GNUNET_TIME_Relative in_delay;
4904   unsigned int rsize;
4905
4906   rsize = (0 == queue->mtu) ? IN_PACKET_SIZE_WITHOUT_MTU : queue->mtu;
4907   in_delay = GNUNET_BANDWIDTH_tracker_get_delay (&queue->tracker_in,
4908                                                  rsize);
4909   // FIXME: how exactly do we do inbound flow control?
4910 }
4911
4912
4913 /**
4914  * If necessary, generates the UUID for a @a pm
4915  *
4916  * @param pm pending message to generate UUID for.
4917  */
4918 static void
4919 set_pending_message_uuid (struct PendingMessage *pm)
4920 {
4921   if (pm->msg_uuid_set)
4922     return;
4923   GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
4924                               &pm->msg_uuid,
4925                               sizeof (pm->msg_uuid));
4926   pm->msg_uuid_set = GNUNET_YES;
4927 }
4928
4929
4930 /**
4931  * Fragment the given @a pm to the given @a mtu.  Adds
4932  * additional fragments to the neighbour as well. If the
4933  * @a mtu is too small, generates and error for the @a pm
4934  * and returns NULL.
4935  *
4936  * @param pm pending message to fragment for transmission
4937  * @param mtu MTU to apply
4938  * @return new message to transmit
4939  */
4940 static struct PendingMessage *
4941 fragment_message (struct PendingMessage *pm,
4942                   uint16_t mtu)
4943 {
4944   struct PendingMessage *ff;
4945
4946   set_pending_message_uuid (pm);
4947
4948   /* This invariant is established in #handle_add_queue_message() */
4949   GNUNET_assert (mtu > sizeof (struct TransportFragmentBox));
4950
4951   /* select fragment for transmission, descending the tree if it has
4952      been expanded until we are at a leaf or at a fragment that is small enough */
4953   ff = pm;
4954   while ( ( (ff->bytes_msg > mtu) ||
4955             (pm == ff) ) &&
4956           (ff->frag_off == ff->bytes_msg) &&
4957           (NULL != ff->head_frag) )
4958   {
4959     ff = ff->head_frag; /* descent into fragmented fragments */
4960   }
4961
4962   if ( ( (ff->bytes_msg > mtu) ||
4963          (pm == ff) ) &&
4964        (pm->frag_off < pm->bytes_msg) )
4965   {
4966     /* Did not yet calculate all fragments, calculate next fragment */
4967     struct PendingMessage *frag;
4968     struct TransportFragmentBox tfb;
4969     const char *orig;
4970     char *msg;
4971     uint16_t fragmax;
4972     uint16_t fragsize;
4973     uint16_t msize;
4974     uint16_t xoff = 0;
4975
4976     orig = (const char *) &ff[1];
4977     msize = ff->bytes_msg;
4978     if (pm != ff)
4979     {
4980       const struct TransportFragmentBox *tfbo;
4981
4982       tfbo = (const struct TransportFragmentBox *) orig;
4983       orig += sizeof (struct TransportFragmentBox);
4984       msize -= sizeof (struct TransportFragmentBox);
4985       xoff = ntohs (tfbo->frag_off);
4986     }
4987     fragmax = mtu - sizeof (struct TransportFragmentBox);
4988     fragsize = GNUNET_MIN (msize - ff->frag_off,
4989                            fragmax);
4990     frag = GNUNET_malloc (sizeof (struct PendingMessage) +
4991                           sizeof (struct TransportFragmentBox) +
4992                           fragsize);
4993     frag->target = pm->target;
4994     frag->frag_parent = ff;
4995     frag->timeout = pm->timeout;
4996     frag->bytes_msg = sizeof (struct TransportFragmentBox) + fragsize;
4997     frag->pmt = PMT_FRAGMENT_BOX;
4998     msg = (char *) &frag[1];
4999     tfb.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT);
5000     tfb.header.size = htons (sizeof (struct TransportFragmentBox) +
5001                              fragsize);
5002     tfb.frag_uuid = htonl (pm->frag_uuidgen++);
5003     tfb.msg_uuid = pm->msg_uuid;
5004     tfb.frag_off = htons (ff->frag_off + xoff);
5005     tfb.msg_size = htons (pm->bytes_msg);
5006     memcpy (msg,
5007             &tfb,
5008             sizeof (tfb));
5009     memcpy (&msg[sizeof (tfb)],
5010             &orig[ff->frag_off],
5011             fragsize);
5012     GNUNET_CONTAINER_MDLL_insert (frag,
5013                                   ff->head_frag,
5014                                   ff->tail_frag,
5015                                   frag);
5016     ff->frag_off += fragsize;
5017     ff = frag;
5018   }
5019
5020   /* Move head to the tail and return it */
5021   GNUNET_CONTAINER_MDLL_remove (frag,
5022                                 ff->frag_parent->head_frag,
5023                                 ff->frag_parent->tail_frag,
5024                                 ff);
5025   GNUNET_CONTAINER_MDLL_insert_tail (frag,
5026                                      ff->frag_parent->head_frag,
5027                                      ff->frag_parent->tail_frag,
5028                                      ff);
5029   return ff;
5030 }
5031
5032
5033 /**
5034  * Reliability-box the given @a pm. On error (can there be any), NULL
5035  * may be returned, otherwise the "replacement" for @a pm (which
5036  * should then be added to the respective neighbour's queue instead of
5037  * @a pm).  If the @a pm is already fragmented or reliability boxed,
5038  * or itself an ACK, this function simply returns @a pm.
5039  *
5040  * @param pm pending message to box for transmission over unreliabile queue
5041  * @return new message to transmit
5042  */
5043 static struct PendingMessage *
5044 reliability_box_message (struct PendingMessage *pm)
5045 {
5046   struct TransportReliabilityBox rbox;
5047   struct PendingMessage *bpm;
5048   char *msg;
5049
5050   if (PMT_CORE != pm->pmt)
5051     return pm;  /* already fragmented or reliability boxed, or control message: do nothing */
5052   if (NULL != pm->bpm)
5053     return pm->bpm; /* already computed earlier: do nothing */
5054   GNUNET_assert (NULL == pm->head_frag);
5055   if (pm->bytes_msg + sizeof (rbox) > UINT16_MAX)
5056   {
5057     /* failed hard */
5058     GNUNET_break (0);
5059     client_send_response (pm,
5060                           GNUNET_NO,
5061                           0);
5062     return NULL;
5063   }
5064   bpm = GNUNET_malloc (sizeof (struct PendingMessage) +
5065                        sizeof (rbox) +
5066                        pm->bytes_msg);
5067   bpm->target = pm->target;
5068   bpm->frag_parent = pm;
5069   GNUNET_CONTAINER_MDLL_insert (frag,
5070                                 pm->head_frag,
5071                                 pm->tail_frag,
5072                                 bpm);
5073   bpm->timeout = pm->timeout;
5074   bpm->pmt = PMT_RELIABILITY_BOX;
5075   bpm->bytes_msg = pm->bytes_msg + sizeof (rbox);
5076   set_pending_message_uuid (bpm);
5077   rbox.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX);
5078   rbox.header.size = htons (sizeof (rbox) + pm->bytes_msg);
5079   rbox.ack_countdown = htonl (0); // FIXME: implement ACK countdown support
5080   rbox.msg_uuid = pm->msg_uuid;
5081   msg = (char *) &bpm[1];
5082   memcpy (msg,
5083           &rbox,
5084           sizeof (rbox));
5085   memcpy (&msg[sizeof (rbox)],
5086           &pm[1],
5087           pm->bytes_msg);
5088   pm->bpm = bpm;
5089   return bpm;
5090 }
5091
5092
5093 /**
5094  * We believe we are ready to transmit a message on a queue. Double-checks
5095  * with the queue's "tracker_out" and then gives the message to the
5096  * communicator for transmission (updating the tracker, and re-scheduling
5097  * itself if applicable).
5098  *
5099  * @param cls the `struct Queue` to process transmissions for
5100  */
5101 static void
5102 transmit_on_queue (void *cls)
5103 {
5104   struct Queue *queue = cls;
5105   struct Neighbour *n = queue->neighbour;
5106   struct PendingMessage *pm;
5107   struct PendingMessage *s;
5108   uint32_t overhead;
5109   struct GNUNET_TRANSPORT_SendMessageTo *smt;
5110   struct GNUNET_MQ_Envelope *env;
5111
5112   queue->transmit_task = NULL;
5113   if (NULL == (pm = n->pending_msg_head))
5114   {
5115     /* no message pending, nothing to do here! */
5116     return;
5117   }
5118   schedule_transmit_on_queue (queue);
5119   if (NULL != queue->transmit_task)
5120     return; /* do it later */
5121   overhead = 0;
5122   if (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc)
5123     overhead += sizeof (struct TransportReliabilityBox);
5124   s = pm;
5125   if ( ( (0 != queue->mtu) &&
5126          (pm->bytes_msg + overhead > queue->mtu) ) ||
5127        (pm->bytes_msg > UINT16_MAX - sizeof (struct GNUNET_TRANSPORT_SendMessageTo)) ||
5128        (NULL != pm->head_frag /* fragments already exist, should
5129                                  respect that even if MTU is 0 for
5130                                  this queue */) )
5131     s = fragment_message (s,
5132                           (0 == queue->mtu)
5133                           ? UINT16_MAX - sizeof (struct GNUNET_TRANSPORT_SendMessageTo)
5134                           : queue->mtu);
5135   if (NULL == s)
5136   {
5137     /* Fragmentation failed, try next message... */
5138     schedule_transmit_on_queue (queue);
5139     return;
5140   }
5141   if (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc)
5142     s = reliability_box_message (s);
5143   if (NULL == s)
5144   {
5145     /* Reliability boxing failed, try next message... */
5146     schedule_transmit_on_queue (queue);
5147     return;
5148   }
5149
5150   /* Pass 's' for transission to the communicator */
5151   env = GNUNET_MQ_msg_extra (smt,
5152                              s->bytes_msg,
5153                              GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG);
5154   smt->qid = queue->qid;
5155   smt->mid = queue->mid_gen;
5156   smt->receiver = n->pid;
5157   memcpy (&smt[1],
5158           &s[1],
5159           s->bytes_msg);
5160   {
5161     /* Pass the env to the communicator of queue for transmission. */
5162     struct QueueEntry *qe;
5163
5164     qe = GNUNET_new (struct QueueEntry);
5165     qe->mid = queue->mid_gen++;
5166     qe->queue = queue;
5167     // qe->pm = s; // FIXME: not so easy, reference management on 'free(s)'!
5168     GNUNET_CONTAINER_DLL_insert (queue->queue_head,
5169                                  queue->queue_tail,
5170                                  qe);
5171     GNUNET_assert (CT_COMMUNICATOR == queue->tc->type);
5172     queue->queue_length++;
5173     queue->tc->details.communicator.total_queue_length++;
5174     GNUNET_MQ_send (queue->tc->mq,
5175                     env);
5176   }
5177
5178   // FIXME: do something similar to the logic below
5179   // in defragmentation / reliability ACK handling!
5180
5181   /* Check if this transmission somehow conclusively finished handing 'pm'
5182      even without any explicit ACKs */
5183   if ( (PMT_CORE == s->pmt) &&
5184        (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc) )
5185   {
5186     /* Full message sent, and over reliabile channel */
5187     client_send_response (pm,
5188                           GNUNET_YES,
5189                           pm->bytes_msg);
5190   }
5191   else if ( (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc) &&
5192             (PMT_FRAGMENT_BOX == s->pmt) )
5193   {
5194     struct PendingMessage *pos;
5195
5196     /* Fragment sent over reliabile channel */
5197     free_fragment_tree (s);
5198     pos = s->frag_parent;
5199     GNUNET_CONTAINER_MDLL_remove (frag,
5200                                   pos->head_frag,
5201                                   pos->tail_frag,
5202                                   s);
5203     GNUNET_free (s);
5204     /* check if subtree is done */
5205     while ( (NULL == pos->head_frag) &&
5206             (pos->frag_off == pos->bytes_msg) &&
5207             (pos != pm) )
5208     {
5209       s = pos;
5210       pos = s->frag_parent;
5211       GNUNET_CONTAINER_MDLL_remove (frag,
5212                                     pos->head_frag,
5213                                     pos->tail_frag,
5214                                     s);
5215       GNUNET_free (s);
5216     }
5217
5218     /* Was this the last applicable fragmment? */
5219     if ( (NULL == pm->head_frag) &&
5220          (pm->frag_off == pm->bytes_msg) )
5221       client_send_response (pm,
5222                             GNUNET_YES,
5223                             pm->bytes_msg /* FIXME: calculate and add overheads! */);
5224   }
5225   else if (PMT_CORE != pm->pmt)
5226   {
5227     /* This was an acknowledgement of some type, always free */
5228     free_pending_message (pm);
5229   }
5230   else
5231   {
5232     /* message not finished, waiting for acknowledgement */
5233     struct Neighbour *neighbour = pm->target;
5234     /* Update time by which we might retransmit 's' based on queue
5235        characteristics (i.e. RTT); it takes one RTT for the message to
5236        arrive and the ACK to come back in the best case; but the other
5237        side is allowed to delay ACKs by 2 RTTs, so we use 4 RTT before
5238        retransmitting.  Note that in the future this heuristic should
5239        likely be improved further (measure RTT stability, consider
5240        message urgency and size when delaying ACKs, etc.) */
5241     s->next_attempt = GNUNET_TIME_relative_to_absolute
5242       (GNUNET_TIME_relative_multiply (queue->rtt,
5243                                       4));
5244     if (s == pm)
5245     {
5246       struct PendingMessage *pos;
5247
5248       /* re-insert sort in neighbour list */
5249       GNUNET_CONTAINER_MDLL_remove (neighbour,
5250                                     neighbour->pending_msg_head,
5251                                     neighbour->pending_msg_tail,
5252                                     pm);
5253       pos = neighbour->pending_msg_tail;
5254       while ( (NULL != pos) &&
5255               (pm->next_attempt.abs_value_us > pos->next_attempt.abs_value_us) )
5256         pos = pos->prev_neighbour;
5257       GNUNET_CONTAINER_MDLL_insert_after (neighbour,
5258                                           neighbour->pending_msg_head,
5259                                           neighbour->pending_msg_tail,
5260                                           pos,
5261                                           pm);
5262     }
5263     else
5264     {
5265       /* re-insert sort in fragment list */
5266       struct PendingMessage *fp = s->frag_parent;
5267       struct PendingMessage *pos;
5268
5269       GNUNET_CONTAINER_MDLL_remove (frag,
5270                                     fp->head_frag,
5271                                     fp->tail_frag,
5272                                     s);
5273       pos = fp->tail_frag;
5274       while ( (NULL != pos) &&
5275               (s->next_attempt.abs_value_us > pos->next_attempt.abs_value_us) )
5276         pos = pos->prev_frag;
5277       GNUNET_CONTAINER_MDLL_insert_after (frag,
5278                                           fp->head_frag,
5279                                           fp->tail_frag,
5280                                           pos,
5281                                           s);
5282     }
5283   }
5284
5285   /* finally, re-schedule queue transmission task itself */
5286   schedule_transmit_on_queue (queue);
5287 }
5288
5289
5290 /**
5291  * Bandwidth tracker informs us that the delay until we
5292  * can transmit again changed.
5293  *
5294  * @param cls a `struct Queue` for which the delay changed
5295  */
5296 static void
5297 tracker_update_out_cb (void *cls)
5298 {
5299   struct Queue *queue = cls;
5300   struct Neighbour *n = queue->neighbour;
5301
5302   if (NULL == n->pending_msg_head)
5303   {
5304     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
5305                 "Bandwidth allocation updated for empty transmission queue `%s'\n",
5306                 queue->address);
5307     return; /* no message pending, nothing to do here! */
5308   }
5309   GNUNET_SCHEDULER_cancel (queue->transmit_task);
5310   queue->transmit_task = NULL;
5311   schedule_transmit_on_queue (queue);
5312 }
5313
5314
5315 /**
5316  * Bandwidth tracker informs us that excessive outbound bandwidth was
5317  * allocated which is not being used.
5318  *
5319  * @param cls a `struct Queue` for which the excess was noted
5320  */
5321 static void
5322 tracker_excess_out_cb (void *cls)
5323 {
5324   (void) cls;
5325     
5326   /* FIXME: trigger excess bandwidth report to core? Right now,
5327      this is done internally within transport_api2_core already,
5328      but we probably want to change the logic and trigger it
5329      from here via a message instead! */
5330   /* TODO: maybe inform someone at this point? */
5331   GNUNET_STATISTICS_update (GST_stats,
5332                             "# Excess outbound bandwidth reported",
5333                             1,
5334                             GNUNET_NO);
5335 }
5336
5337
5338
5339 /**
5340  * Bandwidth tracker informs us that excessive inbound bandwidth was allocated
5341  * which is not being used.
5342  *
5343  * @param cls a `struct Queue` for which the excess was noted
5344  */
5345 static void
5346 tracker_excess_in_cb (void *cls)
5347 {
5348   (void) cls;
5349
5350   /* TODO: maybe inform somone at this point? */
5351   GNUNET_STATISTICS_update (GST_stats,
5352                             "# Excess inbound bandwidth reported",
5353                             1,
5354                             GNUNET_NO);
5355 }
5356
5357
5358 /**
5359  * Queue to a peer went down.  Process the request.
5360  *
5361  * @param cls the client
5362  * @param dqm the send message that was sent
5363  */
5364 static void
5365 handle_del_queue_message (void *cls,
5366                           const struct GNUNET_TRANSPORT_DelQueueMessage *dqm)
5367 {
5368   struct TransportClient *tc = cls;
5369
5370   if (CT_COMMUNICATOR != tc->type)
5371   {
5372     GNUNET_break (0);
5373     GNUNET_SERVICE_client_drop (tc->client);
5374     return;
5375   }
5376   for (struct Queue *queue = tc->details.communicator.queue_head;
5377        NULL != queue;
5378        queue = queue->next_client)
5379   {
5380     struct Neighbour *neighbour = queue->neighbour;
5381
5382     if ( (dqm->qid != queue->qid) ||
5383          (0 != GNUNET_memcmp (&dqm->receiver,
5384                               &neighbour->pid)) )
5385       continue;
5386     free_queue (queue);
5387     GNUNET_SERVICE_client_continue (tc->client);
5388     return;
5389   }
5390   GNUNET_break (0);
5391   GNUNET_SERVICE_client_drop (tc->client);
5392 }
5393
5394
5395 /**
5396  * Message was transmitted.  Process the request.
5397  *
5398  * @param cls the client
5399  * @param sma the send message that was sent
5400  */
5401 static void
5402 handle_send_message_ack (void *cls,
5403                          const struct GNUNET_TRANSPORT_SendMessageToAck *sma)
5404 {
5405   struct TransportClient *tc = cls;
5406   struct QueueEntry *qe;
5407
5408   if (CT_COMMUNICATOR != tc->type)
5409   {
5410     GNUNET_break (0);
5411     GNUNET_SERVICE_client_drop (tc->client);
5412     return;
5413   }
5414
5415   /* find our queue entry matching the ACK */
5416   qe = NULL;
5417   for (struct Queue *queue = tc->details.communicator.queue_head;
5418        NULL != queue;
5419        queue = queue->next_client)
5420   {
5421     if (0 != GNUNET_memcmp (&queue->neighbour->pid,
5422                             &sma->receiver))
5423       continue;
5424     for (struct QueueEntry *qep = queue->queue_head;
5425          NULL != qep;
5426          qep = qep->next)
5427     {
5428       if (qep->mid != sma->mid)
5429         continue;
5430       qe = qep;
5431       break;
5432     }
5433     break;
5434   }
5435   if (NULL == qe)
5436   {
5437     /* this should never happen */
5438     GNUNET_break (0);
5439     GNUNET_SERVICE_client_drop (tc->client);
5440     return;
5441   }
5442   GNUNET_CONTAINER_DLL_remove (qe->queue->queue_head,
5443                                qe->queue->queue_tail,
5444                                qe);
5445   qe->queue->queue_length--;
5446   tc->details.communicator.total_queue_length--;
5447   GNUNET_SERVICE_client_continue (tc->client);
5448
5449   /* if applicable, resume transmissions that waited on ACK */
5450   if (COMMUNICATOR_TOTAL_QUEUE_LIMIT - 1 == tc->details.communicator.total_queue_length)
5451   {
5452     /* Communicator dropped below threshold, resume all queues */
5453     GNUNET_STATISTICS_update (GST_stats,
5454                               "# Transmission throttled due to communicator queue limit",
5455                               -1,
5456                               GNUNET_NO);
5457     for (struct Queue *queue = tc->details.communicator.queue_head;
5458          NULL != queue;
5459          queue = queue->next_client)
5460       schedule_transmit_on_queue (queue);
5461   }
5462   else if (QUEUE_LENGTH_LIMIT - 1 == qe->queue->queue_length)
5463   {
5464     /* queue dropped below threshold; only resume this one queue */
5465     GNUNET_STATISTICS_update (GST_stats,
5466                               "# Transmission throttled due to queue queue limit",
5467                               -1,
5468                               GNUNET_NO);
5469     schedule_transmit_on_queue (qe->queue);
5470   }
5471
5472   /* TODO: we also should react on the status! */
5473   // FIXME: this probably requires queue->pm = s assignment!
5474   // FIXME: react to communicator status about transmission request. We got:
5475   sma->status; // OK success, SYSERR failure
5476
5477   GNUNET_free (qe);
5478 }
5479
5480
5481 /**
5482  * Iterator telling new MONITOR client about all existing
5483  * queues to peers.
5484  *
5485  * @param cls the new `struct TransportClient`
5486  * @param pid a connected peer
5487  * @param value the `struct Neighbour` with more information
5488  * @return #GNUNET_OK (continue to iterate)
5489  */
5490 static int
5491 notify_client_queues (void *cls,
5492                       const struct GNUNET_PeerIdentity *pid,
5493                       void *value)
5494 {
5495   struct TransportClient *tc = cls;
5496   struct Neighbour *neighbour = value;
5497
5498   GNUNET_assert (CT_MONITOR == tc->type);
5499   for (struct Queue *q = neighbour->queue_head;
5500        NULL != q;
5501        q = q->next_neighbour)
5502   {
5503     struct MonitorEvent me = {
5504       .rtt = q->rtt,
5505       .cs = q->cs,
5506       .num_msg_pending = q->num_msg_pending,
5507       .num_bytes_pending = q->num_bytes_pending
5508     };
5509
5510     notify_monitor (tc,
5511                     pid,
5512                     q->address,
5513                     q->nt,
5514                     &me);
5515   }
5516   return GNUNET_OK;
5517 }
5518
5519
5520 /**
5521  * Initialize a monitor client.
5522  *
5523  * @param cls the client
5524  * @param start the start message that was sent
5525  */
5526 static void
5527 handle_monitor_start (void *cls,
5528                       const struct GNUNET_TRANSPORT_MonitorStart *start)
5529 {
5530   struct TransportClient *tc = cls;
5531
5532   if (CT_NONE != tc->type)
5533   {
5534     GNUNET_break (0);
5535     GNUNET_SERVICE_client_drop (tc->client);
5536     return;
5537   }
5538   tc->type = CT_MONITOR;
5539   tc->details.monitor.peer = start->peer;
5540   tc->details.monitor.one_shot = ntohl (start->one_shot);
5541   GNUNET_CONTAINER_multipeermap_iterate (neighbours,
5542                                          &notify_client_queues,
5543                                          tc);
5544   GNUNET_SERVICE_client_mark_monitor (tc->client);
5545   GNUNET_SERVICE_client_continue (tc->client);
5546 }
5547
5548
5549 /**
5550  * Find transport client providing communication service
5551  * for the protocol @a prefix.
5552  *
5553  * @param prefix communicator name
5554  * @return NULL if no such transport client is available
5555  */
5556 static struct TransportClient *
5557 lookup_communicator (const char *prefix)
5558 {
5559   for (struct TransportClient *tc = clients_head;
5560        NULL != tc;
5561        tc = tc->next)
5562   {
5563     if (CT_COMMUNICATOR != tc->type)
5564       continue;
5565     if (0 == strcmp (prefix,
5566                      tc->details.communicator.address_prefix))
5567       return tc;
5568   }
5569   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
5570               "Somone suggested use of communicator for `%s', but we do not have such a communicator!\n",
5571               prefix);
5572   return NULL;
5573 }
5574
5575
5576 /**
5577  * Signature of a function called with a communicator @a address of a peer
5578  * @a pid that an application wants us to connect to.
5579  *
5580  * @param pid target peer
5581  * @param address the address to try
5582  */
5583 static void
5584 suggest_to_connect (const struct GNUNET_PeerIdentity *pid,
5585                     const char *address)
5586 {
5587   static uint32_t idgen;
5588   struct TransportClient *tc;
5589   char *prefix;
5590   struct GNUNET_TRANSPORT_CreateQueue *cqm;
5591   struct GNUNET_MQ_Envelope *env;
5592   size_t alen;
5593
5594   prefix = GNUNET_HELLO_address_to_prefix (address);
5595   if (NULL == prefix)
5596   {
5597     GNUNET_break (0); /* We got an invalid address!? */
5598     return;
5599   }
5600   tc = lookup_communicator (prefix);
5601   if (NULL == tc)
5602   {
5603     GNUNET_STATISTICS_update (GST_stats,
5604                               "# Suggestions ignored due to missing communicator",
5605                               1,
5606                               GNUNET_NO);
5607     return;
5608   }
5609   /* forward suggestion for queue creation to communicator */
5610   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
5611               "Request #%u for `%s' communicator to create queue to `%s'\n",
5612               (unsigned int) idgen,
5613               prefix,
5614               address);
5615   alen = strlen (address) + 1;
5616   env = GNUNET_MQ_msg_extra (cqm,
5617                              alen,
5618                              GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE);
5619   cqm->request_id = htonl (idgen++);
5620   cqm->receiver = *pid;
5621   memcpy (&cqm[1],
5622           address,
5623           alen);
5624   GNUNET_MQ_send (tc->mq,
5625                   env);
5626 }
5627
5628
5629 /**
5630  * The queue @a q (which matches the peer and address in @a vs) is
5631  * ready for queueing. We should now queue the validation request.
5632  *
5633  * @param q queue to send on
5634  * @param vs state to derive validation challenge from
5635  */
5636 static void
5637 validation_transmit_on_queue (struct Queue *q,
5638                               struct ValidationState *vs)
5639 {
5640   struct GNUNET_MQ_Envelope *env;
5641   struct TransportValidationChallenge *tvc;
5642
5643   vs->last_challenge_use = GNUNET_TIME_absolute_get ();
5644   env = GNUNET_MQ_msg (tvc,
5645                        GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_CHALLENGE);
5646   tvc->reserved = htonl (0);
5647   tvc->challenge = vs->challenge;
5648   tvc->sender_time = GNUNET_TIME_absolute_hton (vs->last_challenge_use);
5649   // FIXME: not so easy, need to BOX this message
5650   // in a transmission request! (mistake also done elsewhere!)
5651   GNUNET_MQ_send (q->tc->mq,
5652                   env);
5653 }
5654
5655
5656 /**
5657  * Task run periodically to validate some address based on #validation_heap.
5658  *
5659  * @param cls NULL
5660  */
5661 static void
5662 validation_start_cb (void *cls)
5663 {
5664   struct ValidationState *vs;
5665   struct Neighbour *n;
5666   struct Queue *q;
5667
5668   (void) cls;
5669   validation_task = NULL;
5670   vs = GNUNET_CONTAINER_heap_peek (validation_heap);
5671   /* drop validations past their expiration */
5672   while ( (NULL != vs) &&
5673           (0 == GNUNET_TIME_absolute_get_remaining (vs->valid_until).rel_value_us) )
5674   {
5675     free_validation_state (vs);
5676     vs = GNUNET_CONTAINER_heap_peek (validation_heap);
5677   }
5678   if (NULL == vs)
5679     return; /* woopsie, no more addresses known, should only
5680                happen if we're really a lonely peer */
5681   n = GNUNET_CONTAINER_multipeermap_get (neighbours,
5682                                          &vs->pid);
5683   q = NULL;
5684   if (NULL != n)
5685   {
5686     for (struct Queue *pos = n->queue_head;
5687          NULL != pos;
5688          pos = pos->next_neighbour)
5689     {
5690       if (0 == strcmp (pos->address,
5691                        vs->address))
5692       {
5693         q = pos;
5694         break;
5695       }
5696     }
5697   }
5698   if (NULL == q)
5699   {
5700     vs->awaiting_queue = GNUNET_YES;
5701     suggest_to_connect (&vs->pid,
5702                         vs->address);
5703   }
5704   else
5705     validation_transmit_on_queue (q,
5706                                   vs);
5707   /* Finally, reschedule next attempt */
5708   vs->challenge_backoff = GNUNET_TIME_randomized_backoff (vs->challenge_backoff,
5709                                                           MAX_VALIDATION_CHALLENGE_FREQ);
5710   update_next_challenge_time (vs,
5711                               GNUNET_TIME_relative_to_absolute (vs->challenge_backoff));
5712 }
5713
5714
5715 /**
5716  * Closure for #check_connection_quality.
5717  */
5718 struct QueueQualityContext
5719 {
5720   /**
5721    * Set to the @e k'th queue encountered.
5722    */ 
5723   struct Queue *q;
5724
5725   /**
5726    * Set to the number of quality queues encountered.
5727    */
5728   unsigned int quality_count;
5729
5730   /**
5731    * Set to the total number of queues encountered.
5732    */ 
5733   unsigned int num_queues;
5734
5735   /**
5736    * Decremented for each queue, for selection of the
5737    * k-th queue in @e q.
5738    */
5739   unsigned int k;
5740
5741 };
5742
5743
5744 /**
5745  * Check whether any queue to the given neighbour is
5746  * of a good "quality" and if so, increment the counter.
5747  * Also counts the total number of queues, and returns
5748  * the k-th queue found.
5749  *
5750  * @param cls a `struct QueueQualityContext *` with counters
5751  * @param pid peer this is about
5752  * @param value a `struct Neighbour`
5753  * @return #GNUNET_OK (continue to iterate)
5754  */
5755 static int
5756 check_connection_quality (void *cls,
5757                           const struct GNUNET_PeerIdentity *pid,
5758                           void *value)
5759 {
5760   struct QueueQualityContext *ctx = cls;
5761   struct Neighbour *n = value;
5762   int do_inc;
5763
5764   (void) pid;
5765   do_inc = GNUNET_NO;
5766   for (struct Queue *q = n->queue_head;
5767        NULL != q;
5768        q = q->next_neighbour)
5769   {
5770     if (0 != q->distance)
5771       continue; /* DV does not count */
5772     ctx->num_queues++;
5773     if (0 == ctx->k--)
5774       ctx->q = q;
5775     /* OPTIMIZE-FIXME: in the future, add reliability / goodput
5776        statistics and consider those as well here? */
5777     if (q->rtt.rel_value_us < DV_QUALITY_RTT_THRESHOLD.rel_value_us)
5778       do_inc = GNUNET_YES;
5779   }
5780   if (GNUNET_YES == do_inc)
5781     ctx->quality_count++;
5782   return GNUNET_OK;
5783 }
5784
5785
5786 /**
5787  * Task run when we CONSIDER initiating a DV learn 
5788  * process. We first check that sending out a message is
5789  * even possible (queues exist), then that it is desirable
5790  * (if not, reschedule the task for later), and finally
5791  * we may then begin the job.  If there are too many
5792  * entries in the #dvlearn_map, we purge the oldest entry
5793  * using #lle_tail.
5794  *
5795  * @param cls NULL
5796  */
5797 static void
5798 start_dv_learn (void *cls)
5799 {
5800   struct LearnLaunchEntry *lle;
5801   struct QueueQualityContext qqc;
5802   struct GNUNET_MQ_Envelope *env;
5803   struct TransportDVLearn *dvl;
5804
5805   (void) cls;
5806   dvlearn_task = NULL;
5807   if (0 ==
5808       GNUNET_CONTAINER_multipeermap_size (neighbours))
5809     return; /* lost all connectivity, cannot do learning */
5810   qqc.quality_count = 0;
5811   qqc.num_queues = 0;
5812   GNUNET_CONTAINER_multipeermap_iterate (neighbours,
5813                                          &check_connection_quality,
5814                                          &qqc);
5815   if (qqc.quality_count > DV_LEARN_QUALITY_THRESHOLD)
5816   {
5817     struct GNUNET_TIME_Relative delay;
5818     unsigned int factor;
5819
5820     /* scale our retries by how far we are above the threshold */
5821     factor = qqc.quality_count / DV_LEARN_QUALITY_THRESHOLD;
5822     delay = GNUNET_TIME_relative_multiply (DV_LEARN_BASE_FREQUENCY,
5823                                            factor);
5824     dvlearn_task = GNUNET_SCHEDULER_add_delayed (delay,
5825                                                  &start_dv_learn,
5826                                                  NULL);
5827     return;
5828   }
5829   /* remove old entries in #dvlearn_map if it has grown too big */
5830   while (MAX_DV_LEARN_PENDING >=
5831          GNUNET_CONTAINER_multishortmap_size (dvlearn_map))
5832   {
5833     lle = lle_tail;
5834     GNUNET_assert (GNUNET_YES ==
5835                    GNUNET_CONTAINER_multishortmap_remove (dvlearn_map,
5836                                                           &lle->challenge,
5837                                                           lle));
5838     GNUNET_CONTAINER_DLL_remove (lle_head,
5839                                  lle_tail,
5840                                  lle);
5841     GNUNET_free (lle);
5842   }
5843   /* setup data structure for learning */
5844   lle = GNUNET_new (struct LearnLaunchEntry);
5845   GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
5846                               &lle->challenge,
5847                               sizeof (lle->challenge));
5848   GNUNET_CONTAINER_DLL_insert (lle_head,
5849                                lle_tail,
5850                                lle);
5851   GNUNET_break (GNUNET_YES ==
5852                 GNUNET_CONTAINER_multishortmap_put (dvlearn_map,
5853                                                     &lle->challenge,
5854                                                     lle,
5855                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
5856   env = GNUNET_MQ_msg (dvl,
5857                        GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN);
5858   dvl->num_hops = htons (0);
5859   dvl->bidirectional = htons (0);
5860   dvl->non_network_delay = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO);
5861   {
5862     struct DvInitPS dvip = {
5863       .purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_INITIATOR),
5864       .purpose.size = htonl (sizeof (dvip)),
5865       .challenge = lle->challenge
5866     };
5867
5868     GNUNET_assert (GNUNET_OK ==
5869                    GNUNET_CRYPTO_eddsa_sign (GST_my_private_key,
5870                                              &dvip.purpose,
5871                                              &dvl->init_sig));
5872   }
5873   dvl->initiator = GST_my_identity;
5874   dvl->challenge = lle->challenge;
5875
5876   qqc.quality_count = 0;
5877   qqc.k = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
5878                                     qqc.num_queues);
5879   qqc.num_queues = 0;
5880   qqc.q = NULL;
5881   GNUNET_CONTAINER_multipeermap_iterate (neighbours,
5882                                          &check_connection_quality,
5883                                          &qqc);
5884   GNUNET_assert (NULL != qqc.q);
5885   
5886   /* Do this as close to transmission time as possible! */
5887   lle->launch_time = GNUNET_TIME_absolute_get (); 
5888   // FIXME: not so easy, need to BOX this message
5889   // in a transmission request! (mistake also done elsewhere!)
5890   GNUNET_MQ_send (qqc.q->tc->mq,
5891                   env);
5892
5893   /* reschedule this job, randomizing the time it runs (but no 
5894      actual backoff!) */
5895   dvlearn_task
5896     = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_randomize (DV_LEARN_BASE_FREQUENCY),
5897                                     &start_dv_learn,
5898                                     NULL);
5899 }
5900
5901
5902 /**
5903  * A new queue has been created, check if any address validation
5904  * requests have been waiting for it.
5905  *
5906  * @param cls a `struct Queue`
5907  * @param pid peer concerned (unused)
5908  * @param value a `struct ValidationState`
5909  * @return #GNUNET_NO if a match was found and we can stop looking
5910  */
5911 static int
5912 check_validation_request_pending (void *cls,
5913                                   const struct GNUNET_PeerIdentity *pid,
5914                                   void *value)
5915 {
5916   struct Queue *q = cls;
5917   struct ValidationState *vs = value;
5918
5919   (void) pid;
5920   if ( (GNUNET_YES == vs->awaiting_queue) &&
5921        (0 == strcmp (vs->address,
5922                      q->address)) )
5923   {
5924     vs->awaiting_queue = GNUNET_NO;
5925     validation_transmit_on_queue (q,
5926                                   vs);
5927     return GNUNET_NO;
5928   }
5929   return GNUNET_OK;
5930 }
5931
5932
5933 /**
5934  * New queue became available.  Process the request.
5935  *
5936  * @param cls the client
5937  * @param aqm the send message that was sent
5938  */
5939 static void
5940 handle_add_queue_message (void *cls,
5941                           const struct GNUNET_TRANSPORT_AddQueueMessage *aqm)
5942 {
5943   struct TransportClient *tc = cls;
5944   struct Queue *queue;
5945   struct Neighbour *neighbour;
5946   const char *addr;
5947   uint16_t addr_len;
5948
5949   if (ntohl (aqm->mtu) <= sizeof (struct TransportFragmentBox))
5950   {
5951     /* MTU so small as to be useless for transmissions,
5952        required for #fragment_message()! */
5953     GNUNET_break_op (0);
5954     GNUNET_SERVICE_client_drop (tc->client);
5955     return;
5956   }
5957   neighbour = lookup_neighbour (&aqm->receiver);
5958   if (NULL == neighbour)
5959   {
5960     neighbour = GNUNET_new (struct Neighbour);
5961     neighbour->earliest_timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
5962     neighbour->pid = aqm->receiver;
5963     GNUNET_assert (GNUNET_OK ==
5964                    GNUNET_CONTAINER_multipeermap_put (neighbours,
5965                                                       &neighbour->pid,
5966                                                       neighbour,
5967                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
5968     cores_send_connect_info (&neighbour->pid,
5969                              GNUNET_BANDWIDTH_ZERO);
5970   }
5971   addr_len = ntohs (aqm->header.size) - sizeof (*aqm);
5972   addr = (const char *) &aqm[1];
5973
5974   queue = GNUNET_malloc (sizeof (struct Queue) + addr_len);
5975   queue->tc = tc;
5976   queue->address = (const char *) &queue[1];
5977   queue->rtt = GNUNET_TIME_UNIT_FOREVER_REL;
5978   queue->qid = aqm->qid;
5979   queue->mtu = ntohl (aqm->mtu);
5980   queue->nt = (enum GNUNET_NetworkType) ntohl (aqm->nt);
5981   queue->cs = (enum GNUNET_TRANSPORT_ConnectionStatus) ntohl (aqm->cs);
5982   queue->neighbour = neighbour;
5983   GNUNET_BANDWIDTH_tracker_init2 (&queue->tracker_in,
5984                                   &tracker_update_in_cb,
5985                                   queue,
5986                                   GNUNET_BANDWIDTH_ZERO,
5987                                   GNUNET_CONSTANTS_MAX_BANDWIDTH_CARRY_S,
5988                                   &tracker_excess_in_cb,
5989                                   queue);
5990   GNUNET_BANDWIDTH_tracker_init2 (&queue->tracker_out,
5991                                   &tracker_update_out_cb,
5992                                   queue,
5993                                   GNUNET_BANDWIDTH_ZERO,
5994                                   GNUNET_CONSTANTS_MAX_BANDWIDTH_CARRY_S,
5995                                   &tracker_excess_out_cb,
5996                                   queue);
5997   memcpy (&queue[1],
5998           addr,
5999           addr_len);
6000   /* notify monitors about new queue */
6001   {
6002     struct MonitorEvent me = {
6003       .rtt = queue->rtt,
6004       .cs = queue->cs
6005     };
6006
6007     notify_monitors (&neighbour->pid,
6008                      queue->address,
6009                      queue->nt,
6010                      &me);
6011   }
6012   GNUNET_CONTAINER_MDLL_insert (neighbour,
6013                                 neighbour->queue_head,
6014                                 neighbour->queue_tail,
6015                                 queue);
6016   GNUNET_CONTAINER_MDLL_insert (client,
6017                                 tc->details.communicator.queue_head,
6018                                 tc->details.communicator.queue_tail,
6019                                 queue);
6020   /* check if valdiations are waiting for the queue */
6021   (void) GNUNET_CONTAINER_multipeermap_get_multiple (validation_map,
6022                                                      &aqm->receiver,
6023                                                      &check_validation_request_pending,
6024                                                      queue);
6025   /* might be our first queue, try launching DV learning */
6026   if (NULL == dvlearn_task)
6027     dvlearn_task = GNUNET_SCHEDULER_add_now (&start_dv_learn,
6028                                              NULL);
6029   GNUNET_SERVICE_client_continue (tc->client);
6030 }
6031
6032
6033 /**
6034  * Communicator tells us that our request to create a queue "worked", that
6035  * is setting up the queue is now in process.
6036  *
6037  * @param cls the `struct TransportClient`
6038  * @param cqr confirmation message
6039  */
6040 static void
6041 handle_queue_create_ok (void *cls,
6042                         const struct GNUNET_TRANSPORT_CreateQueueResponse *cqr)
6043 {
6044   struct TransportClient *tc = cls;
6045
6046   if (CT_COMMUNICATOR != tc->type)
6047   {
6048     GNUNET_break (0);
6049     GNUNET_SERVICE_client_drop (tc->client);
6050     return;
6051   }
6052   GNUNET_STATISTICS_update (GST_stats,
6053                             "# Suggestions succeeded at communicator",
6054                             1,
6055                             GNUNET_NO);
6056   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
6057               "Request #%u for communicator to create queue succeeded\n",
6058               (unsigned int) ntohs (cqr->request_id));
6059   GNUNET_SERVICE_client_continue (tc->client);
6060 }
6061
6062
6063 /**
6064  * Communicator tells us that our request to create a queue failed. This usually
6065  * indicates that the provided address is simply invalid or that the communicator's
6066  * resources are exhausted.
6067  *
6068  * @param cls the `struct TransportClient`
6069  * @param cqr failure message
6070  */
6071 static void
6072 handle_queue_create_fail (void *cls,
6073                           const struct GNUNET_TRANSPORT_CreateQueueResponse *cqr)
6074 {
6075   struct TransportClient *tc = cls;
6076
6077   if (CT_COMMUNICATOR != tc->type)
6078   {
6079     GNUNET_break (0);
6080     GNUNET_SERVICE_client_drop (tc->client);
6081     return;
6082   }
6083   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
6084               "Request #%u for communicator to create queue failed\n",
6085               (unsigned int) ntohs (cqr->request_id));
6086   GNUNET_STATISTICS_update (GST_stats,
6087                             "# Suggestions failed in queue creation at communicator",
6088                             1,
6089                             GNUNET_NO);
6090   GNUNET_SERVICE_client_continue (tc->client);
6091 }
6092
6093
6094 /**
6095  * We have received a `struct ExpressPreferenceMessage` from an application client.
6096  *
6097  * @param cls handle to the client
6098  * @param msg the start message
6099  */
6100 static void
6101 handle_suggest_cancel (void *cls,
6102                        const struct ExpressPreferenceMessage *msg)
6103 {
6104   struct TransportClient *tc = cls;
6105   struct PeerRequest *pr;
6106
6107   if (CT_APPLICATION != tc->type)
6108   {
6109     GNUNET_break (0);
6110     GNUNET_SERVICE_client_drop (tc->client);
6111     return;
6112   }
6113   pr = GNUNET_CONTAINER_multipeermap_get (tc->details.application.requests,
6114                                           &msg->peer);
6115   if (NULL == pr)
6116   {
6117     GNUNET_break (0);
6118     GNUNET_SERVICE_client_drop (tc->client);
6119     return;
6120   }
6121   (void) stop_peer_request (tc,
6122                             &pr->pid,
6123                             pr);
6124   GNUNET_SERVICE_client_continue (tc->client);
6125 }
6126
6127
6128 /**
6129  * Check #GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_CONSIDER_VERIFY
6130  * messages. We do nothing here, real verification is done later.
6131  *
6132  * @param cls a `struct TransportClient *`
6133  * @param msg message to verify
6134  * @return #GNUNET_OK
6135  */
6136 static int
6137 check_address_consider_verify (void *cls,
6138                                const struct GNUNET_TRANSPORT_AddressToVerify *hdr)
6139 {
6140   (void) cls;
6141   (void) hdr;
6142   return GNUNET_OK;
6143 }
6144
6145
6146 /**
6147  * Closure for #check_known_address.
6148  */
6149 struct CheckKnownAddressContext
6150 {
6151   /**
6152    * Set to the address we are looking for.
6153    */
6154   const char *address;
6155
6156   /**
6157    * Set to a matching validation state, if one was found.
6158    */
6159   struct ValidationState *vs;
6160 };
6161
6162
6163 /**
6164  * Test if the validation state in @a value matches the
6165  * address from @a cls.
6166  *
6167  * @param cls a `struct CheckKnownAddressContext`
6168  * @param pid unused (must match though)
6169  * @param value a `struct ValidationState`
6170  * @return #GNUNET_OK if not matching, #GNUNET_NO if match found
6171  */
6172 static int
6173 check_known_address (void *cls,
6174                      const struct GNUNET_PeerIdentity *pid,
6175                      void *value)
6176 {
6177   struct CheckKnownAddressContext *ckac = cls;
6178   struct ValidationState *vs = value;
6179
6180   (void) pid;
6181   if (0 != strcmp (vs->address,
6182                    ckac->address))
6183     return GNUNET_OK;
6184   ckac->vs = vs;
6185   return GNUNET_NO;
6186 }
6187
6188
6189 /**
6190  * Start address validation.
6191  *
6192  * @param pid peer the @a address is for
6193  * @param address an address to reach @a pid (presumably)
6194  * @param expiration when did @a pid claim @a address will become invalid
6195  */
6196 static void
6197 start_address_validation (const struct GNUNET_PeerIdentity *pid,
6198                           const char *address,
6199                           struct GNUNET_TIME_Absolute expiration)
6200 {
6201   struct GNUNET_TIME_Absolute now;
6202   struct ValidationState *vs;
6203   struct CheckKnownAddressContext ckac = {
6204     .address = address,
6205     .vs = NULL
6206   };
6207
6208   if (0 == GNUNET_TIME_absolute_get_remaining (expiration).rel_value_us)
6209     return; /* expired */
6210   (void) GNUNET_CONTAINER_multipeermap_get_multiple (validation_map,
6211                                                      pid,
6212                                                      &check_known_address,
6213                                                      &ckac);
6214   if (NULL != (vs = ckac.vs))
6215   {
6216     /* if 'vs' is not currently valid, we need to speed up retrying the validation */
6217     if (vs->validated_until.abs_value_us < vs->next_challenge.abs_value_us)
6218     {
6219       /* reduce backoff as we got a fresh advertisement */
6220       vs->challenge_backoff = GNUNET_TIME_relative_min (FAST_VALIDATION_CHALLENGE_FREQ,
6221                                                         GNUNET_TIME_relative_divide (vs->challenge_backoff,
6222                                                                                      2));
6223       update_next_challenge_time (vs,
6224                                   GNUNET_TIME_relative_to_absolute (vs->challenge_backoff));
6225     }
6226     return;
6227   }
6228   now = GNUNET_TIME_absolute_get();
6229   vs = GNUNET_new (struct ValidationState);
6230   vs->pid = *pid;
6231   vs->valid_until = expiration;
6232   vs->first_challenge_use = now;
6233   vs->validation_rtt = GNUNET_TIME_UNIT_FOREVER_REL;
6234   GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
6235                               &vs->challenge,
6236                               sizeof (vs->challenge));
6237   vs->address = GNUNET_strdup (address);
6238   GNUNET_assert (GNUNET_YES ==
6239                  GNUNET_CONTAINER_multipeermap_put (validation_map,
6240                                                     &vs->pid,
6241                                                     vs,
6242                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
6243   update_next_challenge_time (vs,
6244                               now);
6245 }
6246
6247
6248 /**
6249  * Function called by PEERSTORE for each matching record.
6250  *
6251  * @param cls closure
6252  * @param record peerstore record information
6253  * @param emsg error message, or NULL if no errors
6254  */
6255 static void
6256 handle_hello (void *cls,
6257               const struct GNUNET_PEERSTORE_Record *record,
6258               const char *emsg)
6259 {
6260   struct PeerRequest *pr = cls;
6261   const char *val;
6262
6263   if (NULL != emsg)
6264   {
6265     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
6266                 "Got failure from PEERSTORE: %s\n",
6267                 emsg);
6268     return;
6269   }
6270   val = record->value;
6271   if ( (0 == record->value_size) ||
6272        ('\0' != val[record->value_size - 1]) )
6273   {
6274     GNUNET_break (0);
6275     return;
6276   }
6277   start_address_validation (&pr->pid,
6278                             (const char *) record->value,
6279                             record->expiry);
6280 }
6281
6282
6283 /**
6284  * We have received a `struct ExpressPreferenceMessage` from an application client.
6285  *
6286  * @param cls handle to the client
6287  * @param msg the start message
6288  */
6289 static void
6290 handle_suggest (void *cls,
6291                 const struct ExpressPreferenceMessage *msg)
6292 {
6293   struct TransportClient *tc = cls;
6294   struct PeerRequest *pr;
6295
6296   if (CT_NONE == tc->type)
6297   {
6298     tc->type = CT_APPLICATION;
6299     tc->details.application.requests
6300       = GNUNET_CONTAINER_multipeermap_create (16,
6301                                               GNUNET_YES);
6302   }
6303   if (CT_APPLICATION != tc->type)
6304   {
6305     GNUNET_break (0);
6306     GNUNET_SERVICE_client_drop (tc->client);
6307     return;
6308   }
6309   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
6310               "Client suggested we talk to %s with preference %d at rate %u\n",
6311               GNUNET_i2s (&msg->peer),
6312               (int) ntohl (msg->pk),
6313               (int) ntohl (msg->bw.value__));
6314   pr = GNUNET_new (struct PeerRequest);
6315   pr->tc = tc;
6316   pr->pid = msg->peer;
6317   pr->bw = msg->bw;
6318   pr->pk = (enum GNUNET_MQ_PreferenceKind) ntohl (msg->pk);
6319   if (GNUNET_YES !=
6320       GNUNET_CONTAINER_multipeermap_put (tc->details.application.requests,
6321                                          &pr->pid,
6322                                          pr,
6323                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY))
6324   {
6325     GNUNET_break (0);
6326     GNUNET_free (pr);
6327     GNUNET_SERVICE_client_drop (tc->client);
6328     return;
6329   }
6330   pr->wc = GNUNET_PEERSTORE_watch (peerstore,
6331                                    "transport",
6332                                    &pr->pid,
6333                                    GNUNET_PEERSTORE_TRANSPORT_URLADDRESS_KEY,
6334                                    &handle_hello,
6335                                    pr);
6336   GNUNET_SERVICE_client_continue (tc->client);
6337 }
6338
6339
6340 /**
6341  * Given another peers address, consider checking it for validity
6342  * and then adding it to the Peerstore.
6343  *
6344  * @param cls a `struct TransportClient`
6345  * @param hdr message containing the raw address data and
6346  *        signature in the body, see #GNUNET_HELLO_extract_address()
6347  */
6348 static void
6349 handle_address_consider_verify (void *cls,
6350                                 const struct GNUNET_TRANSPORT_AddressToVerify *hdr)
6351 {
6352   struct TransportClient *tc = cls;
6353   char *address;
6354   enum GNUNET_NetworkType nt;
6355   struct GNUNET_TIME_Absolute expiration;
6356
6357   (void) cls;
6358   // FIXME: checking that we know this address already should
6359   //        be done BEFORE checking the signature => HELLO API change!
6360   // FIXME: pre-check: rate-limit signature verification / validation?!
6361   address = GNUNET_HELLO_extract_address (&hdr[1],
6362                                           ntohs (hdr->header.size) - sizeof (*hdr),
6363                                           &hdr->peer,
6364                                           &nt,
6365                                           &expiration);
6366   if (NULL == address)
6367   {
6368     GNUNET_break_op (0);
6369     return;
6370   }
6371   start_address_validation (&hdr->peer,
6372                             address,
6373                             expiration);
6374   GNUNET_free (address);
6375   GNUNET_SERVICE_client_continue (tc->client);
6376 }
6377
6378
6379 /**
6380  * Check #GNUNET_MESSAGE_TYPE_TRANSPORT_REQUEST_HELLO_VALIDATION
6381  * messages.
6382  *
6383  * @param cls a `struct TransportClient *`
6384  * @param m message to verify
6385  * @return #GNUNET_OK on success
6386  */
6387 static int
6388 check_request_hello_validation (void *cls,
6389                                 const struct RequestHelloValidationMessage *m)
6390 {
6391   (void) cls;
6392   GNUNET_MQ_check_zero_termination (m);
6393   return GNUNET_OK;
6394 }
6395
6396
6397 /**
6398  * A client encountered an address of another peer. Consider validating it,
6399  * and if validation succeeds, persist it to PEERSTORE.
6400  *
6401  * @param cls a `struct TransportClient *`
6402  * @param m message to verify
6403  */
6404 static void
6405 handle_request_hello_validation (void *cls,
6406                                  const struct RequestHelloValidationMessage *m)
6407 {
6408   struct TransportClient *tc = cls;
6409
6410   start_address_validation (&m->peer,
6411                             (const char *) &m[1],
6412                             GNUNET_TIME_absolute_ntoh (m->expiration));
6413   GNUNET_SERVICE_client_continue (tc->client);
6414 }
6415
6416
6417 /**
6418  * Free neighbour entry.
6419  *
6420  * @param cls NULL
6421  * @param pid unused
6422  * @param value a `struct Neighbour`
6423  * @return #GNUNET_OK (always)
6424  */
6425 static int
6426 free_neighbour_cb (void *cls,
6427                    const struct GNUNET_PeerIdentity *pid,
6428                    void *value)
6429 {
6430   struct Neighbour *neighbour = value;
6431
6432   (void) cls;
6433   (void) pid;
6434   GNUNET_break (0); // should this ever happen?
6435   free_neighbour (neighbour);
6436
6437   return GNUNET_OK;
6438 }
6439
6440
6441 /**
6442  * Free DV route entry.
6443  *
6444  * @param cls NULL
6445  * @param pid unused
6446  * @param value a `struct DistanceVector`
6447  * @return #GNUNET_OK (always)
6448  */
6449 static int
6450 free_dv_routes_cb (void *cls,
6451                    const struct GNUNET_PeerIdentity *pid,
6452                    void *value)
6453 {
6454   struct DistanceVector *dv = value;
6455
6456   (void) cls;
6457   (void) pid;
6458   free_dv_route (dv);
6459
6460   return GNUNET_OK;
6461 }
6462
6463
6464 /**
6465  * Free ephemeral entry.
6466  *
6467  * @param cls NULL
6468  * @param pid unused
6469  * @param value a `struct EphemeralCacheEntry`
6470  * @return #GNUNET_OK (always)
6471  */
6472 static int
6473 free_ephemeral_cb (void *cls,
6474                    const struct GNUNET_PeerIdentity *pid,
6475                    void *value)
6476 {
6477   struct EphemeralCacheEntry *ece = value;
6478
6479   (void) cls;
6480   (void) pid;
6481   free_ephemeral (ece);
6482   return GNUNET_OK;
6483 }
6484
6485
6486 /**
6487  * Free validation state.
6488  *
6489  * @param cls NULL
6490  * @param pid unused
6491  * @param value a `struct ValidationState`
6492  * @return #GNUNET_OK (always)
6493  */
6494 static int
6495 free_validation_state_cb (void *cls,
6496                           const struct GNUNET_PeerIdentity *pid,
6497                           void *value)
6498 {
6499   struct ValidationState *vs = value;
6500
6501   (void) cls;
6502   (void) pid;
6503   free_validation_state (vs);
6504   return GNUNET_OK;
6505 }
6506
6507
6508 /**
6509  * Function called when the service shuts down.  Unloads our plugins
6510  * and cancels pending validations.
6511  *
6512  * @param cls closure, unused
6513  */
6514 static void
6515 do_shutdown (void *cls)
6516 {
6517   struct LearnLaunchEntry *lle;
6518   (void) cls;
6519
6520   if (NULL != ephemeral_task)
6521   {
6522     GNUNET_SCHEDULER_cancel (ephemeral_task);
6523     ephemeral_task = NULL;
6524   }
6525   GNUNET_CONTAINER_multipeermap_iterate (neighbours,
6526                                          &free_neighbour_cb,
6527                                          NULL);
6528   if (NULL != peerstore)
6529   {
6530     GNUNET_PEERSTORE_disconnect (peerstore,
6531                                  GNUNET_NO);
6532     peerstore = NULL;
6533   }
6534   if (NULL != GST_stats)
6535   {
6536     GNUNET_STATISTICS_destroy (GST_stats,
6537                                GNUNET_NO);
6538     GST_stats = NULL;
6539   }
6540   if (NULL != GST_my_private_key)
6541   {
6542     GNUNET_free (GST_my_private_key);
6543     GST_my_private_key = NULL;
6544   }
6545   GNUNET_CONTAINER_multipeermap_destroy (neighbours);
6546   neighbours = NULL;
6547   GNUNET_CONTAINER_multipeermap_iterate (validation_map,
6548                                          &free_validation_state_cb,
6549                                          NULL);
6550   GNUNET_CONTAINER_multipeermap_destroy (validation_map);
6551   validation_map = NULL;
6552   while (NULL != (lle = lle_head))
6553   {
6554     GNUNET_CONTAINER_DLL_remove (lle_head,
6555                                  lle_tail,
6556                                  lle);
6557     GNUNET_free (lle);
6558   }
6559   GNUNET_CONTAINER_multishortmap_destroy (dvlearn_map);
6560   dvlearn_map = NULL;
6561   GNUNET_CONTAINER_heap_destroy (validation_heap);
6562   validation_heap = NULL;
6563   GNUNET_CONTAINER_multipeermap_iterate (dv_routes,
6564                                          &free_dv_routes_cb,
6565                                          NULL);
6566   GNUNET_CONTAINER_multipeermap_destroy (dv_routes);
6567   dv_routes = NULL;
6568   GNUNET_CONTAINER_multipeermap_iterate (ephemeral_map,
6569                                          &free_ephemeral_cb,
6570                                          NULL);
6571   GNUNET_CONTAINER_multipeermap_destroy (ephemeral_map);
6572   ephemeral_map = NULL;
6573   GNUNET_CONTAINER_heap_destroy (ephemeral_heap);
6574   ephemeral_heap = NULL;
6575 }
6576
6577
6578 /**
6579  * Initiate transport service.
6580  *
6581  * @param cls closure
6582  * @param c configuration to use
6583  * @param service the initialized service
6584  */
6585 static void
6586 run (void *cls,
6587      const struct GNUNET_CONFIGURATION_Handle *c,
6588      struct GNUNET_SERVICE_Handle *service)
6589 {
6590   (void) cls;
6591   (void) service;
6592   /* setup globals */
6593   GST_cfg = c;
6594   neighbours = GNUNET_CONTAINER_multipeermap_create (1024,
6595                                                      GNUNET_YES);
6596   dv_routes = GNUNET_CONTAINER_multipeermap_create (1024,
6597                                                     GNUNET_YES);
6598   ephemeral_map = GNUNET_CONTAINER_multipeermap_create (32,
6599                                                         GNUNET_YES);
6600   ephemeral_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
6601   dvlearn_map = GNUNET_CONTAINER_multishortmap_create (2 * MAX_DV_LEARN_PENDING,
6602                                                        GNUNET_YES);
6603   validation_map = GNUNET_CONTAINER_multipeermap_create (1024,
6604                                                          GNUNET_YES);
6605   validation_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
6606   GST_my_private_key = GNUNET_CRYPTO_eddsa_key_create_from_configuration (GST_cfg);
6607   if (NULL == GST_my_private_key)
6608   {
6609     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
6610                 _("Transport service is lacking key configuration settings. Exiting.\n"));
6611     GNUNET_SCHEDULER_shutdown ();
6612     return;
6613   }
6614   GNUNET_CRYPTO_eddsa_key_get_public (GST_my_private_key,
6615                                       &GST_my_identity.public_key);
6616   GNUNET_log(GNUNET_ERROR_TYPE_INFO,
6617              "My identity is `%s'\n",
6618              GNUNET_i2s_full (&GST_my_identity));
6619   GST_stats = GNUNET_STATISTICS_create ("transport",
6620                                         GST_cfg);
6621   GNUNET_SCHEDULER_add_shutdown (&do_shutdown,
6622                                  NULL);
6623   peerstore = GNUNET_PEERSTORE_connect (GST_cfg);
6624   if (NULL == peerstore)
6625   {
6626     GNUNET_break (0);
6627     GNUNET_SCHEDULER_shutdown ();
6628     return;
6629   }
6630 }
6631
6632
6633 /**
6634  * Define "main" method using service macro.
6635  */
6636 GNUNET_SERVICE_MAIN
6637 ("transport",
6638  GNUNET_SERVICE_OPTION_SOFT_SHUTDOWN,
6639  &run,
6640  &client_connect_cb,
6641  &client_disconnect_cb,
6642  NULL,
6643  /* communication with applications */
6644  GNUNET_MQ_hd_fixed_size (suggest,
6645                           GNUNET_MESSAGE_TYPE_TRANSPORT_SUGGEST,
6646                           struct ExpressPreferenceMessage,
6647                           NULL),
6648  GNUNET_MQ_hd_fixed_size (suggest_cancel,
6649                           GNUNET_MESSAGE_TYPE_TRANSPORT_SUGGEST_CANCEL,
6650                           struct ExpressPreferenceMessage,
6651                           NULL),
6652  GNUNET_MQ_hd_var_size (request_hello_validation,
6653                         GNUNET_MESSAGE_TYPE_TRANSPORT_REQUEST_HELLO_VALIDATION,
6654                         struct RequestHelloValidationMessage,
6655                         NULL),
6656  /* communication with core */
6657  GNUNET_MQ_hd_fixed_size (client_start,
6658                           GNUNET_MESSAGE_TYPE_TRANSPORT_START,
6659                           struct StartMessage,
6660                           NULL),
6661  GNUNET_MQ_hd_var_size (client_send,
6662                         GNUNET_MESSAGE_TYPE_TRANSPORT_SEND,
6663                         struct OutboundMessage,
6664                         NULL),
6665  /* communication with communicators */
6666  GNUNET_MQ_hd_var_size (communicator_available,
6667                         GNUNET_MESSAGE_TYPE_TRANSPORT_NEW_COMMUNICATOR,
6668                         struct GNUNET_TRANSPORT_CommunicatorAvailableMessage,
6669                         NULL),
6670  GNUNET_MQ_hd_var_size (communicator_backchannel,
6671                         GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL,
6672                         struct GNUNET_TRANSPORT_CommunicatorBackchannel,
6673                         NULL),
6674  GNUNET_MQ_hd_var_size (add_address,
6675                         GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS,
6676                         struct GNUNET_TRANSPORT_AddAddressMessage,
6677                         NULL),
6678  GNUNET_MQ_hd_fixed_size (del_address,
6679                           GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_ADDRESS,
6680                           struct GNUNET_TRANSPORT_DelAddressMessage,
6681                           NULL),
6682  GNUNET_MQ_hd_var_size (incoming_msg,
6683                         GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG,
6684                         struct GNUNET_TRANSPORT_IncomingMessage,
6685                         NULL),
6686  GNUNET_MQ_hd_fixed_size (queue_create_ok,
6687                           GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_OK,
6688                           struct GNUNET_TRANSPORT_CreateQueueResponse,
6689                           NULL),
6690  GNUNET_MQ_hd_fixed_size (queue_create_fail,
6691                           GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_FAIL,
6692                           struct GNUNET_TRANSPORT_CreateQueueResponse,
6693                           NULL),
6694  GNUNET_MQ_hd_var_size (add_queue_message,
6695                         GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP,
6696                         struct GNUNET_TRANSPORT_AddQueueMessage,
6697                         NULL),
6698  GNUNET_MQ_hd_var_size (address_consider_verify,
6699                         GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_CONSIDER_VERIFY,
6700                         struct GNUNET_TRANSPORT_AddressToVerify,
6701                         NULL),
6702  GNUNET_MQ_hd_fixed_size (del_queue_message,
6703                           GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN,
6704                           struct GNUNET_TRANSPORT_DelQueueMessage,
6705                           NULL),
6706  GNUNET_MQ_hd_fixed_size (send_message_ack,
6707                           GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK,
6708                           struct GNUNET_TRANSPORT_SendMessageToAck,
6709                           NULL),
6710  /* communication with monitors */
6711  GNUNET_MQ_hd_fixed_size (monitor_start,
6712                           GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_START,
6713                           struct GNUNET_TRANSPORT_MonitorStart,
6714                           NULL),
6715  GNUNET_MQ_handler_end ());
6716
6717
6718 /* end of file gnunet-service-transport.c */