only notify core about validated queues
[oweals/gnunet.git] / src / transport / gnunet-service-tng.c
1 /*
2  This file is part of GNUnet.
3  Copyright (C) 2010-2016, 2018, 2019 GNUnet e.V.
4
5  GNUnet is free software: you can redistribute it and/or modify it
6  under the terms of the GNU Affero General Public License as published
7  by the Free Software Foundation, either version 3 of the License,
8  or (at your option) any later version.
9
10  GNUnet is distributed in the hope that it will be useful, but
11  WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  Affero General Public License for more details.
14
15  You should have received a copy of the GNU Affero General Public License
16  along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19  */
20 /**
21  * @file transport/gnunet-service-tng.c
22  * @brief main for gnunet-service-tng
23  * @author Christian Grothoff
24  *
25  * TODO:
26  * - figure out how to transmit (selective) ACKs in case of uni-directional
27  *   communicators (with/without core? DV-only?) When do we use ACKs?
28  *   => communicators use selective ACKs for flow control
29  *   => transport uses message-level ACKs for RTT, fragment confirmation
30  *   => integrate DV into transport, use neither core nor communicators
31  *      but rather give communicators transport-encapsulated messages
32  *      (which could be core-data, background-channel traffic, or
33  *       transport-to-transport traffic)
34  *
35  * Implement next:
36  * - route_message() implementation, including using DV data structures
37  *   (but not when routing certain message types, like DV learn,
38  *    looks like now like we need two flags (DV/no-DV, confirmed-only,
39  *    unconfirmed OK)
40  *   + NOTE: do NOT use PendingMessage for route_message(), as that is
41  *     for fragmentation/reliability and ultimately core flow control!
42  *     => route_message() should pick the queue
43  *     => in case of DV routing, route_message should BOX the message, too.
44  * - We currently do NEVER tell CORE also about DV-connections (core_visible
45  *    of `struct DistanceVector` is simply never set!)
46  *     + When? Easy if we initiated the DV and got the challenge; do that NOW
47  *        BUT what we passively learned DV (unconfirmed freshness)
48  *        => Do we trigger Challenge->Response there as well, or 'wait' for
49  *           our own DV initiations to discover?
50  *        => What about DV routes that expire? Do we also only count on
51  *           our own DV initiations for maintenance here, or do we
52  *           try to specifically re-confirm the existence of a particular path?
53  *        => OPITMIZATION-FIXME!
54  *   + Where do we track what we told core? Careful: need to check
55  *     the "core_visible' flag in both neighbours and DV before
56  *     sending out notifications to CORE!
57  * - retransmission logic
58  * - track RTT, distance, loss, etc. => requires extra data structures!
59  *
60  * Later:
61  * - change transport-core API to provide proper flow control in both
62  *   directions, allow multiple messages per peer simultaneously (tag
63  *   confirmations with unique message ID), and replace quota-out with
64  *   proper flow control;
65  * - if messages are below MTU, consider adding ACKs and other stuff
66  *   (requires planning at receiver, and additional MST-style demultiplex
67  *    at receiver!)
68  * - could avoid copying body of message into each fragment and keep
69  *   fragments as just pointers into the original message and only
70  *   fully build fragments just before transmission (optimization, should
71  *   reduce CPU and memory use)
72  *
73  * FIXME (without marks in the code!):
74  * - proper use/initialization of timestamps in messages exchanged
75  *   during DV learning
76  * - persistence of monotonic time obtained from other peers
77  *   in PEERSTORE (by message type)
78  *
79  * Optimizations:
80  * - use shorthashmap on msg_uuid's when matching reliability/fragment ACKs
81  *   against our pending message queue (requires additional per neighbour
82  *   hash map to be maintained, avoids possible linear scan on pending msgs)
83  * - queue_send_msg and route_message both by API design have to make copies
84  *   of the payload, and route_message on top of that requires a malloc/free.
85  *   Change design to approximate "zero" copy better...
86  *
87  * Design realizations / discussion:
88  * - communicators do flow control by calling MQ "notify sent"
89  *   when 'ready'. They determine flow implicitly (i.e. TCP blocking)
90  *   or explicitly via backchannel FC ACKs.  As long as the
91  *   channel is not full, they may 'notify sent' even if the other
92  *   peer has not yet confirmed receipt. The other peer confirming
93  *   is _only_ for FC, not for more reliable transmission; reliable
94  *   transmission (i.e. of fragments) is left to _transport_.
95  * - ACKs sent back in uni-directional communicators are done via
96  *   the background channel API; here transport _may_ initially
97  *   broadcast (with bounded # hops) if no path is known;
98  * - transport should _integrate_ DV-routing and build a view of
99  *   the network; then background channel traffic can be
100  *   routed via DV as well as explicit "DV" traffic.
101  * - background channel is also used for ACKs and NAT traversal support
102  * - transport service is responsible for AEAD'ing the background
103  *   channel, timestamps and monotonic time are used against replay
104  *   of old messages -> peerstore needs to be supplied with
105  *   "latest timestamps seen" data
106  * - if transport implements DV, we likely need a 3rd peermap
107  *   in addition to ephemerals and (direct) neighbours
108  *   ==> check if stuff needs to be moved out of "Neighbour"
109  * - transport should encapsualte core-level messages and do its
110  *   own ACKing for RTT/goodput/loss measurements _and_ fragment
111  *   for retransmission
112  */
113 #include "platform.h"
114 #include "gnunet_util_lib.h"
115 #include "gnunet_statistics_service.h"
116 #include "gnunet_transport_monitor_service.h"
117 #include "gnunet_peerstore_service.h"
118 #include "gnunet_hello_lib.h"
119 #include "gnunet_signatures.h"
120 #include "transport.h"
121
122
123 /**
124  * What is the size we assume for a read operation in the
125  * absence of an MTU for the purpose of flow control?
126  */
127 #define IN_PACKET_SIZE_WITHOUT_MTU 128
128
129 /**
130  * Minimum number of hops we should forward DV learn messages
131  * even if they are NOT useful for us in hope of looping
132  * back to the initiator?
133  *
134  * FIXME: allow initiator some control here instead?
135  */
136 #define MIN_DV_PATH_LENGTH_FOR_INITIATOR 3
137
138 /**
139  * Maximum DV distance allowed ever.
140  */
141 #define MAX_DV_HOPS_ALLOWED 16
142
143 /**
144  * Maximum number of DV learning activities we may
145  * have pending at the same time.
146  */
147 #define MAX_DV_LEARN_PENDING 64
148
149 /**
150  * Maximum number of DV paths we keep simultaneously to the same target.
151  */
152 #define MAX_DV_PATHS_TO_TARGET 3
153
154 /**
155  * If a queue delays the next message by more than this number
156  * of seconds we log a warning. Note: this is for testing,
157  * the value chosen here might be too aggressively low!
158  */
159 #define DELAY_WARN_THRESHOLD GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
160
161 /**
162  * We only consider queues as "quality" connections when
163  * suppressing the generation of DV initiation messages if
164  * the latency of the queue is below this threshold.
165  */
166 #define DV_QUALITY_RTT_THRESHOLD GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
167
168 /**
169  * How long do we consider a DV path valid if we see no
170  * further updates on it? Note: the value chosen here might be too low!
171  */
172 #define DV_PATH_VALIDITY_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
173
174 /**
175  * How long before paths expire would we like to (re)discover DV paths? Should
176  * be below #DV_PATH_VALIDITY_TIMEOUT.
177  */
178 #define DV_PATH_DISCOVERY_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 4)
179
180 /**
181  * How long are ephemeral keys valid?
182  */
183 #define EPHEMERAL_VALIDITY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 4)
184
185 /**
186  * How long do we keep partially reassembled messages around before giving up?
187  */
188 #define REASSEMBLY_EXPIRATION GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 4)
189
190 /**
191  * What is the fastest rate at which we send challenges *if* we keep learning
192  * an address (gossip, DHT, etc.)?
193  */
194 #define FAST_VALIDATION_CHALLENGE_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 1)
195
196 /**
197  * What is the slowest rate at which we send challenges?
198  */
199 #define MAX_VALIDATION_CHALLENGE_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_DAYS, 1)
200
201 /**
202  * What is the non-randomized base frequency at which we
203  * would initiate DV learn messages?
204  */
205 #define DV_LEARN_BASE_FREQUENCY GNUNET_TIME_UNIT_MINUTES
206
207 /**
208  * How many good connections (confirmed, bi-directional, not DV)
209  * do we need to have to suppress initiating DV learn messages?
210  */
211 #define DV_LEARN_QUALITY_THRESHOLD 100
212
213 /**
214  * When do we forget an invalid address for sure?
215  */
216 #define MAX_ADDRESS_VALID_UNTIL GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MONTHS, 1)
217 /**
218  * How long do we consider an address valid if we just checked?
219  */
220 #define ADDRESS_VALIDATION_LIFETIME GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 4)
221
222 /**
223  * What is the maximum frequency at which we do address validation?
224  * A random value between 0 and this value is added when scheduling
225  * the #validation_task (both to ensure we do not validate too often,
226  * and to randomize a bit).
227  */
228 #define MIN_DELAY_ADDRESS_VALIDATION GNUNET_TIME_UNIT_MILLISECONDS
229
230 /**
231  * How many network RTTs before an address validation expires should we begin
232  * trying to revalidate? (Note that the RTT used here is the one that we
233  * experienced during the last validation, not necessarily the latest RTT
234  * observed).
235  */
236 #define VALIDATION_RTT_BUFFER_FACTOR 3
237
238 /**
239  * How many messages can we have pending for a given communicator
240  * process before we start to throttle that communicator?
241  *
242  * Used if a communicator might be CPU-bound and cannot handle the traffic.
243  */
244 #define COMMUNICATOR_TOTAL_QUEUE_LIMIT 512
245
246 /**
247  * How many messages can we have pending for a given queue (queue to
248  * a particular peer via a communicator) process before we start to
249  * throttle that queue?
250  */
251 #define QUEUE_LENGTH_LIMIT 32
252
253
254 GNUNET_NETWORK_STRUCT_BEGIN
255
256 /**
257  * Outer layer of an encapsulated backchannel message.
258  */
259 struct TransportBackchannelEncapsulationMessage
260 {
261   /**
262    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION.
263    */
264   struct GNUNET_MessageHeader header;
265
266   /**
267    * Distance the backchannel message has traveled, to be updated at
268    * each hop.  Used to bound the number of hops in case a backchannel
269    * message is broadcast and thus travels without routing
270    * information (during initial backchannel discovery).
271    */
272   uint32_t distance;
273
274   /**
275    * Target's peer identity (as backchannels may be transmitted
276    * indirectly, or even be broadcast).
277    */
278   struct GNUNET_PeerIdentity target;
279
280   /**
281    * Ephemeral key setup by the sender for @e target, used
282    * to encrypt the payload.
283    */
284   struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;
285
286   /**
287    * We use an IV here as the @e ephemeral_key is re-used for
288    * #EPHEMERAL_VALIDITY time to avoid re-signing it all the time.
289    */
290   struct GNUNET_ShortHashCode iv;
291
292   /**
293    * HMAC over the ciphertext of the encrypted, variable-size
294    * body that follows.  Verified via DH of @e target and
295    * @e ephemeral_key
296    */
297   struct GNUNET_HashCode hmac;
298
299   /* Followed by encrypted, variable-size payload */
300 };
301
302
303 /**
304  * Body by which a peer confirms that it is using an ephemeral key.
305  */
306 struct EphemeralConfirmation
307 {
308
309   /**
310    * Purpose is #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL
311    */
312   struct GNUNET_CRYPTO_EccSignaturePurpose purpose;
313
314   /**
315    * How long is this signature over the ephemeral key valid?
316    *
317    * Note that the receiver MUST IGNORE the absolute time, and only interpret
318    * the value as a mononic time and reject "older" values than the last one
319    * observed.  This is necessary as we do not want to require synchronized
320    * clocks and may not have a bidirectional communication channel.
321    *
322    * Even with this, there is no real guarantee against replay achieved here,
323    * unless the latest timestamp is persisted.  While persistence should be
324    * provided via PEERSTORE, we do not consider the mechanism reliable!  Thus,
325    * communicators must protect against replay attacks when using backchannel
326    * communication!
327    */
328   struct GNUNET_TIME_AbsoluteNBO ephemeral_validity;
329
330   /**
331    * Target's peer identity.
332    */
333   struct GNUNET_PeerIdentity target;
334
335   /**
336    * Ephemeral key setup by the sender for @e target, used
337    * to encrypt the payload.
338    */
339   struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;
340
341 };
342
343
344 /**
345  * Plaintext of the variable-size payload that is encrypted
346  * within a `struct TransportBackchannelEncapsulationMessage`
347  */
348 struct TransportBackchannelRequestPayload
349 {
350
351   /**
352    * Sender's peer identity.
353    */
354   struct GNUNET_PeerIdentity sender;
355
356   /**
357    * Signature of the sender over an
358    * #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL.
359    */
360   struct GNUNET_CRYPTO_EddsaSignature sender_sig;
361
362   /**
363    * How long is this signature over the ephemeral key valid?
364    *
365    * Note that the receiver MUST IGNORE the absolute time, and only interpret
366    * the value as a mononic time and reject "older" values than the last one
367    * observed.  This is necessary as we do not want to require synchronized
368    * clocks and may not have a bidirectional communication channel.
369    *
370    * Even with this, there is no real guarantee against replay achieved here,
371    * unless the latest timestamp is persisted.  While persistence should be
372    * provided via PEERSTORE, we do not consider the mechanism reliable!  Thus,
373    * communicators must protect against replay attacks when using backchannel
374    * communication!
375    */
376   struct GNUNET_TIME_AbsoluteNBO ephemeral_validity;
377
378   /**
379    * Current monotonic time of the sending transport service.  Used to
380    * detect replayed messages.  Note that the receiver should remember
381    * a list of the recently seen timestamps and only reject messages
382    * if the timestamp is in the list, or the list is "full" and the
383    * timestamp is smaller than the lowest in the list.
384    *
385    * Like the @e ephemeral_validity, the list of timestamps per peer should be
386    * persisted to guard against replays after restarts.
387    */
388   struct GNUNET_TIME_AbsoluteNBO monotonic_time;
389
390   /* Followed by a `struct GNUNET_MessageHeader` with a message
391      for a communicator */
392
393   /* Followed by a 0-termianted string specifying the name of
394      the communicator which is to receive the message */
395
396 };
397
398
399 /**
400  * Outer layer of an encapsulated unfragmented application message sent
401  * over an unreliable channel.
402  */
403 struct TransportReliabilityBox
404 {
405   /**
406    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX
407    */
408   struct GNUNET_MessageHeader header;
409
410   /**
411    * Number of messages still to be sent before a commulative
412    * ACK is requested.  Zero if an ACK is requested immediately.
413    * In NBO.  Note that the receiver may send the ACK faster
414    * if it believes that is reasonable.
415    */
416   uint32_t ack_countdown GNUNET_PACKED;
417
418   /**
419    * Unique ID of the message used for signalling receipt of
420    * messages sent over possibly unreliable channels.  Should
421    * be a random.
422    */
423   struct GNUNET_ShortHashCode msg_uuid;
424 };
425
426
427 /**
428  * Confirmation that the receiver got a
429  * #GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX. Note that the
430  * confirmation may be transmitted over a completely different queue,
431  * so ACKs are identified by a combination of PID of sender and
432  * message UUID, without the queue playing any role!
433  */
434 struct TransportReliabilityAckMessage
435 {
436   /**
437    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK
438    */
439   struct GNUNET_MessageHeader header;
440
441   /**
442    * Reserved. Zero.
443    */
444   uint32_t reserved GNUNET_PACKED;
445
446   /**
447    * How long was the ACK delayed relative to the average time of
448    * receipt of the messages being acknowledged?  Used to calculate
449    * the average RTT by taking the receipt time of the ack minus the
450    * average transmission time of the sender minus this value.
451    */
452   struct GNUNET_TIME_RelativeNBO avg_ack_delay;
453
454   /* followed by any number of `struct GNUNET_ShortHashCode`
455      messages providing ACKs */
456 };
457
458
459 /**
460  * Outer layer of an encapsulated fragmented application message.
461  */
462 struct TransportFragmentBox
463 {
464   /**
465    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT
466    */
467   struct GNUNET_MessageHeader header;
468
469   /**
470    * Unique ID of this fragment (and fragment transmission!). Will
471    * change even if a fragement is retransmitted to make each
472    * transmission attempt unique! Should be incremented by one for
473    * each fragment transmission. If a client receives a duplicate
474    * fragment (same @e frag_off), it must send
475    * #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK immediately.
476    */
477   uint32_t frag_uuid GNUNET_PACKED;
478
479   /**
480    * Original message ID for of the message that all the1
481    * fragments belong to.  Must be the same for all fragments.
482    */
483   struct GNUNET_ShortHashCode msg_uuid;
484
485   /**
486    * Offset of this fragment in the overall message.
487    */
488   uint16_t frag_off GNUNET_PACKED;
489
490   /**
491    * Total size of the message that is being fragmented.
492    */
493   uint16_t msg_size GNUNET_PACKED;
494
495 };
496
497
498 /**
499  * Outer layer of an fragmented application message sent over a queue
500  * with finite MTU.  When a #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT is
501  * received, the receiver has two RTTs or 64 further fragments with
502  * the same basic message time to send an acknowledgement, possibly
503  * acknowledging up to 65 fragments in one ACK.  ACKs must also be
504  * sent immediately once all fragments were sent.
505  */
506 struct TransportFragmentAckMessage
507 {
508   /**
509    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK
510    */
511   struct GNUNET_MessageHeader header;
512
513   /**
514    * Unique ID of the lowest fragment UUID being acknowledged.
515    */
516   uint32_t frag_uuid GNUNET_PACKED;
517
518   /**
519    * Bitfield of up to 64 additional fragments following the
520    * @e msg_uuid being acknowledged by this message.
521    */
522   uint64_t extra_acks GNUNET_PACKED;
523
524   /**
525    * Original message ID for of the message that all the
526    * fragments belong to.
527    */
528   struct GNUNET_ShortHashCode msg_uuid;
529
530   /**
531    * How long was the ACK delayed relative to the average time of
532    * receipt of the fragments being acknowledged?  Used to calculate
533    * the average RTT by taking the receipt time of the ack minus the
534    * average transmission time of the sender minus this value.
535    */
536   struct GNUNET_TIME_RelativeNBO avg_ack_delay;
537
538   /**
539    * How long until the receiver will stop trying reassembly
540    * of this message?
541    */
542   struct GNUNET_TIME_RelativeNBO reassembly_timeout;
543 };
544
545
546 /**
547  * Content signed by the initator during DV learning.
548  *
549  * The signature is required to prevent DDoS attacks. A peer sending out this
550  * message is potentially generating a lot of traffic that will go back to the
551  * initator, as peers receiving this message will try to let the initiator
552  * know that they got the message.
553  *
554  * Without this signature, an attacker could abuse this mechanism for traffic
555  * amplification, sending a lot of traffic to a peer by putting out this type
556  * of message with the victim's peer identity.
557  *
558  * Even with just a signature, traffic amplification would be possible via
559  * replay attacks. The @e monotonic_time limits such replay attacks, as every
560  * potential amplificator will check the @e monotonic_time and only respond
561  * (at most) once per message.
562  */
563 struct DvInitPS
564 {
565   /**
566    * Purpose is #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_INITIATOR
567    */
568   struct GNUNET_CRYPTO_EccSignaturePurpose purpose;
569
570   /**
571    * Time at the initiator when generating the signature.
572    *
573    * Note that the receiver MUST IGNORE the absolute time, and only interpret
574    * the value as a mononic time and reject "older" values than the last one
575    * observed.  This is necessary as we do not want to require synchronized
576    * clocks and may not have a bidirectional communication channel.
577    *
578    * Even with this, there is no real guarantee against replay achieved here,
579    * unless the latest timestamp is persisted.  Persistence should be
580    * provided via PEERSTORE if possible.
581    */
582   struct GNUNET_TIME_AbsoluteNBO monotonic_time;
583
584   /**
585    * Challenge value used by the initiator to re-identify the path.
586    */
587   struct GNUNET_ShortHashCode challenge;
588
589 };
590
591
592 /**
593  * Content signed by each peer during DV learning.
594  *
595  * This assues the initiator of the DV learning operation that the hop from @e
596  * pred via the signing peer to @e succ actually exists.  This makes it
597  * impossible for an adversary to supply the network with bogus routes.
598  *
599  * The @e challenge is included to provide replay protection for the
600  * initiator. This way, the initiator knows that the hop existed after the
601  * original @e challenge was first transmitted, providing a freshness metric.
602  *
603  * Peers other than the initiator that passively learn paths by observing
604  * these messages do NOT benefit from this. Here, an adversary may indeed
605  * replay old messages.  Thus, passively learned paths should always be
606  * immediately marked as "potentially stale".
607  */
608 struct DvHopPS
609 {
610   /**
611    * Purpose is #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_HOP
612    */
613   struct GNUNET_CRYPTO_EccSignaturePurpose purpose;
614
615   /**
616    * Identity of the previous peer on the path.
617    */
618   struct GNUNET_PeerIdentity pred;
619
620   /**
621    * Identity of the next peer on the path.
622    */
623   struct GNUNET_PeerIdentity succ;
624
625   /**
626    * Challenge value used by the initiator to re-identify the path.
627    */
628   struct GNUNET_ShortHashCode challenge;
629
630 };
631
632
633 /**
634  * An entry describing a peer on a path in a
635  * `struct TransportDVLearn` message.
636  */
637 struct DVPathEntryP
638 {
639   /**
640    * Identity of a peer on the path.
641    */
642   struct GNUNET_PeerIdentity hop;
643
644   /**
645    * Signature of this hop over the path, of purpose
646    * #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_HOP
647    */
648   struct GNUNET_CRYPTO_EddsaSignature hop_sig;
649
650 };
651
652
653 /**
654  * Internal message used by transport for distance vector learning.
655  * If @e num_hops does not exceed the threshold, peers should append
656  * themselves to the peer list and flood the message (possibly only
657  * to a subset of their neighbours to limit discoverability of the
658  * network topology).  To the extend that the @e bidirectional bits
659  * are set, peers may learn the inverse paths even if they did not
660  * initiate.
661  *
662  * Unless received on a bidirectional queue and @e num_hops just
663  * zero, peers that can forward to the initator should always try to
664  * forward to the initiator.
665  */
666 struct TransportDVLearn
667 {
668   /**
669    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN
670    */
671   struct GNUNET_MessageHeader header;
672
673   /**
674    * Number of hops this messages has travelled, in NBO. Zero if
675    * sent by initiator.
676    */
677   uint16_t num_hops GNUNET_PACKED;
678
679   /**
680    * Bitmask of the last 16 hops indicating whether they are confirmed
681    * available (without DV) in both directions or not, in NBO.  Used
682    * to possibly instantly learn a path in both directions.  Each peer
683    * should shift this value by one to the left, and then set the
684    * lowest bit IF the current sender can be reached from it (without
685    * DV routing).
686    */
687   uint16_t bidirectional GNUNET_PACKED;
688
689   /**
690    * Peers receiving this message and delaying forwarding to other
691    * peers for any reason should increment this value by the non-network
692    * delay created by the peer.
693    */
694   struct GNUNET_TIME_RelativeNBO non_network_delay;
695
696   /**
697    * Signature of this hop over the path, of purpose
698    * #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_INITIATOR
699    */
700   struct GNUNET_CRYPTO_EddsaSignature init_sig;
701
702   /**
703    * Identity of the peer that started this learning activity.
704    */
705   struct GNUNET_PeerIdentity initiator;
706
707   /**
708    * Challenge value used by the initiator to re-identify the path.
709    */
710   struct GNUNET_ShortHashCode challenge;
711
712   /* Followed by @e num_hops `struct DVPathEntryP` values,
713      excluding the initiator of the DV trace; the last entry is the
714      current sender; the current peer must not be included. */
715
716 };
717
718
719 /**
720  * Outer layer of an encapsulated message send over multiple hops.
721  * The path given only includes the identities of the subsequent
722  * peers, i.e. it will be empty if we are the receiver. Each
723  * forwarding peer should scan the list from the end, and if it can,
724  * forward to the respective peer. The list should then be shortened
725  * by all the entries up to and including that peer.  Each hop should
726  * also increment @e total_hops to allow the receiver to get a precise
727  * estimate on the number of hops the message travelled.  Senders must
728  * provide a learned path that thus should work, but intermediaries
729  * know of a shortcut, they are allowed to send the message via that
730  * shortcut.
731  *
732  * If a peer finds itself still on the list, it must drop the message.
733  */
734 struct TransportDVBox
735 {
736   /**
737    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX
738    */
739   struct GNUNET_MessageHeader header;
740
741   /**
742    * Number of total hops this messages travelled. In NBO.
743    * @e origin sets this to zero, to be incremented at
744    * each hop.
745    */
746   uint16_t total_hops GNUNET_PACKED;
747
748   /**
749    * Number of hops this messages includes. In NBO.
750    */
751   uint16_t num_hops GNUNET_PACKED;
752
753   /**
754    * Identity of the peer that originated the message.
755    */
756   struct GNUNET_PeerIdentity origin;
757
758   /* Followed by @e num_hops `struct GNUNET_PeerIdentity` values;
759      excluding the @e origin and the current peer, the last must be
760      the ultimate target; if @e num_hops is zero, the receiver of this
761      message is the ultimate target. */
762
763   /* Followed by the actual message, which itself may be
764      another box, but not a DV_LEARN or DV_BOX message! */
765 };
766
767
768 /**
769  * Message send to another peer to validate that it can indeed
770  * receive messages at a particular address.
771  */
772 struct TransportValidationChallenge
773 {
774
775   /**
776    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_CHALLENGE
777    */
778   struct GNUNET_MessageHeader header;
779
780   /**
781    * Zero.
782    */
783   uint32_t reserved GNUNET_PACKED;
784
785   /**
786    * Challenge to be signed by the receiving peer.
787    */
788   struct GNUNET_ShortHashCode challenge;
789
790   /**
791    * Timestamp of the sender, to be copied into the reply
792    * to allow sender to calculate RTT.
793    */
794   struct GNUNET_TIME_AbsoluteNBO sender_time;
795 };
796
797
798 /**
799  * Message signed by a peer to confirm that it can indeed
800  * receive messages at a particular address.
801  */
802 struct TransportValidationPS
803 {
804
805   /**
806    * Purpose is #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_CHALLENGE
807    */
808   struct GNUNET_CRYPTO_EccSignaturePurpose purpose;
809
810   /**
811    * How long does the sender believe the address on
812    * which the challenge was received to remain valid?
813    */
814   struct GNUNET_TIME_RelativeNBO validity_duration;
815
816   /**
817    * Challenge signed by the receiving peer.
818    */
819   struct GNUNET_ShortHashCode challenge;
820
821 };
822
823
824 /**
825  * Message send to a peer to respond to a
826  * #GNUNET_MESSAGE_TYPE_ADDRESS_VALIDATION_CHALLENGE
827  */
828 struct TransportValidationResponse
829 {
830
831   /**
832    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_RESPONSE
833    */
834   struct GNUNET_MessageHeader header;
835
836   /**
837    * Zero.
838    */
839   uint32_t reserved GNUNET_PACKED;
840
841   /**
842    * The peer's signature matching the
843    * #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_CHALLENGE purpose.
844    */
845   struct GNUNET_CRYPTO_EddsaSignature signature;
846
847   /**
848    * The challenge that was signed by the receiving peer.
849    */
850   struct GNUNET_ShortHashCode challenge;
851
852   /**
853    * Original timestamp of the sender (was @code{sender_time}),
854    * copied into the reply to allow sender to calculate RTT.
855    */
856   struct GNUNET_TIME_AbsoluteNBO origin_time;
857
858   /**
859    * How long does the sender believe this address to remain
860    * valid?
861    */
862   struct GNUNET_TIME_RelativeNBO validity_duration;
863 };
864
865
866
867 GNUNET_NETWORK_STRUCT_END
868
869
870 /**
871  * What type of client is the `struct TransportClient` about?
872  */
873 enum ClientType
874 {
875   /**
876    * We do not know yet (client is fresh).
877    */
878   CT_NONE = 0,
879
880   /**
881    * Is the CORE service, we need to forward traffic to it.
882    */
883   CT_CORE = 1,
884
885   /**
886    * It is a monitor, forward monitor data.
887    */
888   CT_MONITOR = 2,
889
890   /**
891    * It is a communicator, use for communication.
892    */
893   CT_COMMUNICATOR = 3,
894
895   /**
896    * "Application" telling us where to connect (i.e. TOPOLOGY, DHT or CADET).
897    */
898   CT_APPLICATION = 4
899 };
900
901
902 /**
903  * When did we launch this DV learning activity?
904  */
905 struct LearnLaunchEntry
906 {
907
908   /**
909    * Kept (also) in a DLL sorted by launch time.
910    */
911   struct LearnLaunchEntry *prev;
912
913   /**
914    * Kept (also) in a DLL sorted by launch time.
915    */
916   struct LearnLaunchEntry *next;
917
918   /**
919    * Challenge that uniquely identifies this activity.
920    */
921   struct GNUNET_ShortHashCode challenge;
922
923   /**
924    * When did we transmit the DV learn message (used to calculate RTT) and
925    * determine freshness of paths learned via this operation.
926    */
927   struct GNUNET_TIME_Absolute launch_time;
928
929 };
930
931
932 /**
933  * Entry in our cache of ephemeral keys we currently use.  This way, we only
934  * sign an ephemeral once per @e target, and then can re-use it over multiple
935  * #GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION messages (as
936  * signing is expensive and in some cases we may use backchannel messages a
937  * lot).
938  */
939 struct EphemeralCacheEntry
940 {
941
942   /**
943    * Target's peer identity (we don't re-use ephemerals
944    * to limit linkability of messages).
945    */
946   struct GNUNET_PeerIdentity target;
947
948   /**
949    * Signature affirming @e ephemeral_key of type
950    * #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL
951    */
952   struct GNUNET_CRYPTO_EddsaSignature sender_sig;
953
954   /**
955    * How long is @e sender_sig valid
956    */
957   struct GNUNET_TIME_Absolute ephemeral_validity;
958
959   /**
960    * Our ephemeral key.
961    */
962   struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;
963
964   /**
965    * Our private ephemeral key.
966    */
967   struct GNUNET_CRYPTO_EcdhePrivateKey private_key;
968
969   /**
970    * Node in the ephemeral cache for this entry.
971    * Used for expiration.
972    */
973   struct GNUNET_CONTAINER_HeapNode *hn;
974 };
975
976
977 /**
978  * Client connected to the transport service.
979  */
980 struct TransportClient;
981
982
983 /**
984  * A neighbour that at least one communicator is connected to.
985  */
986 struct Neighbour;
987
988
989 /**
990  * Entry in our #dv_routes table, representing a (set of) distance
991  * vector routes to a particular peer.
992  */
993 struct DistanceVector;
994
995 /**
996  * One possible hop towards a DV target.
997  */
998 struct DistanceVectorHop
999 {
1000
1001   /**
1002    * Kept in a MDLL, sorted by @e timeout.
1003    */
1004   struct DistanceVectorHop *next_dv;
1005
1006   /**
1007    * Kept in a MDLL, sorted by @e timeout.
1008    */
1009   struct DistanceVectorHop *prev_dv;
1010
1011   /**
1012    * Kept in a MDLL.
1013    */
1014   struct DistanceVectorHop *next_neighbour;
1015
1016   /**
1017    * Kept in a MDLL.
1018    */
1019   struct DistanceVectorHop *prev_neighbour;
1020
1021   /**
1022    * What would be the next hop to @e target?
1023    */
1024   struct Neighbour *next_hop;
1025
1026   /**
1027    * Distance vector entry this hop belongs with.
1028    */
1029   struct DistanceVector *dv;
1030
1031   /**
1032    * Array of @e distance hops to the target, excluding @e next_hop.
1033    * NULL if the entire path is us to @e next_hop to `target`. Allocated
1034    * at the end of this struct.
1035    */
1036   const struct GNUNET_PeerIdentity *path;
1037
1038   /**
1039    * At what time do we forget about this path unless we see it again
1040    * while learning?
1041    */
1042   struct GNUNET_TIME_Absolute timeout;
1043
1044   /**
1045    * After what time do we know for sure that the path must have existed?
1046    * Set to ZERO if the path is learned by snooping on DV learn messages
1047    * initiated by other peers, and to the time at which we generated the
1048    * challenge for DV learn operations this peer initiated.
1049    */
1050   struct GNUNET_TIME_Absolute freshness;
1051
1052   /**
1053    * How many hops in total to the `target` (excluding @e next_hop and `target` itself),
1054    * thus 0 still means a distance of 2 hops (to @e next_hop and then to `target`)?
1055    */
1056   unsigned int distance;
1057 };
1058
1059
1060 /**
1061  * Entry in our #dv_routes table, representing a (set of) distance
1062  * vector routes to a particular peer.
1063  */
1064 struct DistanceVector
1065 {
1066
1067   /**
1068    * To which peer is this a route?
1069    */
1070   struct GNUNET_PeerIdentity target;
1071
1072   /**
1073    * Known paths to @e target.
1074    */
1075   struct DistanceVectorHop *dv_head;
1076
1077   /**
1078    * Known paths to @e target.
1079    */
1080   struct DistanceVectorHop *dv_tail;
1081
1082   /**
1083    * Task scheduled to purge expired paths from @e dv_head MDLL.
1084    */
1085   struct GNUNET_SCHEDULER_Task *timeout_task;
1086
1087   /**
1088    * Is one of the DV paths in this struct 'confirmed' and thus
1089    * the cause for CORE to see this peer as connected? (Note that
1090    * the same may apply to a `struct Neighbour` at the same time.)
1091    */
1092   int core_visible;
1093 };
1094
1095
1096 /**
1097  * A queue is a message queue provided by a communicator
1098  * via which we can reach a particular neighbour.
1099  */
1100 struct Queue;
1101
1102
1103 /**
1104  * Entry identifying transmission in one of our `struct
1105  * Queue` which still awaits an ACK.  This is used to
1106  * ensure we do not overwhelm a communicator and limit the number of
1107  * messages outstanding per communicator (say in case communicator is
1108  * CPU bound) and per queue (in case bandwidth allocation exceeds
1109  * what the communicator can actually provide towards a particular
1110  * peer/target).
1111  */
1112 struct QueueEntry
1113 {
1114
1115   /**
1116    * Kept as a DLL.
1117    */
1118   struct QueueEntry *next;
1119
1120   /**
1121    * Kept as a DLL.
1122    */
1123   struct QueueEntry *prev;
1124
1125   /**
1126    * Queue this entry is queued with.
1127    */
1128   struct Queue *queue;
1129
1130   /**
1131    * Message ID used for this message with the queue used for transmission.
1132    */
1133   uint64_t mid;
1134 };
1135
1136
1137 /**
1138  * A queue is a message queue provided by a communicator
1139  * via which we can reach a particular neighbour.
1140  */
1141 struct Queue
1142 {
1143   /**
1144    * Kept in a MDLL.
1145    */
1146   struct Queue *next_neighbour;
1147
1148   /**
1149    * Kept in a MDLL.
1150    */
1151   struct Queue *prev_neighbour;
1152
1153   /**
1154    * Kept in a MDLL.
1155    */
1156   struct Queue *prev_client;
1157
1158   /**
1159    * Kept in a MDLL.
1160    */
1161   struct Queue *next_client;
1162
1163   /**
1164    * Head of DLL of unacked transmission requests.
1165    */
1166   struct QueueEntry *queue_head;
1167
1168   /**
1169    * End of DLL of unacked transmission requests.
1170    */
1171   struct QueueEntry *queue_tail;
1172
1173   /**
1174    * Which neighbour is this queue for?
1175    */
1176   struct Neighbour *neighbour;
1177
1178   /**
1179    * Which communicator offers this queue?
1180    */
1181   struct TransportClient *tc;
1182
1183   /**
1184    * Address served by the queue.
1185    */
1186   const char *address;
1187
1188   /**
1189    * Task scheduled for the time when this queue can (likely) transmit the
1190    * next message. Still needs to check with the @e tracker_out to be sure.
1191    */
1192   struct GNUNET_SCHEDULER_Task *transmit_task;
1193
1194   /**
1195    * Task scheduled to possibly notfiy core that this queue is no longer
1196    * counting as confirmed.  Runs the #core_queue_visibility_check().
1197    */
1198   struct GNUNET_SCHEDULER_Task *visibility_task;
1199
1200   /**
1201    * Our current RTT estimate for this queue.
1202    */
1203   struct GNUNET_TIME_Relative rtt;
1204
1205   /**
1206    * How long do *we* consider this @e address to be valid?  In the past or
1207    * zero if we have not yet validated it.  Can be updated based on
1208    * challenge-response validations (via address validation logic), or when we
1209    * receive ACKs that we can definitively map to transmissions via this
1210    * queue.
1211    */
1212   struct GNUNET_TIME_Absolute validated_until;
1213
1214   /**
1215    * Message ID generator for transmissions on this queue.
1216    */
1217   uint64_t mid_gen;
1218
1219   /**
1220    * Unique identifier of this queue with the communicator.
1221    */
1222   uint32_t qid;
1223
1224   /**
1225    * Maximum transmission unit supported by this queue.
1226    */
1227   uint32_t mtu;
1228
1229   /**
1230    * Distance to the target of this queue.
1231    * FIXME: needed? DV is done differently these days...
1232    */
1233   uint32_t distance;
1234
1235   /**
1236    * Messages pending.
1237    */
1238   uint32_t num_msg_pending;
1239
1240   /**
1241    * Bytes pending.
1242    */
1243   uint32_t num_bytes_pending;
1244
1245   /**
1246    * Length of the DLL starting at @e queue_head.
1247    */
1248   unsigned int queue_length;
1249
1250   /**
1251    * Network type offered by this queue.
1252    */
1253   enum GNUNET_NetworkType nt;
1254
1255   /**
1256    * Connection status for this queue.
1257    */
1258   enum GNUNET_TRANSPORT_ConnectionStatus cs;
1259
1260   /**
1261    * How much outbound bandwidth do we have available for this queue?
1262    */
1263   struct GNUNET_BANDWIDTH_Tracker tracker_out;
1264
1265   /**
1266    * How much inbound bandwidth do we have available for this queue?
1267    */
1268   struct GNUNET_BANDWIDTH_Tracker tracker_in;
1269 };
1270
1271
1272 /**
1273  * Information we keep for a message that we are reassembling.
1274  */
1275 struct ReassemblyContext
1276 {
1277
1278   /**
1279    * Original message ID for of the message that all the
1280    * fragments belong to.
1281    */
1282   struct GNUNET_ShortHashCode msg_uuid;
1283
1284   /**
1285    * Which neighbour is this context for?
1286    */
1287   struct Neighbour *neighbour;
1288
1289   /**
1290    * Entry in the reassembly heap (sorted by expiration).
1291    */
1292   struct GNUNET_CONTAINER_HeapNode *hn;
1293
1294   /**
1295    * Bitfield with @e msg_size bits representing the positions
1296    * where we have received fragments.  When we receive a fragment,
1297    * we check the bits in @e bitfield before incrementing @e msg_missing.
1298    *
1299    * Allocated after the reassembled message.
1300    */
1301   uint8_t *bitfield;
1302
1303   /**
1304    * Task for sending ACK. We may send ACKs either because of hitting
1305    * the @e extra_acks limit, or based on time and @e num_acks.  This
1306    * task is for the latter case.
1307    */
1308   struct GNUNET_SCHEDULER_Task *ack_task;
1309
1310   /**
1311    * At what time will we give up reassembly of this message?
1312    */
1313   struct GNUNET_TIME_Absolute reassembly_timeout;
1314
1315   /**
1316    * Average delay of all acks in @e extra_acks and @e frag_uuid.
1317    * Should be reset to zero when @e num_acks is set to 0.
1318    */
1319   struct GNUNET_TIME_Relative avg_ack_delay;
1320
1321   /**
1322    * Time we received the last fragment.  @e avg_ack_delay must be
1323    * incremented by now - @e last_frag multiplied by @e num_acks.
1324    */
1325   struct GNUNET_TIME_Absolute last_frag;
1326
1327   /**
1328    * Bitfield of up to 64 additional fragments following @e frag_uuid
1329    * to be acknowledged in the next cummulative ACK.
1330    */
1331   uint64_t extra_acks;
1332
1333   /**
1334    * Unique ID of the lowest fragment UUID to be acknowledged in the
1335    * next cummulative ACK.  Only valid if @e num_acks > 0.
1336    */
1337   uint32_t frag_uuid;
1338
1339   /**
1340    * Number of ACKs we have accumulated so far.  Reset to 0
1341    * whenever we send a #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK.
1342    */
1343   unsigned int num_acks;
1344
1345   /**
1346    * How big is the message we are reassembling in total?
1347    */
1348   uint16_t msg_size;
1349
1350   /**
1351    * How many bytes of the message are still missing?  Defragmentation
1352    * is complete when @e msg_missing == 0.
1353    */
1354   uint16_t msg_missing;
1355
1356   /* Followed by @e msg_size bytes of the (partially) defragmented original message */
1357
1358   /* Followed by @e bitfield data */
1359 };
1360
1361
1362 /**
1363  * A neighbour that at least one communicator is connected to.
1364  */
1365 struct Neighbour
1366 {
1367
1368   /**
1369    * Which peer is this about?
1370    */
1371   struct GNUNET_PeerIdentity pid;
1372
1373   /**
1374    * Map with `struct ReassemblyContext` structs for fragments under
1375    * reassembly. May be NULL if we currently have no fragments from
1376    * this @e pid (lazy initialization).
1377    */
1378   struct GNUNET_CONTAINER_MultiShortmap *reassembly_map;
1379
1380   /**
1381    * Heap with `struct ReassemblyContext` structs for fragments under
1382    * reassembly. May be NULL if we currently have no fragments from
1383    * this @e pid (lazy initialization).
1384    */
1385   struct GNUNET_CONTAINER_Heap *reassembly_heap;
1386
1387   /**
1388    * Task to free old entries from the @e reassembly_heap and @e reassembly_map.
1389    */
1390   struct GNUNET_SCHEDULER_Task *reassembly_timeout_task;
1391
1392   /**
1393    * Head of list of messages pending for this neighbour.
1394    */
1395   struct PendingMessage *pending_msg_head;
1396
1397   /**
1398    * Tail of list of messages pending for this neighbour.
1399    */
1400   struct PendingMessage *pending_msg_tail;
1401
1402   /**
1403    * Head of MDLL of DV hops that have this neighbour as next hop. Must be
1404    * purged if this neighbour goes down.
1405    */
1406   struct DistanceVectorHop *dv_head;
1407
1408   /**
1409    * Tail of MDLL of DV hops that have this neighbour as next hop. Must be
1410    * purged if this neighbour goes down.
1411    */
1412   struct DistanceVectorHop *dv_tail;
1413
1414   /**
1415    * Head of DLL of queues to this peer.
1416    */
1417   struct Queue *queue_head;
1418
1419   /**
1420    * Tail of DLL of queues to this peer.
1421    */
1422   struct Queue *queue_tail;
1423
1424   /**
1425    * Task run to cleanup pending messages that have exceeded their timeout.
1426    */
1427   struct GNUNET_SCHEDULER_Task *timeout_task;
1428
1429   /**
1430    * Quota at which CORE is allowed to transmit to this peer.
1431    *
1432    * FIXME: not yet used, tricky to get right given multiple queues!
1433    *        (=> Idea: measure???)
1434    * FIXME: how do we set this value initially when we tell CORE?
1435    *    Options: start at a minimum value or at literally zero?
1436    *         (=> Current thought: clean would be zero!)
1437    */
1438   struct GNUNET_BANDWIDTH_Value32NBO quota_out;
1439
1440   /**
1441    * What is the earliest timeout of any message in @e pending_msg_tail?
1442    */
1443   struct GNUNET_TIME_Absolute earliest_timeout;
1444
1445   /**
1446    * Do we have a confirmed working queue and are thus visible to
1447    * CORE?
1448    */
1449   int core_visible;
1450 };
1451
1452
1453 /**
1454  * A peer that an application (client) would like us to talk to directly.
1455  */
1456 struct PeerRequest
1457 {
1458
1459   /**
1460    * Which peer is this about?
1461    */
1462   struct GNUNET_PeerIdentity pid;
1463
1464   /**
1465    * Client responsible for the request.
1466    */
1467   struct TransportClient *tc;
1468
1469   /**
1470    * Handle for watching the peerstore for HELLOs for this peer.
1471    */
1472   struct GNUNET_PEERSTORE_WatchContext *wc;
1473
1474   /**
1475    * What kind of performance preference does this @e tc have?
1476    */
1477   enum GNUNET_MQ_PreferenceKind pk;
1478
1479   /**
1480    * How much bandwidth would this @e tc like to see?
1481    */
1482   struct GNUNET_BANDWIDTH_Value32NBO bw;
1483
1484 };
1485
1486
1487 /**
1488  * Types of different pending messages.
1489  */
1490 enum PendingMessageType
1491 {
1492
1493   /**
1494    * Ordinary message received from the CORE service.
1495    */
1496   PMT_CORE = 0,
1497
1498   /**
1499    * Fragment box.
1500    */
1501   PMT_FRAGMENT_BOX = 1,
1502
1503   /**
1504    * Reliability box.
1505    */
1506   PMT_RELIABILITY_BOX = 2,
1507
1508   /**
1509    * Any type of acknowledgement.
1510    */
1511   PMT_ACKNOWLEDGEMENT = 3,
1512
1513   /**
1514    * Control traffic generated by the TRANSPORT service itself.
1515    */
1516   PMT_CONTROL = 4
1517
1518 };
1519
1520
1521 /**
1522  * Transmission request that is awaiting delivery.  The original
1523  * transmission requests from CORE may be too big for some queues.
1524  * In this case, a *tree* of fragments is created.  At each
1525  * level of the tree, fragments are kept in a DLL ordered by which
1526  * fragment should be sent next (at the head).  The tree is searched
1527  * top-down, with the original message at the root.
1528  *
1529  * To select a node for transmission, first it is checked if the
1530  * current node's message fits with the MTU.  If it does not, we
1531  * either calculate the next fragment (based on @e frag_off) from the
1532  * current node, or, if all fragments have already been created,
1533  * descend to the @e head_frag.  Even though the node was already
1534  * fragmented, the fragment may be too big if the fragment was
1535  * generated for a queue with a larger MTU. In this case, the node
1536  * may be fragmented again, thus creating a tree.
1537  *
1538  * When acknowledgements for fragments are received, the tree
1539  * must be pruned, removing those parts that were already
1540  * acknowledged.  When fragments are sent over a reliable
1541  * channel, they can be immediately removed.
1542  *
1543  * If a message is ever fragmented, then the original "full" message
1544  * is never again transmitted (even if it fits below the MTU), and
1545  * only (remaining) fragments are sent.
1546  */
1547 struct PendingMessage
1548 {
1549   /**
1550    * Kept in a MDLL of messages for this @a target.
1551    */
1552   struct PendingMessage *next_neighbour;
1553
1554   /**
1555    * Kept in a MDLL of messages for this @a target.
1556    */
1557   struct PendingMessage *prev_neighbour;
1558
1559   /**
1560    * Kept in a MDLL of messages from this @a client (if @e pmt is #PMT_CORE)
1561    */
1562   struct PendingMessage *next_client;
1563
1564   /**
1565    * Kept in a MDLL of messages from this @a client  (if @e pmt is #PMT_CORE)
1566    */
1567   struct PendingMessage *prev_client;
1568
1569   /**
1570    * Kept in a MDLL of messages from this @a cpm (if @e pmt is #PMT_FRAGMENT_BOx)
1571    */
1572   struct PendingMessage *next_frag;
1573
1574   /**
1575    * Kept in a MDLL of messages from this @a cpm  (if @e pmt is #PMT_FRAGMENT_BOX)
1576    */
1577   struct PendingMessage *prev_frag;
1578
1579   /**
1580    * This message, reliability boxed. Only possibly available if @e pmt is #PMT_CORE.
1581    */
1582   struct PendingMessage *bpm;
1583
1584   /**
1585    * Target of the request.
1586    */
1587   struct Neighbour *target;
1588
1589   /**
1590    * Client that issued the transmission request, if @e pmt is #PMT_CORE.
1591    */
1592   struct TransportClient *client;
1593
1594   /**
1595    * Head of a MDLL of fragments created for this core message.
1596    */
1597   struct PendingMessage *head_frag;
1598
1599   /**
1600    * Tail of a MDLL of fragments created for this core message.
1601    */
1602   struct PendingMessage *tail_frag;
1603
1604   /**
1605    * Our parent in the fragmentation tree.
1606    */
1607   struct PendingMessage *frag_parent;
1608
1609   /**
1610    * At what time should we give up on the transmission (and no longer retry)?
1611    */
1612   struct GNUNET_TIME_Absolute timeout;
1613
1614   /**
1615    * What is the earliest time for us to retry transmission of this message?
1616    */
1617   struct GNUNET_TIME_Absolute next_attempt;
1618
1619   /**
1620    * UUID to use for this message (used for reassembly of fragments, only
1621    * initialized if @e msg_uuid_set is #GNUNET_YES).
1622    */
1623   struct GNUNET_ShortHashCode msg_uuid;
1624
1625   /**
1626    * Counter incremented per generated fragment.
1627    */
1628   uint32_t frag_uuidgen;
1629
1630   /**
1631    * Type of the pending message.
1632    */
1633   enum PendingMessageType pmt;
1634
1635   /**
1636    * Size of the original message.
1637    */
1638   uint16_t bytes_msg;
1639
1640   /**
1641    * Offset at which we should generate the next fragment.
1642    */
1643   uint16_t frag_off;
1644
1645   /**
1646    * #GNUNET_YES once @e msg_uuid was initialized
1647    */
1648   int16_t msg_uuid_set;
1649
1650   /* Followed by @e bytes_msg to transmit */
1651 };
1652
1653
1654 /**
1655  * One of the addresses of this peer.
1656  */
1657 struct AddressListEntry
1658 {
1659
1660   /**
1661    * Kept in a DLL.
1662    */
1663   struct AddressListEntry *next;
1664
1665   /**
1666    * Kept in a DLL.
1667    */
1668   struct AddressListEntry *prev;
1669
1670   /**
1671    * Which communicator provides this address?
1672    */
1673   struct TransportClient *tc;
1674
1675   /**
1676    * The actual address.
1677    */
1678   const char *address;
1679
1680   /**
1681    * Current context for storing this address in the peerstore.
1682    */
1683   struct GNUNET_PEERSTORE_StoreContext *sc;
1684
1685   /**
1686    * Task to periodically do @e st operation.
1687    */
1688   struct GNUNET_SCHEDULER_Task *st;
1689
1690   /**
1691    * What is a typical lifetime the communicator expects this
1692    * address to have? (Always from now.)
1693    */
1694   struct GNUNET_TIME_Relative expiration;
1695
1696   /**
1697    * Address identifier used by the communicator.
1698    */
1699   uint32_t aid;
1700
1701   /**
1702    * Network type offered by this address.
1703    */
1704   enum GNUNET_NetworkType nt;
1705
1706 };
1707
1708
1709 /**
1710  * Client connected to the transport service.
1711  */
1712 struct TransportClient
1713 {
1714
1715   /**
1716    * Kept in a DLL.
1717    */
1718   struct TransportClient *next;
1719
1720   /**
1721    * Kept in a DLL.
1722    */
1723   struct TransportClient *prev;
1724
1725   /**
1726    * Handle to the client.
1727    */
1728   struct GNUNET_SERVICE_Client *client;
1729
1730   /**
1731    * Message queue to the client.
1732    */
1733   struct GNUNET_MQ_Handle *mq;
1734
1735   /**
1736    * What type of client is this?
1737    */
1738   enum ClientType type;
1739
1740   union
1741   {
1742
1743     /**
1744      * Information for @e type #CT_CORE.
1745      */
1746     struct {
1747
1748       /**
1749        * Head of list of messages pending for this client, sorted by
1750        * transmission time ("next_attempt" + possibly internal prioritization).
1751        */
1752       struct PendingMessage *pending_msg_head;
1753
1754       /**
1755        * Tail of list of messages pending for this client.
1756        */
1757       struct PendingMessage *pending_msg_tail;
1758
1759     } core;
1760
1761     /**
1762      * Information for @e type #CT_MONITOR.
1763      */
1764     struct {
1765
1766       /**
1767        * Peer identity to monitor the addresses of.
1768        * Zero to monitor all neighbours.  Valid if
1769        * @e type is #CT_MONITOR.
1770        */
1771       struct GNUNET_PeerIdentity peer;
1772
1773       /**
1774        * Is this a one-shot monitor?
1775        */
1776       int one_shot;
1777
1778     } monitor;
1779
1780
1781     /**
1782      * Information for @e type #CT_COMMUNICATOR.
1783      */
1784     struct {
1785       /**
1786        * If @e type is #CT_COMMUNICATOR, this communicator
1787        * supports communicating using these addresses.
1788        */
1789       char *address_prefix;
1790
1791       /**
1792        * Head of DLL of queues offered by this communicator.
1793        */
1794       struct Queue *queue_head;
1795
1796       /**
1797        * Tail of DLL of queues offered by this communicator.
1798        */
1799       struct Queue *queue_tail;
1800
1801       /**
1802        * Head of list of the addresses of this peer offered by this communicator.
1803        */
1804       struct AddressListEntry *addr_head;
1805
1806       /**
1807        * Tail of list of the addresses of this peer offered by this communicator.
1808        */
1809       struct AddressListEntry *addr_tail;
1810
1811       /**
1812        * Number of queue entries in all queues to this communicator. Used
1813        * throttle sending to a communicator if we see that the communicator
1814        * is globally unable to keep up.
1815        */
1816       unsigned int total_queue_length;
1817
1818       /**
1819        * Characteristics of this communicator.
1820        */
1821       enum GNUNET_TRANSPORT_CommunicatorCharacteristics cc;
1822
1823     } communicator;
1824
1825     /**
1826      * Information for @e type #CT_APPLICATION
1827      */
1828     struct {
1829
1830       /**
1831        * Map of requests for peers the given client application would like to
1832        * see connections for.  Maps from PIDs to `struct PeerRequest`.
1833        */
1834       struct GNUNET_CONTAINER_MultiPeerMap *requests;
1835
1836     } application;
1837
1838   } details;
1839
1840 };
1841
1842
1843 /**
1844  * State we keep for validation activities.  Each of these
1845  * is both in the #validation_heap and the #validation_map.
1846  */
1847 struct ValidationState
1848 {
1849
1850   /**
1851    * For which peer is @a address to be validated (or possibly valid)?
1852    * Serves as key in the #validation_map.
1853    */
1854   struct GNUNET_PeerIdentity pid;
1855
1856   /**
1857    * How long did the peer claim this @e address to be valid? Capped at
1858    * minimum of #MAX_ADDRESS_VALID_UNTIL relative to the time where we last
1859    * were told about the address and the value claimed by the other peer at
1860    * that time.  May be updated similarly when validation succeeds.
1861    */
1862   struct GNUNET_TIME_Absolute valid_until;
1863
1864   /**
1865    * How long do *we* consider this @e address to be valid?
1866    * In the past or zero if we have not yet validated it.
1867    */
1868   struct GNUNET_TIME_Absolute validated_until;
1869
1870   /**
1871    * When did we FIRST use the current @e challenge in a message?
1872    * Used to sanity-check @code{origin_time} in the response when
1873    * calculating the RTT. If the @code{origin_time} is not in
1874    * the expected range, the response is discarded as malicious.
1875    */
1876   struct GNUNET_TIME_Absolute first_challenge_use;
1877
1878   /**
1879    * When did we LAST use the current @e challenge in a message?
1880    * Used to sanity-check @code{origin_time} in the response when
1881    * calculating the RTT.  If the @code{origin_time} is not in
1882    * the expected range, the response is discarded as malicious.
1883    */
1884   struct GNUNET_TIME_Absolute last_challenge_use;
1885
1886   /**
1887    * Next time we will send the @e challenge to the peer, if this time is past
1888    * @e valid_until, this validation state is released at this time.  If the
1889    * address is valid, @e next_challenge is set to @e validated_until MINUS @e
1890    * validation_delay * #VALIDATION_RTT_BUFFER_FACTOR, such that we will try
1891    * to re-validate before the validity actually expires.
1892    */
1893   struct GNUNET_TIME_Absolute next_challenge;
1894
1895   /**
1896    * Current backoff factor we're applying for sending the @a challenge.
1897    * Reset to 0 if the @a challenge is confirmed upon validation.
1898    * Reduced to minimum of #FAST_VALIDATION_CHALLENGE_FREQ and half of the
1899    * existing value if we receive an unvalidated address again over
1900    * another channel (and thus should consider the information "fresh").
1901    * Maximum is #MAX_VALIDATION_CHALLENGE_FREQ.
1902    */
1903   struct GNUNET_TIME_Relative challenge_backoff;
1904
1905   /**
1906    * Initially set to "forever". Once @e validated_until is set, this value is
1907    * set to the RTT that tells us how long it took to receive the validation.
1908    */
1909   struct GNUNET_TIME_Relative validation_rtt;
1910
1911   /**
1912    * The challenge we sent to the peer to get it to validate the address. Note
1913    * that we rotate the challenge whenever we update @e validated_until to
1914    * avoid attacks where a peer simply replays an old challenge in the future.
1915    * (We must not rotate more often as otherwise we may discard valid answers
1916    * due to packet losses, latency and reorderings on the network).
1917    */
1918   struct GNUNET_ShortHashCode challenge;
1919
1920   /**
1921    * Claimed address of the peer.
1922    */
1923   char *address;
1924
1925   /**
1926    * Entry in the #validation_heap, which is sorted by @e next_challenge. The
1927    * heap is used to figure out when the next validation activity should be
1928    * run.
1929    */
1930   struct GNUNET_CONTAINER_HeapNode *hn;
1931
1932   /**
1933    * Handle to a PEERSTORE store operation for this @e address.  NULL if
1934    * no PEERSTORE operation is pending.
1935    */
1936   struct GNUNET_PEERSTORE_StoreContext *sc;
1937
1938   /**
1939    * We are technically ready to send the challenge, but we are waiting for
1940    * the respective queue to become available for transmission.
1941    */
1942   int awaiting_queue;
1943
1944 };
1945
1946
1947 /**
1948  * Head of linked list of all clients to this service.
1949  */
1950 static struct TransportClient *clients_head;
1951
1952 /**
1953  * Tail of linked list of all clients to this service.
1954  */
1955 static struct TransportClient *clients_tail;
1956
1957 /**
1958  * Statistics handle.
1959  */
1960 static struct GNUNET_STATISTICS_Handle *GST_stats;
1961
1962 /**
1963  * Configuration handle.
1964  */
1965 static const struct GNUNET_CONFIGURATION_Handle *GST_cfg;
1966
1967 /**
1968  * Our public key.
1969  */
1970 static struct GNUNET_PeerIdentity GST_my_identity;
1971
1972 /**
1973  * Our private key.
1974  */
1975 static struct GNUNET_CRYPTO_EddsaPrivateKey *GST_my_private_key;
1976
1977 /**
1978  * Map from PIDs to `struct Neighbour` entries.  A peer is
1979  * a neighbour if we have an MQ to it from some communicator.
1980  */
1981 static struct GNUNET_CONTAINER_MultiPeerMap *neighbours;
1982
1983 /**
1984  * Map from PIDs to `struct DistanceVector` entries describing
1985  * known paths to the peer.
1986  */
1987 static struct GNUNET_CONTAINER_MultiPeerMap *dv_routes;
1988
1989 /**
1990  * Map from PIDs to `struct ValidationState` entries describing
1991  * addresses we are aware of and their validity state.
1992  */
1993 static struct GNUNET_CONTAINER_MultiPeerMap *validation_map;
1994
1995 /**
1996  * Map from challenges to `struct LearnLaunchEntry` values.
1997  */
1998 static struct GNUNET_CONTAINER_MultiShortmap *dvlearn_map;
1999
2000 /**
2001  * Head of a DLL sorted by launch time.
2002  */
2003 static struct LearnLaunchEntry *lle_head;
2004
2005 /**
2006  * Tail of a DLL sorted by launch time.
2007  */
2008 static struct LearnLaunchEntry *lle_tail;
2009
2010 /**
2011  * MIN Heap sorted by "next_challenge" to `struct ValidationState` entries
2012  * sorting addresses we are aware of by when we should next try to (re)validate
2013  * (or expire) them.
2014  */
2015 static struct GNUNET_CONTAINER_Heap *validation_heap;
2016
2017 /**
2018  * Database for peer's HELLOs.
2019  */
2020 static struct GNUNET_PEERSTORE_Handle *peerstore;
2021
2022 /**
2023  * Heap sorting `struct EphemeralCacheEntry` by their
2024  * key/signature validity.
2025  */
2026 static struct GNUNET_CONTAINER_Heap *ephemeral_heap;
2027
2028 /**
2029  * Hash map for looking up `struct EphemeralCacheEntry`s
2030  * by peer identity. (We may have ephemerals in our
2031  * cache for which we do not have a neighbour entry,
2032  * and similar many neighbours may not need ephemerals,
2033  * so we use a second map.)
2034  */
2035 static struct GNUNET_CONTAINER_MultiPeerMap *ephemeral_map;
2036
2037 /**
2038  * Task to free expired ephemerals.
2039  */
2040 static struct GNUNET_SCHEDULER_Task *ephemeral_task;
2041
2042 /**
2043  * Task run to initiate DV learning.
2044  */
2045 static struct GNUNET_SCHEDULER_Task *dvlearn_task;
2046
2047 /**
2048  * Task to run address validation.
2049  */
2050 static struct GNUNET_SCHEDULER_Task *validation_task;
2051
2052
2053 /**
2054  * Free cached ephemeral key.
2055  *
2056  * @param ece cached signature to free
2057  */
2058 static void
2059 free_ephemeral (struct EphemeralCacheEntry *ece)
2060 {
2061   GNUNET_CONTAINER_multipeermap_remove (ephemeral_map,
2062                                         &ece->target,
2063                                         ece);
2064   GNUNET_CONTAINER_heap_remove_node (ece->hn);
2065   GNUNET_free (ece);
2066 }
2067
2068
2069 /**
2070  * Free validation state.
2071  *
2072  * @param vs validation state to free
2073  */
2074 static void
2075 free_validation_state (struct ValidationState *vs)
2076 {
2077   GNUNET_CONTAINER_multipeermap_remove (validation_map,
2078                                         &vs->pid,
2079                                         vs);
2080   GNUNET_CONTAINER_heap_remove_node (vs->hn);
2081   vs->hn = NULL;
2082   if (NULL != vs->sc)
2083   {
2084     GNUNET_PEERSTORE_store_cancel (vs->sc);
2085     vs->sc = NULL;
2086   }
2087   GNUNET_free (vs->address);
2088   GNUNET_free (vs);
2089 }
2090
2091
2092 /**
2093  * Lookup neighbour record for peer @a pid.
2094  *
2095  * @param pid neighbour to look for
2096  * @return NULL if we do not have this peer as a neighbour
2097  */
2098 static struct Neighbour *
2099 lookup_neighbour (const struct GNUNET_PeerIdentity *pid)
2100 {
2101   return GNUNET_CONTAINER_multipeermap_get (neighbours,
2102                                             pid);
2103 }
2104
2105
2106 /**
2107  * Details about what to notify monitors about.
2108  */
2109 struct MonitorEvent
2110 {
2111   /**
2112    * @deprecated To be discussed if we keep these...
2113    */
2114   struct GNUNET_TIME_Absolute last_validation;
2115   struct GNUNET_TIME_Absolute valid_until;
2116   struct GNUNET_TIME_Absolute next_validation;
2117
2118   /**
2119    * Current round-trip time estimate.
2120    */
2121   struct GNUNET_TIME_Relative rtt;
2122
2123   /**
2124    * Connection status.
2125    */
2126   enum GNUNET_TRANSPORT_ConnectionStatus cs;
2127
2128   /**
2129    * Messages pending.
2130    */
2131   uint32_t num_msg_pending;
2132
2133   /**
2134    * Bytes pending.
2135    */
2136   uint32_t num_bytes_pending;
2137
2138
2139 };
2140
2141
2142 /**
2143  * Free a @dvh. Callers MAY want to check if this was the last path to the
2144  * `target`, and if so call #free_dv_route to also free the associated DV
2145  * entry in #dv_routes (if not, the associated scheduler job should eventually
2146  * take care of it).
2147  *
2148  * @param dvh hop to free
2149  */
2150 static void
2151 free_distance_vector_hop (struct DistanceVectorHop *dvh)
2152 {
2153   struct Neighbour *n = dvh->next_hop;
2154   struct DistanceVector *dv = dvh->dv;
2155
2156   GNUNET_CONTAINER_MDLL_remove (neighbour,
2157                                 n->dv_head,
2158                                 n->dv_tail,
2159                                 dvh);
2160   GNUNET_CONTAINER_MDLL_remove (dv,
2161                                 dv->dv_head,
2162                                 dv->dv_tail,
2163                                 dvh);
2164   GNUNET_free (dvh);
2165 }
2166
2167
2168 /**
2169  * Free entry in #dv_routes.  First frees all hops to the target, and
2170  * if there are no entries left, frees @a dv as well.
2171  *
2172  * @param dv route to free
2173  */
2174 static void
2175 free_dv_route (struct DistanceVector *dv)
2176 {
2177   struct DistanceVectorHop *dvh;
2178
2179   while (NULL != (dvh = dv->dv_head))
2180     free_distance_vector_hop (dvh);
2181   if (NULL == dv->dv_head)
2182   {
2183     GNUNET_assert (GNUNET_YES ==
2184                    GNUNET_CONTAINER_multipeermap_remove (dv_routes,
2185                                                          &dv->target,
2186                                                          dv));
2187     if (NULL != dv->timeout_task)
2188       GNUNET_SCHEDULER_cancel (dv->timeout_task);
2189     GNUNET_free (dv);
2190   }
2191 }
2192
2193
2194 /**
2195  * Notify monitor @a tc about an event.  That @a tc
2196  * cares about the event has already been checked.
2197  *
2198  * Send @a tc information in @a me about a @a peer's status with
2199  * respect to some @a address to all monitors that care.
2200  *
2201  * @param tc monitor to inform
2202  * @param peer peer the information is about
2203  * @param address address the information is about
2204  * @param nt network type associated with @a address
2205  * @param me detailed information to transmit
2206  */
2207 static void
2208 notify_monitor (struct TransportClient *tc,
2209                 const struct GNUNET_PeerIdentity *peer,
2210                 const char *address,
2211                 enum GNUNET_NetworkType nt,
2212                 const struct MonitorEvent *me)
2213 {
2214   struct GNUNET_MQ_Envelope *env;
2215   struct GNUNET_TRANSPORT_MonitorData *md;
2216   size_t addr_len = strlen (address) + 1;
2217
2218   env = GNUNET_MQ_msg_extra (md,
2219                              addr_len,
2220                              GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_DATA);
2221   md->nt = htonl ((uint32_t) nt);
2222   md->peer = *peer;
2223   md->last_validation = GNUNET_TIME_absolute_hton (me->last_validation);
2224   md->valid_until = GNUNET_TIME_absolute_hton (me->valid_until);
2225   md->next_validation = GNUNET_TIME_absolute_hton (me->next_validation);
2226   md->rtt = GNUNET_TIME_relative_hton (me->rtt);
2227   md->cs = htonl ((uint32_t) me->cs);
2228   md->num_msg_pending = htonl (me->num_msg_pending);
2229   md->num_bytes_pending = htonl (me->num_bytes_pending);
2230   memcpy (&md[1],
2231           address,
2232           addr_len);
2233   GNUNET_MQ_send (tc->mq,
2234                   env);
2235 }
2236
2237
2238 /**
2239  * Send information in @a me about a @a peer's status with respect
2240  * to some @a address to all monitors that care.
2241  *
2242  * @param peer peer the information is about
2243  * @param address address the information is about
2244  * @param nt network type associated with @a address
2245  * @param me detailed information to transmit
2246  */
2247 static void
2248 notify_monitors (const struct GNUNET_PeerIdentity *peer,
2249                  const char *address,
2250                  enum GNUNET_NetworkType nt,
2251                  const struct MonitorEvent *me)
2252 {
2253   for (struct TransportClient *tc = clients_head;
2254        NULL != tc;
2255        tc = tc->next)
2256   {
2257     if (CT_MONITOR != tc->type)
2258       continue;
2259     if (tc->details.monitor.one_shot)
2260       continue;
2261     if ( (0 != GNUNET_is_zero (&tc->details.monitor.peer)) &&
2262          (0 != GNUNET_memcmp (&tc->details.monitor.peer,
2263                               peer)) )
2264       continue;
2265     notify_monitor (tc,
2266                     peer,
2267                     address,
2268                     nt,
2269                     me);
2270   }
2271 }
2272
2273
2274 /**
2275  * Called whenever a client connects.  Allocates our
2276  * data structures associated with that client.
2277  *
2278  * @param cls closure, NULL
2279  * @param client identification of the client
2280  * @param mq message queue for the client
2281  * @return our `struct TransportClient`
2282  */
2283 static void *
2284 client_connect_cb (void *cls,
2285                    struct GNUNET_SERVICE_Client *client,
2286                    struct GNUNET_MQ_Handle *mq)
2287 {
2288   struct TransportClient *tc;
2289
2290   (void) cls;
2291   tc = GNUNET_new (struct TransportClient);
2292   tc->client = client;
2293   tc->mq = mq;
2294   GNUNET_CONTAINER_DLL_insert (clients_head,
2295                                clients_tail,
2296                                tc);
2297   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2298               "Client %p connected\n",
2299               tc);
2300   return tc;
2301 }
2302
2303
2304 /**
2305  * Free @a rc
2306  *
2307  * @param rc data structure to free
2308  */
2309 static void
2310 free_reassembly_context (struct ReassemblyContext *rc)
2311 {
2312   struct Neighbour *n = rc->neighbour;
2313
2314   GNUNET_assert (rc ==
2315                  GNUNET_CONTAINER_heap_remove_node (rc->hn));
2316   GNUNET_assert (GNUNET_OK ==
2317                  GNUNET_CONTAINER_multishortmap_remove (n->reassembly_map,
2318                                                         &rc->msg_uuid,
2319                                                         rc));
2320   GNUNET_free (rc);
2321 }
2322
2323
2324 /**
2325  * Task run to clean up reassembly context of a neighbour that have expired.
2326  *
2327  * @param cls a `struct Neighbour`
2328  */
2329 static void
2330 reassembly_cleanup_task (void *cls)
2331 {
2332   struct Neighbour *n = cls;
2333   struct ReassemblyContext *rc;
2334
2335   n->reassembly_timeout_task = NULL;
2336   while (NULL != (rc = GNUNET_CONTAINER_heap_peek (n->reassembly_heap)))
2337   {
2338     if (0 == GNUNET_TIME_absolute_get_remaining (rc->reassembly_timeout).rel_value_us)
2339     {
2340       free_reassembly_context (rc);
2341       continue;
2342     }
2343     GNUNET_assert (NULL == n->reassembly_timeout_task);
2344     n->reassembly_timeout_task = GNUNET_SCHEDULER_add_at (rc->reassembly_timeout,
2345                                                           &reassembly_cleanup_task,
2346                                                           n);
2347     return;
2348   }
2349 }
2350
2351
2352 /**
2353  * function called to #free_reassembly_context().
2354  *
2355  * @param cls NULL
2356  * @param key unused
2357  * @param value a `struct ReassemblyContext` to free
2358  * @return #GNUNET_OK (continue iteration)
2359  */
2360 static int
2361 free_reassembly_cb (void *cls,
2362                     const struct GNUNET_ShortHashCode *key,
2363                     void *value)
2364 {
2365   struct ReassemblyContext *rc = value;
2366   (void) cls;
2367   (void) key;
2368
2369   free_reassembly_context (rc);
2370   return GNUNET_OK;
2371 }
2372
2373
2374 /**
2375  * Release memory used by @a neighbour.
2376  *
2377  * @param neighbour neighbour entry to free
2378  */
2379 static void
2380 free_neighbour (struct Neighbour *neighbour)
2381 {
2382   struct DistanceVectorHop *dvh;
2383
2384   GNUNET_assert (NULL == neighbour->queue_head);
2385   GNUNET_assert (GNUNET_YES ==
2386                  GNUNET_CONTAINER_multipeermap_remove (neighbours,
2387                                                        &neighbour->pid,
2388                                                        neighbour));
2389   if (NULL != neighbour->timeout_task)
2390     GNUNET_SCHEDULER_cancel (neighbour->timeout_task);
2391   if (NULL != neighbour->reassembly_map)
2392   {
2393     GNUNET_CONTAINER_multishortmap_iterate (neighbour->reassembly_map,
2394                                             &free_reassembly_cb,
2395                                             NULL);
2396     GNUNET_CONTAINER_multishortmap_destroy (neighbour->reassembly_map);
2397     neighbour->reassembly_map = NULL;
2398     GNUNET_CONTAINER_heap_destroy (neighbour->reassembly_heap);
2399     neighbour->reassembly_heap = NULL;
2400   }
2401   while (NULL != (dvh = neighbour->dv_head))
2402   {
2403     struct DistanceVector *dv = dvh->dv;
2404
2405     free_distance_vector_hop (dvh);
2406     if (NULL == dv->dv_head)
2407       free_dv_route (dv);
2408   }
2409   if (NULL != neighbour->reassembly_timeout_task)
2410     GNUNET_SCHEDULER_cancel (neighbour->reassembly_timeout_task);
2411   GNUNET_free (neighbour);
2412 }
2413
2414
2415 /**
2416  * Send message to CORE clients that we lost a connection.
2417  *
2418  * @param tc client to inform (must be CORE client)
2419  * @param pid peer the connection is for
2420  * @param quota_out current quota for the peer
2421  */
2422 static void
2423 core_send_connect_info (struct TransportClient *tc,
2424                         const struct GNUNET_PeerIdentity *pid,
2425                         struct GNUNET_BANDWIDTH_Value32NBO quota_out)
2426 {
2427   struct GNUNET_MQ_Envelope *env;
2428   struct ConnectInfoMessage *cim;
2429
2430   GNUNET_assert (CT_CORE == tc->type);
2431   env = GNUNET_MQ_msg (cim,
2432                        GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
2433   cim->quota_out = quota_out;
2434   cim->id = *pid;
2435   GNUNET_MQ_send (tc->mq,
2436                   env);
2437 }
2438
2439
2440 /**
2441  * Send message to CORE clients that we gained a connection
2442  *
2443  * @param pid peer the queue was for
2444  * @param quota_out current quota for the peer
2445  */
2446 static void
2447 cores_send_connect_info (const struct GNUNET_PeerIdentity *pid,
2448                          struct GNUNET_BANDWIDTH_Value32NBO quota_out)
2449 {
2450   for (struct TransportClient *tc = clients_head;
2451        NULL != tc;
2452        tc = tc->next)
2453   {
2454     if (CT_CORE != tc->type)
2455       continue;
2456     core_send_connect_info (tc,
2457                             pid,
2458                             quota_out);
2459   }
2460 }
2461
2462
2463 /**
2464  * Send message to CORE clients that we lost a connection.
2465  *
2466  * @param pid peer the connection was for
2467  */
2468 static void
2469 cores_send_disconnect_info (const struct GNUNET_PeerIdentity *pid)
2470 {
2471   for (struct TransportClient *tc = clients_head;
2472        NULL != tc;
2473        tc = tc->next)
2474   {
2475     struct GNUNET_MQ_Envelope *env;
2476     struct DisconnectInfoMessage *dim;
2477
2478     if (CT_CORE != tc->type)
2479       continue;
2480     env = GNUNET_MQ_msg (dim,
2481                          GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT);
2482     dim->peer = *pid;
2483     GNUNET_MQ_send (tc->mq,
2484                     env);
2485   }
2486 }
2487
2488
2489 /**
2490  * We believe we are ready to transmit a message on a queue. Double-checks
2491  * with the queue's "tracker_out" and then gives the message to the
2492  * communicator for transmission (updating the tracker, and re-scheduling
2493  * itself if applicable).
2494  *
2495  * @param cls the `struct Queue` to process transmissions for
2496  */
2497 static void
2498 transmit_on_queue (void *cls);
2499
2500
2501 /**
2502  * Schedule next run of #transmit_on_queue().  Does NOTHING if
2503  * we should run immediately or if the message queue is empty.
2504  * Test for no task being added AND queue not being empty to
2505  * transmit immediately afterwards!  This function must only
2506  * be called if the message queue is non-empty!
2507  *
2508  * @param queue the queue to do scheduling for
2509  */
2510 static void
2511 schedule_transmit_on_queue (struct Queue *queue)
2512 {
2513   struct Neighbour *n = queue->neighbour;
2514   struct PendingMessage *pm = n->pending_msg_head;
2515   struct GNUNET_TIME_Relative out_delay;
2516   unsigned int wsize;
2517
2518   GNUNET_assert (NULL != pm);
2519   if (queue->tc->details.communicator.total_queue_length >=
2520       COMMUNICATOR_TOTAL_QUEUE_LIMIT)
2521   {
2522     GNUNET_STATISTICS_update (GST_stats,
2523                               "# Transmission throttled due to communicator queue limit",
2524                               1,
2525                               GNUNET_NO);
2526     return;
2527   }
2528   if (queue->queue_length >= QUEUE_LENGTH_LIMIT)
2529   {
2530     GNUNET_STATISTICS_update (GST_stats,
2531                               "# Transmission throttled due to queue queue limit",
2532                               1,
2533                               GNUNET_NO);
2534     return;
2535   }
2536
2537   wsize = (0 == queue->mtu)
2538     ? pm->bytes_msg /* FIXME: add overheads? */
2539     : queue->mtu;
2540   out_delay = GNUNET_BANDWIDTH_tracker_get_delay (&queue->tracker_out,
2541                                                   wsize);
2542   out_delay = GNUNET_TIME_relative_max (GNUNET_TIME_absolute_get_remaining (pm->next_attempt),
2543                                         out_delay);
2544   if (0 == out_delay.rel_value_us)
2545     return; /* we should run immediately! */
2546   /* queue has changed since we were scheduled, reschedule again */
2547   queue->transmit_task
2548     = GNUNET_SCHEDULER_add_delayed (out_delay,
2549                                     &transmit_on_queue,
2550                                     queue);
2551   if (out_delay.rel_value_us > DELAY_WARN_THRESHOLD.rel_value_us)
2552     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2553                 "Next transmission on queue `%s' in %s (high delay)\n",
2554                 queue->address,
2555                 GNUNET_STRINGS_relative_time_to_string (out_delay,
2556                                                         GNUNET_YES));
2557   else
2558     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2559                 "Next transmission on queue `%s' in %s\n",
2560                 queue->address,
2561                 GNUNET_STRINGS_relative_time_to_string (out_delay,
2562                                                         GNUNET_YES));
2563 }
2564
2565
2566 /**
2567  * Check whether the CORE visibility of @a n changed. If so,
2568  * check whether we need to notify CORE.
2569  *
2570  * @param n neighbour to perform the check for
2571  */
2572 static void
2573 update_neighbour_core_visibility (struct Neighbour *n);
2574
2575
2576 /**
2577  * Free @a queue.
2578  *
2579  * @param queue the queue to free
2580  */
2581 static void
2582 free_queue (struct Queue *queue)
2583 {
2584   struct Neighbour *neighbour = queue->neighbour;
2585   struct TransportClient *tc = queue->tc;
2586   struct MonitorEvent me = {
2587     .cs = GNUNET_TRANSPORT_CS_DOWN,
2588     .rtt = GNUNET_TIME_UNIT_FOREVER_REL
2589   };
2590   struct QueueEntry *qe;
2591   int maxxed;
2592
2593   if (NULL != queue->transmit_task)
2594   {
2595     GNUNET_SCHEDULER_cancel (queue->transmit_task);
2596     queue->transmit_task = NULL;
2597   }
2598   if (NULL != queue->visibility_task)
2599   {
2600     GNUNET_SCHEDULER_cancel (queue->visibility_task);
2601     queue->visibility_task = NULL;
2602   }
2603   GNUNET_CONTAINER_MDLL_remove (neighbour,
2604                                 neighbour->queue_head,
2605                                 neighbour->queue_tail,
2606                                 queue);
2607   GNUNET_CONTAINER_MDLL_remove (client,
2608                                 tc->details.communicator.queue_head,
2609                                 tc->details.communicator.queue_tail,
2610                                 queue);
2611   maxxed = (COMMUNICATOR_TOTAL_QUEUE_LIMIT >= tc->details.communicator.total_queue_length);
2612   while (NULL != (qe = queue->queue_head))
2613   {
2614     GNUNET_CONTAINER_DLL_remove (queue->queue_head,
2615                                  queue->queue_tail,
2616                                  qe);
2617     queue->queue_length--;
2618     tc->details.communicator.total_queue_length--;
2619     GNUNET_free (qe);
2620   }
2621   GNUNET_assert (0 == queue->queue_length);
2622   if ( (maxxed) &&
2623        (COMMUNICATOR_TOTAL_QUEUE_LIMIT < tc->details.communicator.total_queue_length) )
2624   {
2625     /* Communicator dropped below threshold, resume all queues */
2626     GNUNET_STATISTICS_update (GST_stats,
2627                               "# Transmission throttled due to communicator queue limit",
2628                               -1,
2629                               GNUNET_NO);
2630     for (struct Queue *s = tc->details.communicator.queue_head;
2631          NULL != s;
2632          s = s->next_client)
2633       schedule_transmit_on_queue (s);
2634   }
2635   notify_monitors (&neighbour->pid,
2636                    queue->address,
2637                    queue->nt,
2638                    &me);
2639   GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_in);
2640   GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_out);
2641   GNUNET_free (queue);
2642
2643   update_neighbour_core_visibility (neighbour);
2644     cores_send_disconnect_info (&neighbour->pid);
2645
2646   if (NULL == neighbour->queue_head)
2647   {
2648     free_neighbour (neighbour);
2649   }
2650 }
2651
2652
2653 /**
2654  * Free @a ale
2655  *
2656  * @param ale address list entry to free
2657  */
2658 static void
2659 free_address_list_entry (struct AddressListEntry *ale)
2660 {
2661   struct TransportClient *tc = ale->tc;
2662
2663   GNUNET_CONTAINER_DLL_remove (tc->details.communicator.addr_head,
2664                                tc->details.communicator.addr_tail,
2665                                ale);
2666   if (NULL != ale->sc)
2667   {
2668     GNUNET_PEERSTORE_store_cancel (ale->sc);
2669     ale->sc = NULL;
2670   }
2671   if (NULL != ale->st)
2672   {
2673     GNUNET_SCHEDULER_cancel (ale->st);
2674     ale->st = NULL;
2675   }
2676   GNUNET_free (ale);
2677 }
2678
2679
2680 /**
2681  * Stop the peer request in @a value.
2682  *
2683  * @param cls a `struct TransportClient` that no longer makes the request
2684  * @param pid the peer's identity
2685  * @param value a `struct PeerRequest`
2686  * @return #GNUNET_YES (always)
2687  */
2688 static int
2689 stop_peer_request (void *cls,
2690                    const struct GNUNET_PeerIdentity *pid,
2691                    void *value)
2692 {
2693   struct TransportClient *tc = cls;
2694   struct PeerRequest *pr = value;
2695
2696   GNUNET_PEERSTORE_watch_cancel (pr->wc);
2697   GNUNET_assert (GNUNET_YES ==
2698                  GNUNET_CONTAINER_multipeermap_remove (tc->details.application.requests,
2699                                                        pid,
2700                                                        pr));
2701   GNUNET_free (pr);
2702
2703   return GNUNET_OK;
2704 }
2705
2706
2707 /**
2708  * Called whenever a client is disconnected.  Frees our
2709  * resources associated with that client.
2710  *
2711  * @param cls closure, NULL
2712  * @param client identification of the client
2713  * @param app_ctx our `struct TransportClient`
2714  */
2715 static void
2716 client_disconnect_cb (void *cls,
2717                       struct GNUNET_SERVICE_Client *client,
2718                       void *app_ctx)
2719 {
2720   struct TransportClient *tc = app_ctx;
2721
2722   (void) cls;
2723   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2724               "Client %p disconnected, cleaning up.\n",
2725               tc);
2726   GNUNET_CONTAINER_DLL_remove (clients_head,
2727                                clients_tail,
2728                                tc);
2729   switch (tc->type)
2730   {
2731   case CT_NONE:
2732     break;
2733   case CT_CORE:
2734     {
2735       struct PendingMessage *pm;
2736
2737       while (NULL != (pm = tc->details.core.pending_msg_head))
2738       {
2739         GNUNET_CONTAINER_MDLL_remove (client,
2740                                       tc->details.core.pending_msg_head,
2741                                       tc->details.core.pending_msg_tail,
2742                                       pm);
2743         pm->client = NULL;
2744       }
2745     }
2746     break;
2747   case CT_MONITOR:
2748     break;
2749   case CT_COMMUNICATOR:
2750     {
2751       struct Queue *q;
2752       struct AddressListEntry *ale;
2753
2754       while (NULL != (q = tc->details.communicator.queue_head))
2755         free_queue (q);
2756       while (NULL != (ale = tc->details.communicator.addr_head))
2757         free_address_list_entry (ale);
2758       GNUNET_free (tc->details.communicator.address_prefix);
2759     }
2760     break;
2761   case CT_APPLICATION:
2762     GNUNET_CONTAINER_multipeermap_iterate (tc->details.application.requests,
2763                                            &stop_peer_request,
2764                                            tc);
2765     GNUNET_CONTAINER_multipeermap_destroy (tc->details.application.requests);
2766     break;
2767   }
2768   GNUNET_free (tc);
2769 }
2770
2771
2772 /**
2773  * Iterator telling new CORE client about all existing
2774  * connections to peers.
2775  *
2776  * @param cls the new `struct TransportClient`
2777  * @param pid a connected peer
2778  * @param value the `struct Neighbour` with more information
2779  * @return #GNUNET_OK (continue to iterate)
2780  */
2781 static int
2782 notify_client_connect_info (void *cls,
2783                             const struct GNUNET_PeerIdentity *pid,
2784                             void *value)
2785 {
2786   struct TransportClient *tc = cls;
2787   struct Neighbour *neighbour = value;
2788
2789   core_send_connect_info (tc,
2790                           pid,
2791                           neighbour->quota_out);
2792   return GNUNET_OK;
2793 }
2794
2795
2796 /**
2797  * Initialize a "CORE" client.  We got a start message from this
2798  * client, so add it to the list of clients for broadcasting of
2799  * inbound messages.
2800  *
2801  * @param cls the client
2802  * @param start the start message that was sent
2803  */
2804 static void
2805 handle_client_start (void *cls,
2806                      const struct StartMessage *start)
2807 {
2808   struct TransportClient *tc = cls;
2809   uint32_t options;
2810
2811   options = ntohl (start->options);
2812   if ( (0 != (1 & options)) &&
2813        (0 !=
2814         GNUNET_memcmp (&start->self,
2815                        &GST_my_identity)) )
2816   {
2817     /* client thinks this is a different peer, reject */
2818     GNUNET_break (0);
2819     GNUNET_SERVICE_client_drop (tc->client);
2820     return;
2821   }
2822   if (CT_NONE != tc->type)
2823   {
2824     GNUNET_break (0);
2825     GNUNET_SERVICE_client_drop (tc->client);
2826     return;
2827   }
2828   tc->type = CT_CORE;
2829   GNUNET_CONTAINER_multipeermap_iterate (neighbours,
2830                                          &notify_client_connect_info,
2831                                          tc);
2832   GNUNET_SERVICE_client_continue (tc->client);
2833 }
2834
2835
2836 /**
2837  * Client asked for transmission to a peer.  Process the request.
2838  *
2839  * @param cls the client
2840  * @param obm the send message that was sent
2841  */
2842 static int
2843 check_client_send (void *cls,
2844                    const struct OutboundMessage *obm)
2845 {
2846   struct TransportClient *tc = cls;
2847   uint16_t size;
2848   const struct GNUNET_MessageHeader *obmm;
2849
2850   if (CT_CORE != tc->type)
2851   {
2852     GNUNET_break (0);
2853     return GNUNET_SYSERR;
2854   }
2855   size = ntohs (obm->header.size) - sizeof (struct OutboundMessage);
2856   if (size < sizeof (struct GNUNET_MessageHeader))
2857   {
2858     GNUNET_break (0);
2859     return GNUNET_SYSERR;
2860   }
2861   obmm = (const struct GNUNET_MessageHeader *) &obm[1];
2862   if (size != ntohs (obmm->size))
2863   {
2864     GNUNET_break (0);
2865     return GNUNET_SYSERR;
2866   }
2867   return GNUNET_OK;
2868 }
2869
2870
2871 /**
2872  * Free fragment tree below @e root, excluding @e root itself.
2873  *
2874  * @param root root of the tree to free
2875  */
2876 static void
2877 free_fragment_tree (struct PendingMessage *root)
2878 {
2879   struct PendingMessage *frag;
2880
2881   while (NULL != (frag = root->head_frag))
2882   {
2883     free_fragment_tree (frag);
2884     GNUNET_CONTAINER_MDLL_remove (frag,
2885                                   root->head_frag,
2886                                   root->tail_frag,
2887                                   frag);
2888     GNUNET_free (frag);
2889   }
2890 }
2891
2892
2893 /**
2894  * Release memory associated with @a pm and remove @a pm from associated
2895  * data structures.  @a pm must be a top-level pending message and not
2896  * a fragment in the tree.  The entire tree is freed (if applicable).
2897  *
2898  * @param pm the pending message to free
2899  */
2900 static void
2901 free_pending_message (struct PendingMessage *pm)
2902 {
2903   struct TransportClient *tc = pm->client;
2904   struct Neighbour *target = pm->target;
2905
2906   if (NULL != tc)
2907   {
2908     GNUNET_CONTAINER_MDLL_remove (client,
2909                                   tc->details.core.pending_msg_head,
2910                                   tc->details.core.pending_msg_tail,
2911                                   pm);
2912   }
2913   GNUNET_CONTAINER_MDLL_remove (neighbour,
2914                                 target->pending_msg_head,
2915                                 target->pending_msg_tail,
2916                                 pm);
2917   free_fragment_tree (pm);
2918   GNUNET_free_non_null (pm->bpm);
2919   GNUNET_free (pm);
2920 }
2921
2922
2923 /**
2924  * Send a response to the @a pm that we have processed a
2925  * "send" request with status @a success. We
2926  * transmitted @a bytes_physical on the actual wire.
2927  * Sends a confirmation to the "core" client responsible
2928  * for the original request and free's @a pm.
2929  *
2930  * @param pm handle to the original pending message
2931  * @param success status code, #GNUNET_OK on success, #GNUNET_SYSERR
2932  *          for transmission failure
2933  * @param bytes_physical amount of bandwidth consumed
2934  */
2935 static void
2936 client_send_response (struct PendingMessage *pm,
2937                       int success,
2938                       uint32_t bytes_physical)
2939 {
2940   struct TransportClient *tc = pm->client;
2941   struct Neighbour *target = pm->target;
2942   struct GNUNET_MQ_Envelope *env;
2943   struct SendOkMessage *som;
2944
2945   if (NULL != tc)
2946   {
2947     env = GNUNET_MQ_msg (som,
2948                          GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
2949     som->success = htonl ((uint32_t) success);
2950     som->bytes_msg = htons (pm->bytes_msg);
2951     som->bytes_physical = htonl (bytes_physical);
2952     som->peer = target->pid;
2953     GNUNET_MQ_send (tc->mq,
2954                     env);
2955   }
2956   free_pending_message (pm);
2957 }
2958
2959
2960 /**
2961  * Checks the message queue for a neighbour for messages that have timed
2962  * out and purges them.
2963  *
2964  * @param cls a `struct Neighbour`
2965  */
2966 static void
2967 check_queue_timeouts (void *cls)
2968 {
2969   struct Neighbour *n = cls;
2970   struct PendingMessage *pm;
2971   struct GNUNET_TIME_Absolute now;
2972   struct GNUNET_TIME_Absolute earliest_timeout;
2973
2974   n->timeout_task = NULL;
2975   earliest_timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
2976   now = GNUNET_TIME_absolute_get ();
2977   for (struct PendingMessage *pos = n->pending_msg_head;
2978        NULL != pos;
2979        pos = pm)
2980   {
2981     pm = pos->next_neighbour;
2982     if (pos->timeout.abs_value_us <= now.abs_value_us)
2983     {
2984       GNUNET_STATISTICS_update (GST_stats,
2985                                 "# messages dropped (timeout before confirmation)",
2986                                 1,
2987                                 GNUNET_NO);
2988       client_send_response (pm,
2989                             GNUNET_NO,
2990                             0);
2991       continue;
2992     }
2993     earliest_timeout = GNUNET_TIME_absolute_min (earliest_timeout,
2994                                                  pos->timeout);
2995   }
2996   n->earliest_timeout = earliest_timeout;
2997   if (NULL != n->pending_msg_head)
2998     n->timeout_task = GNUNET_SCHEDULER_add_at (earliest_timeout,
2999                                                &check_queue_timeouts,
3000                                                n);
3001 }
3002
3003
3004 /**
3005  * Client asked for transmission to a peer.  Process the request.
3006  *
3007  * @param cls the client
3008  * @param obm the send message that was sent
3009  */
3010 static void
3011 handle_client_send (void *cls,
3012                     const struct OutboundMessage *obm)
3013 {
3014   struct TransportClient *tc = cls;
3015   struct PendingMessage *pm;
3016   const struct GNUNET_MessageHeader *obmm;
3017   struct Neighbour *target;
3018   uint32_t bytes_msg;
3019   int was_empty;
3020
3021   GNUNET_assert (CT_CORE == tc->type);
3022   obmm = (const struct GNUNET_MessageHeader *) &obm[1];
3023   bytes_msg = ntohs (obmm->size);
3024   target = lookup_neighbour (&obm->peer);
3025   if (NULL == target)
3026   {
3027     /* Failure: don't have this peer as a neighbour (anymore).
3028        Might have gone down asynchronously, so this is NOT
3029        a protocol violation by CORE. Still count the event,
3030        as this should be rare. */
3031     struct GNUNET_MQ_Envelope *env;
3032     struct SendOkMessage *som;
3033
3034     env = GNUNET_MQ_msg (som,
3035                          GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
3036     som->success = htonl (GNUNET_SYSERR);
3037     som->bytes_msg = htonl (bytes_msg);
3038     som->bytes_physical = htonl (0);
3039     som->peer = obm->peer;
3040     GNUNET_MQ_send (tc->mq,
3041                     env);
3042     GNUNET_SERVICE_client_continue (tc->client);
3043     GNUNET_STATISTICS_update (GST_stats,
3044                               "# messages dropped (neighbour unknown)",
3045                               1,
3046                               GNUNET_NO);
3047     return;
3048   }
3049   was_empty = (NULL == target->pending_msg_head);
3050   pm = GNUNET_malloc (sizeof (struct PendingMessage) + bytes_msg);
3051   pm->client = tc;
3052   pm->target = target;
3053   pm->bytes_msg = bytes_msg;
3054   pm->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (obm->timeout));
3055   memcpy (&pm[1],
3056           &obm[1],
3057           bytes_msg);
3058   GNUNET_CONTAINER_MDLL_insert (neighbour,
3059                                 target->pending_msg_head,
3060                                 target->pending_msg_tail,
3061                                 pm);
3062   GNUNET_CONTAINER_MDLL_insert (client,
3063                                 tc->details.core.pending_msg_head,
3064                                 tc->details.core.pending_msg_tail,
3065                                 pm);
3066   if (target->earliest_timeout.abs_value_us > pm->timeout.abs_value_us)
3067   {
3068     target->earliest_timeout.abs_value_us = pm->timeout.abs_value_us;
3069     if (NULL != target->timeout_task)
3070       GNUNET_SCHEDULER_cancel (target->timeout_task);
3071     target->timeout_task
3072       = GNUNET_SCHEDULER_add_at (target->earliest_timeout,
3073                                  &check_queue_timeouts,
3074                                  target);
3075   }
3076   if (! was_empty)
3077     return; /* all queues must already be busy */
3078   for (struct Queue *queue = target->queue_head;
3079        NULL != queue;
3080        queue = queue->next_neighbour)
3081   {
3082     /* try transmission on any queue that is idle */
3083     if (NULL == queue->transmit_task)
3084       queue->transmit_task = GNUNET_SCHEDULER_add_now (&transmit_on_queue,
3085                                                        queue);
3086   }
3087 }
3088
3089
3090 /**
3091  * Communicator started.  Test message is well-formed.
3092  *
3093  * @param cls the client
3094  * @param cam the send message that was sent
3095  */
3096 static int
3097 check_communicator_available (void *cls,
3098                               const struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam)
3099 {
3100   struct TransportClient *tc = cls;
3101   uint16_t size;
3102
3103   if (CT_NONE != tc->type)
3104   {
3105     GNUNET_break (0);
3106     return GNUNET_SYSERR;
3107   }
3108   tc->type = CT_COMMUNICATOR;
3109   size = ntohs (cam->header.size) - sizeof (*cam);
3110   if (0 == size)
3111     return GNUNET_OK; /* receive-only communicator */
3112   GNUNET_MQ_check_zero_termination (cam);
3113   return GNUNET_OK;
3114 }
3115
3116
3117 /**
3118  * Communicator started.  Process the request.
3119  *
3120  * @param cls the client
3121  * @param cam the send message that was sent
3122  */
3123 static void
3124 handle_communicator_available (void *cls,
3125                                const struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam)
3126 {
3127   struct TransportClient *tc = cls;
3128   uint16_t size;
3129
3130   size = ntohs (cam->header.size) - sizeof (*cam);
3131   if (0 == size)
3132     return; /* receive-only communicator */
3133   tc->details.communicator.address_prefix
3134     = GNUNET_strdup ((const char *) &cam[1]);
3135   tc->details.communicator.cc
3136     = (enum GNUNET_TRANSPORT_CommunicatorCharacteristics) ntohl (cam->cc);
3137   GNUNET_SERVICE_client_continue (tc->client);
3138 }
3139
3140
3141 /**
3142  * Communicator requests backchannel transmission.  Check the request.
3143  *
3144  * @param cls the client
3145  * @param cb the send message that was sent
3146  * @return #GNUNET_OK if message is well-formed
3147  */
3148 static int
3149 check_communicator_backchannel (void *cls,
3150                                 const struct GNUNET_TRANSPORT_CommunicatorBackchannel *cb)
3151 {
3152   const struct GNUNET_MessageHeader *inbox;
3153   const char *is;
3154   uint16_t msize;
3155   uint16_t isize;
3156
3157   (void) cls;
3158   msize = ntohs (cb->header.size) - sizeof (*cb);
3159   if (UINT16_MAX - msize >
3160       sizeof (struct TransportBackchannelEncapsulationMessage) +
3161       sizeof (struct TransportBackchannelRequestPayload) )
3162   {
3163     GNUNET_break (0);
3164     return GNUNET_SYSERR;
3165   }
3166   inbox = (const struct GNUNET_MessageHeader *) &cb[1];
3167   isize = ntohs (inbox->size);
3168   if (isize >= msize)
3169   {
3170     GNUNET_break (0);
3171     return GNUNET_SYSERR;
3172   }
3173   is = (const char *) inbox;
3174   is += isize;
3175   msize -= isize;
3176   GNUNET_assert (msize > 0);
3177   if ('\0' != is[msize-1])
3178   {
3179     GNUNET_break (0);
3180     return GNUNET_SYSERR;
3181   }
3182   return GNUNET_OK;
3183 }
3184
3185
3186 /**
3187  * Remove memory used by expired ephemeral keys.
3188  *
3189  * @param cls NULL
3190  */
3191 static void
3192 expire_ephemerals (void *cls)
3193 {
3194   struct EphemeralCacheEntry *ece;
3195
3196   (void) cls;
3197   ephemeral_task = NULL;
3198   while (NULL != (ece = GNUNET_CONTAINER_heap_peek (ephemeral_heap)))
3199   {
3200     if (0 == GNUNET_TIME_absolute_get_remaining (ece->ephemeral_validity).rel_value_us)
3201     {
3202       free_ephemeral (ece);
3203       continue;
3204     }
3205     ephemeral_task = GNUNET_SCHEDULER_add_at (ece->ephemeral_validity,
3206                                               &expire_ephemerals,
3207                                               NULL);
3208     return;
3209   }
3210 }
3211
3212
3213 /**
3214  * Lookup ephemeral key in our #ephemeral_map. If no valid one exists, generate
3215  * one, cache it and return it.
3216  *
3217  * @param pid peer to look up ephemeral for
3218  * @param private_key[out] set to the private key
3219  * @param ephemeral_key[out] set to the key
3220  * @param ephemeral_sender_sig[out] set to the signature
3221  * @param ephemeral_validity[out] set to the validity expiration time
3222  */
3223 static void
3224 lookup_ephemeral (const struct GNUNET_PeerIdentity *pid,
3225                   struct GNUNET_CRYPTO_EcdhePrivateKey *private_key,
3226                   struct GNUNET_CRYPTO_EcdhePublicKey *ephemeral_key,
3227                   struct GNUNET_CRYPTO_EddsaSignature *ephemeral_sender_sig,
3228                   struct GNUNET_TIME_Absolute *ephemeral_validity)
3229 {
3230   struct EphemeralCacheEntry *ece;
3231   struct EphemeralConfirmation ec;
3232
3233   ece = GNUNET_CONTAINER_multipeermap_get (ephemeral_map,
3234                                            pid);
3235   if ( (NULL != ece) &&
3236        (0 == GNUNET_TIME_absolute_get_remaining (ece->ephemeral_validity).rel_value_us) )
3237   {
3238     free_ephemeral (ece);
3239     ece = NULL;
3240   }
3241   if (NULL == ece)
3242   {
3243     ece = GNUNET_new (struct EphemeralCacheEntry);
3244     ece->target = *pid;
3245     ece->ephemeral_validity = GNUNET_TIME_absolute_add (GNUNET_TIME_absolute_get_monotonic (GST_cfg),
3246                                                         EPHEMERAL_VALIDITY);
3247     GNUNET_assert (GNUNET_OK ==
3248                    GNUNET_CRYPTO_ecdhe_key_create2 (&ece->private_key));
3249     GNUNET_CRYPTO_ecdhe_key_get_public (&ece->private_key,
3250                                         &ece->ephemeral_key);
3251     ec.purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL);
3252     ec.purpose.size = htonl (sizeof (ec));
3253     ec.target = *pid;
3254     ec.ephemeral_key = ece->ephemeral_key;
3255     GNUNET_assert (GNUNET_OK ==
3256                    GNUNET_CRYPTO_eddsa_sign (GST_my_private_key,
3257                                              &ec.purpose,
3258                                              &ece->sender_sig));
3259     ece->hn = GNUNET_CONTAINER_heap_insert (ephemeral_heap,
3260                                             ece,
3261                                             ece->ephemeral_validity.abs_value_us);
3262     GNUNET_assert (GNUNET_OK ==
3263                    GNUNET_CONTAINER_multipeermap_put (ephemeral_map,
3264                                                       &ece->target,
3265                                                       ece,
3266                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
3267     if (NULL == ephemeral_task)
3268       ephemeral_task = GNUNET_SCHEDULER_add_at (ece->ephemeral_validity,
3269                                                 &expire_ephemerals,
3270                                                 NULL);
3271   }
3272   *private_key = ece->private_key;
3273   *ephemeral_key = ece->ephemeral_key;
3274   *ephemeral_sender_sig = ece->sender_sig;
3275   *ephemeral_validity = ece->ephemeral_validity;
3276 }
3277
3278
3279 /**
3280  * Send the control message @a payload on @a queue.
3281  *
3282  * @param queue the queue to use for transmission
3283  * @param pm pending message to update once transmission is done, may be NULL!
3284  * @param payload the payload to send (encapsulated in a
3285  *        #GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG).
3286  * @param payload_size number of bytes in @a payload
3287  */
3288 static void
3289 queue_send_msg (struct Queue *queue,
3290                 struct PendingMessage *pm,
3291                 const void *payload,
3292                 size_t payload_size)
3293 {
3294   struct Neighbour *n = queue->neighbour;
3295   struct GNUNET_TRANSPORT_SendMessageTo *smt;
3296   struct GNUNET_MQ_Envelope *env;
3297
3298   env = GNUNET_MQ_msg_extra (smt,
3299                              payload_size,
3300                              GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG);
3301   smt->qid = queue->qid;
3302   smt->mid = queue->mid_gen;
3303   smt->receiver = n->pid;
3304   memcpy (&smt[1],
3305           payload,
3306           payload_size);
3307   {
3308     /* Pass the env to the communicator of queue for transmission. */
3309     struct QueueEntry *qe;
3310
3311     qe = GNUNET_new (struct QueueEntry);
3312     qe->mid = queue->mid_gen++;
3313     qe->queue = queue;
3314     // qe->pm = pm; // FIXME: not so easy, reference management on 'free(s)'!
3315     // (also, note that pm may be NULL!)
3316     GNUNET_CONTAINER_DLL_insert (queue->queue_head,
3317                                  queue->queue_tail,
3318                                  qe);
3319     GNUNET_assert (CT_COMMUNICATOR == queue->tc->type);
3320     queue->queue_length++;
3321     queue->tc->details.communicator.total_queue_length++;
3322     GNUNET_MQ_send (queue->tc->mq,
3323                     env);
3324   }
3325 }
3326
3327
3328 /**
3329  * We need to transmit @a hdr to @a target.  If necessary, this may
3330  * involve DV routing or even broadcasting and fragmentation.
3331  *
3332  * @param target peer to receive @a hdr
3333  * @param hdr header of the message to route and #GNUNET_free()
3334  */
3335 static void
3336 route_message (const struct GNUNET_PeerIdentity *target,
3337                struct GNUNET_MessageHeader *hdr)
3338 {
3339   // Cases:
3340   // 1: called to transmit backchannel message we initiated
3341   // 2: called to transmit fragment ack
3342   // 3: called to transmit reliability box
3343   // 4: called to forward backchannel message
3344   // 5: called to forward DV learn message (caller already picked random neighbour(s))!
3345   // 6: called to forward DV Box message
3346   // 7: called to forward valdiation response
3347
3348   // Choices:
3349   // a) Send ONLY to a *confirmed* direct neighbour
3350   // b) Send allowed to *unconfirmed* direct neighbour
3351   // c) Route also via *confirmed* DV to target
3352   // c) Route allowed via *unconfirmed  DV to target
3353   // => One BIT "dv allowed or not", plus one BIT "confirmed/unconfirmed" might do!
3354
3355   // Case analysis:
3356   //         1       2        3        4       5       6      7
3357   // a       X       X        X        X       X       X      X
3358   // b                                         X              X
3359   // c       X       X        X        X                      X
3360   // d                                                        X
3361   //
3362
3363   // FIXME: this one is tricky:
3364   // - we could try a direct, reliable channel
3365   // - if that is unavailable / for load balancing, we may try:
3366   //   * multiple (?) direct unreliable channels - depending on loss rate?
3367   //   * some (?) DV channels - if above unavailable / too lossy?
3368   //   * _random_ other peers ("broadcasting") in hope of *discovering*
3369   //      a path back! - if all else fails
3370   // => need more on DV first!
3371
3372   // FIXME: send hdr to target, free hdr (possibly using DV, possibly broadcasting)
3373   GNUNET_free (hdr);
3374 }
3375
3376
3377 /**
3378  * Structure of the key material used to encrypt backchannel messages.
3379  */
3380 struct BackchannelKeyState
3381 {
3382   /**
3383    * State of our block cipher.
3384    */
3385   gcry_cipher_hd_t cipher;
3386
3387   /**
3388    * Actual key material.
3389    */
3390   struct {
3391
3392     /**
3393      * Key used for HMAC calculations (via #GNUNET_CRYPTO_hmac()).
3394      */
3395     struct GNUNET_CRYPTO_AuthKey hmac_key;
3396
3397     /**
3398      * Symmetric key to use for encryption.
3399      */
3400     char aes_key[256/8];
3401
3402     /**
3403      * Counter value to use during setup.
3404      */
3405     char aes_ctr[128/8];
3406
3407   } material;
3408 };
3409
3410
3411 static void
3412 bc_setup_key_state_from_km (const struct GNUNET_HashCode *km,
3413                             const struct GNUNET_ShortHashCode *iv,
3414                             struct BackchannelKeyState *key)
3415 {
3416   /* must match #dh_key_derive_eph_pub */
3417   GNUNET_assert (GNUNET_YES ==
3418                  GNUNET_CRYPTO_kdf (&key->material,
3419                                     sizeof (key->material),
3420                                     "transport-backchannel-key",
3421                                     strlen ("transport-backchannel-key"),
3422                                     &km,
3423                                     sizeof (km),
3424                                     iv,
3425                                     sizeof (*iv)));
3426   gcry_cipher_open (&key->cipher,
3427                     GCRY_CIPHER_AES256 /* low level: go for speed */,
3428                     GCRY_CIPHER_MODE_CTR,
3429                     0 /* flags */);
3430   gcry_cipher_setkey (key->cipher,
3431                       &key->material.aes_key,
3432                       sizeof (key->material.aes_key));
3433   gcry_cipher_setctr (key->cipher,
3434                       &key->material.aes_ctr,
3435                       sizeof (key->material.aes_ctr));
3436 }
3437
3438
3439 /**
3440  * Derive backchannel encryption key material from @a priv_ephemeral
3441  * and @a target and @a iv.
3442  *
3443  * @param priv_ephemeral ephemeral private key to use
3444  * @param target the target peer to encrypt to
3445  * @param iv unique IV to use
3446  * @param key[out] set to the key material
3447  */
3448 static void
3449 dh_key_derive_eph_pid (const struct GNUNET_CRYPTO_EcdhePrivateKey *priv_ephemeral,
3450                        const struct GNUNET_PeerIdentity *target,
3451                        const struct GNUNET_ShortHashCode *iv,
3452                        struct BackchannelKeyState *key)
3453 {
3454   struct GNUNET_HashCode km;
3455
3456   GNUNET_assert (GNUNET_YES ==
3457                  GNUNET_CRYPTO_ecdh_eddsa (priv_ephemeral,
3458                                            &target->public_key,
3459                                            &km));
3460   bc_setup_key_state_from_km (&km,
3461                               iv,
3462                               key);
3463 }
3464
3465
3466 /**
3467  * Derive backchannel encryption key material from #GST_my_private_key
3468  * and @a pub_ephemeral and @a iv.
3469  *
3470  * @param priv_ephemeral ephemeral private key to use
3471  * @param target the target peer to encrypt to
3472  * @param iv unique IV to use
3473  * @param key[out] set to the key material
3474  */
3475 static void
3476 dh_key_derive_eph_pub (const struct GNUNET_CRYPTO_EcdhePublicKey *pub_ephemeral,
3477                        const struct GNUNET_ShortHashCode *iv,
3478                        struct BackchannelKeyState *key)
3479 {
3480   struct GNUNET_HashCode km;
3481
3482   GNUNET_assert (GNUNET_YES ==
3483                  GNUNET_CRYPTO_eddsa_ecdh (GST_my_private_key,
3484                                            pub_ephemeral,
3485                                            &km));
3486   bc_setup_key_state_from_km (&km,
3487                               iv,
3488                               key);
3489 }
3490
3491
3492 /**
3493  * Do HMAC calculation for backchannel messages over @a data using key
3494  * material from @a key.
3495  *
3496  * @param key key material (from DH)
3497  * @param hmac[out] set to the HMAC
3498  * @param data data to perform HMAC calculation over
3499  * @param data_size number of bytes in @a data
3500  */
3501 static void
3502 bc_hmac (const struct BackchannelKeyState *key,
3503          struct GNUNET_HashCode *hmac,
3504          const void *data,
3505          size_t data_size)
3506 {
3507   GNUNET_CRYPTO_hmac (&key->material.hmac_key,
3508                       data,
3509                       data_size,
3510                       hmac);
3511 }
3512
3513
3514 /**
3515  * Perform backchannel encryption using symmetric secret in @a key
3516  * to encrypt data from @a in to @a dst.
3517  *
3518  * @param key[in,out] key material to use
3519  * @param dst where to write the result
3520  * @param in input data to encrypt (plaintext)
3521  * @param in_size number of bytes of input in @a in and available at @a dst
3522  */
3523 static void
3524 bc_encrypt (struct BackchannelKeyState *key,
3525             const void *in,
3526             void *dst,
3527             size_t in_size)
3528 {
3529   GNUNET_assert (0 ==
3530                  gcry_cipher_encrypt (key->cipher,
3531                                       dst,
3532                                       in_size,
3533                                       in,
3534                                       in_size));
3535 }
3536
3537
3538 /**
3539  * Perform backchannel encryption using symmetric secret in @a key
3540  * to encrypt data from @a in to @a dst.
3541  *
3542  * @param key[in,out] key material to use
3543  * @param ciph cipher text to decrypt
3544  * @param out[out] output data to generate (plaintext)
3545  * @param out_size number of bytes of input in @a ciph and available in @a out
3546  */
3547 static void
3548 bc_decrypt (struct BackchannelKeyState *key,
3549             void *out,
3550             const void *ciph,
3551             size_t out_size)
3552 {
3553   GNUNET_assert (0 ==
3554                  gcry_cipher_decrypt (key->cipher,
3555                                       out,
3556                                       out_size,
3557                                       ciph,
3558                                       out_size));
3559 }
3560
3561
3562 /**
3563  * Clean up key material in @a key.
3564  *
3565  * @param key key material to clean up (memory must not be free'd!)
3566  */
3567 static void
3568 bc_key_clean (struct BackchannelKeyState *key)
3569 {
3570   gcry_cipher_close (key->cipher);
3571   GNUNET_CRYPTO_zero_keys (&key->material,
3572                            sizeof (key->material));
3573 }
3574
3575
3576 /**
3577  * Communicator requests backchannel transmission.  Process the request.
3578  *
3579  * @param cls the client
3580  * @param cb the send message that was sent
3581  */
3582 static void
3583 handle_communicator_backchannel (void *cls,
3584                                  const struct GNUNET_TRANSPORT_CommunicatorBackchannel *cb)
3585 {
3586   struct TransportClient *tc = cls;
3587   struct GNUNET_CRYPTO_EcdhePrivateKey private_key;
3588   struct GNUNET_TIME_Absolute ephemeral_validity;
3589   struct TransportBackchannelEncapsulationMessage *enc;
3590   struct TransportBackchannelRequestPayload ppay;
3591   struct BackchannelKeyState key;
3592   char *mpos;
3593   uint16_t msize;
3594
3595   /* encapsulate and encrypt message */
3596   msize = ntohs (cb->header.size) - sizeof (*cb) + sizeof (struct TransportBackchannelRequestPayload);
3597   enc = GNUNET_malloc (sizeof (*enc) + msize);
3598   enc->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION);
3599   enc->header.size = htons (sizeof (*enc) + msize);
3600   enc->target = cb->pid;
3601   lookup_ephemeral (&cb->pid,
3602                     &private_key,
3603                     &enc->ephemeral_key,
3604                     &ppay.sender_sig,
3605                     &ephemeral_validity);
3606   GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
3607                               &enc->iv,
3608                               sizeof (enc->iv));
3609   dh_key_derive_eph_pid (&private_key,
3610                          &cb->pid,
3611                          &enc->iv,
3612                          &key);
3613   ppay.ephemeral_validity = GNUNET_TIME_absolute_hton (ephemeral_validity);
3614   ppay.monotonic_time = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get_monotonic (GST_cfg));
3615   mpos = (char *) &enc[1];
3616   bc_encrypt (&key,
3617               &ppay,
3618               mpos,
3619               sizeof (ppay));
3620   bc_encrypt (&key,
3621               &cb[1],
3622               &mpos[sizeof (ppay)],
3623               ntohs (cb->header.size) - sizeof (*cb));
3624   bc_hmac (&key,
3625            &enc->hmac,
3626            mpos,
3627            sizeof (ppay) + ntohs (cb->header.size) - sizeof (*cb));
3628   bc_key_clean (&key);
3629   route_message (&cb->pid,
3630                  &enc->header);
3631   GNUNET_SERVICE_client_continue (tc->client);
3632 }
3633
3634
3635 /**
3636  * Address of our peer added.  Test message is well-formed.
3637  *
3638  * @param cls the client
3639  * @param aam the send message that was sent
3640  * @return #GNUNET_OK if message is well-formed
3641  */
3642 static int
3643 check_add_address (void *cls,
3644                    const struct GNUNET_TRANSPORT_AddAddressMessage *aam)
3645 {
3646   struct TransportClient *tc = cls;
3647
3648   if (CT_COMMUNICATOR != tc->type)
3649   {
3650     GNUNET_break (0);
3651     return GNUNET_SYSERR;
3652   }
3653   GNUNET_MQ_check_zero_termination (aam);
3654   return GNUNET_OK;
3655 }
3656
3657
3658 /**
3659  * Ask peerstore to store our address.
3660  *
3661  * @param cls an `struct AddressListEntry *`
3662  */
3663 static void
3664 store_pi (void *cls);
3665
3666
3667 /**
3668  * Function called when peerstore is done storing our address.
3669  *
3670  * @param cls a `struct AddressListEntry`
3671  * @param success #GNUNET_YES if peerstore was successful
3672  */
3673 static void
3674 peerstore_store_own_cb (void *cls,
3675                         int success)
3676 {
3677   struct AddressListEntry *ale = cls;
3678
3679   ale->sc = NULL;
3680   if (GNUNET_YES != success)
3681     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3682                 "Failed to store our own address `%s' in peerstore!\n",
3683                 ale->address);
3684   /* refresh period is 1/4 of expiration time, that should be plenty
3685      without being excessive. */
3686   ale->st = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_divide (ale->expiration,
3687                                                                        4ULL),
3688                                           &store_pi,
3689                                           ale);
3690 }
3691
3692
3693 /**
3694  * Ask peerstore to store our address.
3695  *
3696  * @param cls an `struct AddressListEntry *`
3697  */
3698 static void
3699 store_pi (void *cls)
3700 {
3701   struct AddressListEntry *ale = cls;
3702   void *addr;
3703   size_t addr_len;
3704   struct GNUNET_TIME_Absolute expiration;
3705
3706   ale->st = NULL;
3707   expiration = GNUNET_TIME_relative_to_absolute (ale->expiration);
3708   GNUNET_HELLO_sign_address (ale->address,
3709                              ale->nt,
3710                              expiration,
3711                              GST_my_private_key,
3712                              &addr,
3713                              &addr_len);
3714   ale->sc = GNUNET_PEERSTORE_store (peerstore,
3715                                     "transport",
3716                                     &GST_my_identity,
3717                                     GNUNET_PEERSTORE_TRANSPORT_HELLO_KEY,
3718                                     addr,
3719                                     addr_len,
3720                                     expiration,
3721                                     GNUNET_PEERSTORE_STOREOPTION_MULTIPLE,
3722                                     &peerstore_store_own_cb,
3723                                     ale);
3724   GNUNET_free (addr);
3725   if (NULL == ale->sc)
3726   {
3727     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
3728                 "Failed to store our address `%s' with peerstore\n",
3729                 ale->address);
3730     ale->st = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
3731                                             &store_pi,
3732                                             ale);
3733   }
3734 }
3735
3736
3737 /**
3738  * Address of our peer added.  Process the request.
3739  *
3740  * @param cls the client
3741  * @param aam the send message that was sent
3742  */
3743 static void
3744 handle_add_address (void *cls,
3745                     const struct GNUNET_TRANSPORT_AddAddressMessage *aam)
3746 {
3747   struct TransportClient *tc = cls;
3748   struct AddressListEntry *ale;
3749   size_t slen;
3750
3751   slen = ntohs (aam->header.size) - sizeof (*aam);
3752   ale = GNUNET_malloc (sizeof (struct AddressListEntry) + slen);
3753   ale->tc = tc;
3754   ale->address = (const char *) &ale[1];
3755   ale->expiration = GNUNET_TIME_relative_ntoh (aam->expiration);
3756   ale->aid = aam->aid;
3757   ale->nt = (enum GNUNET_NetworkType) ntohl (aam->nt);
3758   memcpy (&ale[1],
3759           &aam[1],
3760           slen);
3761   GNUNET_CONTAINER_DLL_insert (tc->details.communicator.addr_head,
3762                                tc->details.communicator.addr_tail,
3763                                ale);
3764   ale->st = GNUNET_SCHEDULER_add_now (&store_pi,
3765                                       ale);
3766   GNUNET_SERVICE_client_continue (tc->client);
3767 }
3768
3769
3770 /**
3771  * Address of our peer deleted.  Process the request.
3772  *
3773  * @param cls the client
3774  * @param dam the send message that was sent
3775  */
3776 static void
3777 handle_del_address (void *cls,
3778                     const struct GNUNET_TRANSPORT_DelAddressMessage *dam)
3779 {
3780   struct TransportClient *tc = cls;
3781
3782   if (CT_COMMUNICATOR != tc->type)
3783   {
3784     GNUNET_break (0);
3785     GNUNET_SERVICE_client_drop (tc->client);
3786     return;
3787   }
3788   for (struct AddressListEntry *ale = tc->details.communicator.addr_head;
3789        NULL != ale;
3790        ale = ale->next)
3791   {
3792     if (dam->aid != ale->aid)
3793       continue;
3794     GNUNET_assert (ale->tc == tc);
3795     free_address_list_entry (ale);
3796     GNUNET_SERVICE_client_continue (tc->client);
3797   }
3798   GNUNET_break (0);
3799   GNUNET_SERVICE_client_drop (tc->client);
3800 }
3801
3802
3803 /**
3804  * Context from #handle_incoming_msg().  Closure for many
3805  * message handlers below.
3806  */
3807 struct CommunicatorMessageContext
3808 {
3809   /**
3810    * Which communicator provided us with the message.
3811    */
3812   struct TransportClient *tc;
3813
3814   /**
3815    * Additional information for flow control and about the sender.
3816    */
3817   struct GNUNET_TRANSPORT_IncomingMessage im;
3818
3819   /**
3820    * Number of hops the message has travelled (if DV-routed).
3821    * FIXME: make use of this in ACK handling!
3822    */
3823   uint16_t total_hops;
3824 };
3825
3826
3827 /**
3828  * Given an inbound message @a msg from a communicator @a cmc,
3829  * demultiplex it based on the type calling the right handler.
3830  *
3831  * @param cmc context for demultiplexing
3832  * @param msg message to demultiplex
3833  */
3834 static void
3835 demultiplex_with_cmc (struct CommunicatorMessageContext *cmc,
3836                       const struct GNUNET_MessageHeader *msg);
3837
3838
3839 /**
3840  * Send ACK to communicator (if requested) and free @a cmc.
3841  *
3842  * @param cmc context for which we are done handling the message
3843  */
3844 static void
3845 finish_cmc_handling (struct CommunicatorMessageContext *cmc)
3846 {
3847   if (0 != ntohl (cmc->im.fc_on))
3848   {
3849     /* send ACK when done to communicator for flow control! */
3850     struct GNUNET_MQ_Envelope *env;
3851     struct GNUNET_TRANSPORT_IncomingMessageAck *ack;
3852
3853     env = GNUNET_MQ_msg (ack,
3854                          GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK);
3855     ack->reserved = htonl (0);
3856     ack->fc_id = cmc->im.fc_id;
3857     ack->sender = cmc->im.sender;
3858     GNUNET_MQ_send (cmc->tc->mq,
3859                     env);
3860   }
3861   GNUNET_SERVICE_client_continue (cmc->tc->client);
3862   GNUNET_free (cmc);
3863 }
3864
3865
3866 /**
3867  * Communicator gave us an unencapsulated message to pass as-is to
3868  * CORE.  Process the request.
3869  *
3870  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
3871  * @param mh the message that was received
3872  */
3873 static void
3874 handle_raw_message (void *cls,
3875                     const struct GNUNET_MessageHeader *mh)
3876 {
3877   struct CommunicatorMessageContext *cmc = cls;
3878   uint16_t size = ntohs (mh->size);
3879
3880   if ( (size > UINT16_MAX - sizeof (struct InboundMessage)) ||
3881        (size < sizeof (struct GNUNET_MessageHeader)) )
3882   {
3883     struct GNUNET_SERVICE_Client *client = cmc->tc->client;
3884
3885     GNUNET_break (0);
3886     finish_cmc_handling (cmc);
3887     GNUNET_SERVICE_client_drop (client);
3888     return;
3889   }
3890   /* Forward to all CORE clients */
3891   for (struct TransportClient *tc = clients_head;
3892        NULL != tc;
3893        tc = tc->next)
3894   {
3895     struct GNUNET_MQ_Envelope *env;
3896     struct InboundMessage *im;
3897
3898     if (CT_CORE != tc->type)
3899       continue;
3900     env = GNUNET_MQ_msg_extra (im,
3901                                size,
3902                                GNUNET_MESSAGE_TYPE_TRANSPORT_RECV);
3903     im->peer = cmc->im.sender;
3904     memcpy (&im[1],
3905             mh,
3906             size);
3907     GNUNET_MQ_send (tc->mq,
3908                     env);
3909   }
3910   /* FIXME: consider doing this _only_ once the message
3911      was drained from the CORE MQs to extend flow control to CORE!
3912      (basically, increment counter in cmc, decrement on MQ send continuation! */
3913   finish_cmc_handling (cmc);
3914 }
3915
3916
3917 /**
3918  * Communicator gave us a fragment box.  Check the message.
3919  *
3920  * @param cls a `struct CommunicatorMessageContext`
3921  * @param fb the send message that was sent
3922  * @return #GNUNET_YES if message is well-formed
3923  */
3924 static int
3925 check_fragment_box (void *cls,
3926                     const struct TransportFragmentBox *fb)
3927 {
3928   uint16_t size = ntohs (fb->header.size);
3929   uint16_t bsize = size - sizeof (*fb);
3930
3931   if (0 == bsize)
3932   {
3933     GNUNET_break_op (0);
3934     return GNUNET_SYSERR;
3935   }
3936   if (bsize + ntohs (fb->frag_off) > ntohs (fb->msg_size))
3937   {
3938     GNUNET_break_op (0);
3939     return GNUNET_SYSERR;
3940   }
3941   if (ntohs (fb->frag_off) >= ntohs (fb->msg_size))
3942   {
3943     GNUNET_break_op (0);
3944     return GNUNET_SYSERR;
3945   }
3946   return GNUNET_YES;
3947 }
3948
3949
3950 /**
3951  * Generate a fragment acknowledgement for an @a rc.
3952  *
3953  * @param rc context to generate ACK for, @a rc ACK state is reset
3954  */
3955 static void
3956 send_fragment_ack (struct ReassemblyContext *rc)
3957 {
3958   struct TransportFragmentAckMessage *ack;
3959
3960   ack = GNUNET_new (struct TransportFragmentAckMessage);
3961   ack->header.size = htons (sizeof (struct TransportFragmentAckMessage));
3962   ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK);
3963   ack->frag_uuid = htonl (rc->frag_uuid);
3964   ack->extra_acks = GNUNET_htonll (rc->extra_acks);
3965   ack->msg_uuid = rc->msg_uuid;
3966   ack->avg_ack_delay = GNUNET_TIME_relative_hton (rc->avg_ack_delay);
3967   if (0 == rc->msg_missing)
3968     ack->reassembly_timeout
3969       = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_FOREVER_REL); /* signal completion */
3970   else
3971     ack->reassembly_timeout
3972       = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (rc->reassembly_timeout));
3973   route_message (&rc->neighbour->pid,
3974                  &ack->header);
3975   rc->avg_ack_delay = GNUNET_TIME_UNIT_ZERO;
3976   rc->num_acks = 0;
3977   rc->extra_acks = 0LLU;
3978 }
3979
3980
3981 /**
3982  * Communicator gave us a fragment.  Process the request.
3983  *
3984  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
3985  * @param fb the message that was received
3986  */
3987 static void
3988 handle_fragment_box (void *cls,
3989                      const struct TransportFragmentBox *fb)
3990 {
3991   struct CommunicatorMessageContext *cmc = cls;
3992   struct Neighbour *n;
3993   struct ReassemblyContext *rc;
3994   const struct GNUNET_MessageHeader *msg;
3995   uint16_t msize;
3996   uint16_t fsize;
3997   uint16_t frag_off;
3998   uint32_t frag_uuid;
3999   char *target;
4000   struct GNUNET_TIME_Relative cdelay;
4001   int ack_now;
4002
4003   n = GNUNET_CONTAINER_multipeermap_get (neighbours,
4004                                          &cmc->im.sender);
4005   if (NULL == n)
4006   {
4007     struct GNUNET_SERVICE_Client *client = cmc->tc->client;
4008
4009     GNUNET_break (0);
4010     finish_cmc_handling (cmc);
4011     GNUNET_SERVICE_client_drop (client);
4012     return;
4013   }
4014   if (NULL == n->reassembly_map)
4015   {
4016     n->reassembly_map = GNUNET_CONTAINER_multishortmap_create (8,
4017                                                                GNUNET_YES);
4018     n->reassembly_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
4019     n->reassembly_timeout_task = GNUNET_SCHEDULER_add_delayed (REASSEMBLY_EXPIRATION,
4020                                                                &reassembly_cleanup_task,
4021                                                                n);
4022   }
4023   msize = ntohs (fb->msg_size);
4024   rc = GNUNET_CONTAINER_multishortmap_get (n->reassembly_map,
4025                                            &fb->msg_uuid);
4026   if (NULL == rc)
4027   {
4028     rc = GNUNET_malloc (sizeof (*rc) +
4029                         msize + /* reassembly payload buffer */
4030                         (msize + 7) / 8 * sizeof (uint8_t) /* bitfield */);
4031     rc->msg_uuid = fb->msg_uuid;
4032     rc->neighbour = n;
4033     rc->msg_size = msize;
4034     rc->reassembly_timeout = GNUNET_TIME_relative_to_absolute (REASSEMBLY_EXPIRATION);
4035     rc->last_frag = GNUNET_TIME_absolute_get ();
4036     rc->hn = GNUNET_CONTAINER_heap_insert (n->reassembly_heap,
4037                                            rc,
4038                                            rc->reassembly_timeout.abs_value_us);
4039     GNUNET_assert (GNUNET_OK ==
4040                    GNUNET_CONTAINER_multishortmap_put (n->reassembly_map,
4041                                                        &rc->msg_uuid,
4042                                                        rc,
4043                                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
4044     target = (char *) &rc[1];
4045     rc->bitfield = (uint8_t *) (target + rc->msg_size);
4046     rc->msg_missing = rc->msg_size;
4047   }
4048   else
4049   {
4050     target = (char *) &rc[1];
4051   }
4052   if (msize != rc->msg_size)
4053   {
4054     GNUNET_break (0);
4055     finish_cmc_handling (cmc);
4056     return;
4057   }
4058
4059   /* reassemble */
4060   fsize = ntohs (fb->header.size) - sizeof (*fb);
4061   frag_off = ntohs (fb->frag_off);
4062   memcpy (&target[frag_off],
4063           &fb[1],
4064           fsize);
4065   /* update bitfield and msg_missing */
4066   for (unsigned int i=frag_off;i<frag_off+fsize;i++)
4067   {
4068     if (0 == (rc->bitfield[i / 8] & (1 << (i % 8))))
4069     {
4070       rc->bitfield[i / 8] |= (1 << (i % 8));
4071       rc->msg_missing--;
4072     }
4073   }
4074
4075   /* Compute cummulative ACK */
4076   frag_uuid = ntohl (fb->frag_uuid);
4077   cdelay = GNUNET_TIME_absolute_get_duration (rc->last_frag);
4078   cdelay = GNUNET_TIME_relative_multiply (cdelay,
4079                                           rc->num_acks);
4080   rc->last_frag = GNUNET_TIME_absolute_get ();
4081   rc->avg_ack_delay = GNUNET_TIME_relative_add (rc->avg_ack_delay,
4082                                                 cdelay);
4083   ack_now = GNUNET_NO;
4084   if (0 == rc->num_acks)
4085   {
4086     /* case one: first ack */
4087     rc->frag_uuid = frag_uuid;
4088     rc->extra_acks = 0LLU;
4089     rc->num_acks = 1;
4090   }
4091   else if ( (frag_uuid >= rc->frag_uuid) &&
4092             (frag_uuid <= rc->frag_uuid + 64) )
4093   {
4094     /* case two: ack fits after existing min UUID */
4095     if ( (frag_uuid == rc->frag_uuid) ||
4096          (0 != (rc->extra_acks & (1LLU << (frag_uuid - rc->frag_uuid - 1)))) )
4097     {
4098       /* duplicate fragment, ack now! */
4099       ack_now = GNUNET_YES;
4100     }
4101     else
4102     {
4103       rc->extra_acks |= (1LLU << (frag_uuid - rc->frag_uuid - 1));
4104       rc->num_acks++;
4105     }
4106   }
4107   else if ( (rc->frag_uuid > frag_uuid) &&
4108             ( ( (rc->frag_uuid == frag_uuid + 64) &&
4109                 (0 == rc->extra_acks) ) ||
4110               ( (rc->frag_uuid < frag_uuid + 64) &&
4111                 (rc->extra_acks == (rc->extra_acks & ~ ((1LLU << (64 - (rc->frag_uuid - frag_uuid))) - 1LLU))) ) ) )
4112   {
4113     /* can fit ack by shifting extra acks and starting at
4114        frag_uid, test above esured that the bits we will
4115        shift 'extra_acks' by are all zero. */
4116     rc->extra_acks <<= (rc->frag_uuid - frag_uuid);
4117     rc->extra_acks |= (1LLU << (rc->frag_uuid - frag_uuid - 1));
4118     rc->frag_uuid = frag_uuid;
4119     rc->num_acks++;
4120   }
4121   if (65 == rc->num_acks) /* FIXME: maybe use smaller threshold? This is very aggressive. */
4122     ack_now = GNUNET_YES; /* maximum acks received */
4123   // FIXME: possibly also ACK based on RTT (but for that we'd need to
4124   // determine the queue used for the ACK first!)
4125
4126   /* is reassembly complete? */
4127   if (0 != rc->msg_missing)
4128   {
4129     if (ack_now)
4130       send_fragment_ack (rc);
4131     finish_cmc_handling (cmc);
4132     return;
4133   }
4134   /* reassembly is complete, verify result */
4135   msg = (const struct GNUNET_MessageHeader *) &rc[1];
4136   if (ntohs (msg->size) != rc->msg_size)
4137   {
4138     GNUNET_break (0);
4139     free_reassembly_context (rc);
4140     finish_cmc_handling (cmc);
4141     return;
4142   }
4143   /* successful reassembly */
4144   send_fragment_ack (rc);
4145   demultiplex_with_cmc (cmc,
4146                         msg);
4147   /* FIXME: really free here? Might be bad if fragments are still
4148      en-route and we forget that we finished this reassembly immediately!
4149      -> keep around until timeout?
4150      -> shorten timeout based on ACK? */
4151   free_reassembly_context (rc);
4152 }
4153
4154
4155 /**
4156  * Check the @a fa against the fragments associated with @a pm.
4157  * If it matches, remove the matching fragments from the transmission
4158  * list.
4159  *
4160  * @param pm pending message to check against the ack
4161  * @param fa the ack that was received
4162  * @return #GNUNET_YES if @a fa matched, #GNUNET_NO if not
4163  */
4164 static int
4165 check_ack_against_pm (struct PendingMessage *pm,
4166                       const struct TransportFragmentAckMessage *fa)
4167 {
4168   int match;
4169   struct PendingMessage *nxt;
4170   uint32_t fs = ntohl (fa->frag_uuid);
4171   uint64_t xtra = GNUNET_ntohll (fa->extra_acks);
4172
4173   match = GNUNET_NO;
4174   for (struct PendingMessage *frag = pm->head_frag;
4175        NULL != frag;
4176        frag = nxt)
4177   {
4178     const struct TransportFragmentBox *tfb
4179       = (const struct TransportFragmentBox *) &pm[1];
4180     uint32_t fu = ntohl (tfb->frag_uuid);
4181
4182     GNUNET_assert (PMT_FRAGMENT_BOX == frag->pmt);
4183     nxt = frag->next_frag;
4184     /* Check for exact match or match in the 'xtra' bitmask */
4185     if ( (fu == fs) ||
4186          ( (fu > fs) &&
4187            (fu <= fs + 64) &&
4188            (0 != (1LLU << (fu - fs - 1) & xtra)) ) )
4189     {
4190       match = GNUNET_YES;
4191       free_fragment_tree (frag);
4192     }
4193   }
4194   return match;
4195 }
4196
4197
4198 /**
4199  * Communicator gave us a fragment acknowledgement.  Process the request.
4200  *
4201  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
4202  * @param fa the message that was received
4203  */
4204 static void
4205 handle_fragment_ack (void *cls,
4206                      const struct TransportFragmentAckMessage *fa)
4207 {
4208   struct CommunicatorMessageContext *cmc = cls;
4209   struct Neighbour *n;
4210   int matched;
4211
4212   n = GNUNET_CONTAINER_multipeermap_get (neighbours,
4213                                          &cmc->im.sender);
4214   if (NULL == n)
4215   {
4216     struct GNUNET_SERVICE_Client *client = cmc->tc->client;
4217
4218     GNUNET_break (0);
4219     finish_cmc_handling (cmc);
4220     GNUNET_SERVICE_client_drop (client);
4221     return;
4222   }
4223   /* FIXME-OPTIMIZE: maybe use another hash map here? */
4224   matched = GNUNET_NO;
4225   for (struct PendingMessage *pm = n->pending_msg_head;
4226        NULL != pm;
4227        pm = pm->prev_neighbour)
4228   {
4229     if (0 !=
4230         GNUNET_memcmp (&fa->msg_uuid,
4231                        &pm->msg_uuid))
4232       continue;
4233     matched = GNUNET_YES;
4234     if (GNUNET_YES ==
4235         check_ack_against_pm (pm,
4236                               fa))
4237     {
4238       struct GNUNET_TIME_Relative avg_ack_delay
4239         = GNUNET_TIME_relative_ntoh (fa->avg_ack_delay);
4240       // FIXME: update RTT and other reliability data!
4241       // ISSUE: we don't know which of n's queues the message(s)
4242       // took (and in fact the different messages might have gone
4243       // over different queues and possibly over multiple).
4244       // => track queues with PendingMessages, and update RTT only if
4245       //    the queue used is unique?
4246       //    -> how can we get loss rates?
4247       //    -> or, add extra state to Box and ACK to identify queue?
4248       // IDEA: generate MULTIPLE frag-uuids per fragment and track
4249       //    the queue with the fragment! (-> this logic must
4250       //    be moved into check_ack_against_pm!)
4251       (void) avg_ack_delay;
4252     }
4253     else
4254     {
4255       GNUNET_STATISTICS_update (GST_stats,
4256                                 "# FRAGMENT_ACKS dropped, no matching fragment",
4257                                 1,
4258                                 GNUNET_NO);
4259     }
4260     if (NULL == pm->head_frag)
4261     {
4262       // if entire message is ACKed, handle that as well.
4263       // => clean up PM, any post actions?
4264       free_pending_message (pm);
4265     }
4266     else
4267     {
4268       struct GNUNET_TIME_Relative reassembly_timeout
4269         = GNUNET_TIME_relative_ntoh (fa->reassembly_timeout);
4270       // OPTIMIZE-FIXME: adjust retransmission strategy based on reassembly_timeout!
4271       (void) reassembly_timeout;
4272     }
4273     break;
4274   }
4275   if (GNUNET_NO == matched)
4276   {
4277     GNUNET_STATISTICS_update (GST_stats,
4278                               "# FRAGMENT_ACKS dropped, no matching pending message",
4279                               1,
4280                               GNUNET_NO);
4281   }
4282   finish_cmc_handling (cmc);
4283 }
4284
4285
4286 /**
4287  * Communicator gave us a reliability box.  Check the message.
4288  *
4289  * @param cls a `struct CommunicatorMessageContext`
4290  * @param rb the send message that was sent
4291  * @return #GNUNET_YES if message is well-formed
4292  */
4293 static int
4294 check_reliability_box (void *cls,
4295                        const struct TransportReliabilityBox *rb)
4296 {
4297   GNUNET_MQ_check_boxed_message (rb);
4298   return GNUNET_YES;
4299 }
4300
4301
4302 /**
4303  * Communicator gave us a reliability box.  Process the request.
4304  *
4305  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
4306  * @param rb the message that was received
4307  */
4308 static void
4309 handle_reliability_box (void *cls,
4310                         const struct TransportReliabilityBox *rb)
4311 {
4312   struct CommunicatorMessageContext *cmc = cls;
4313   const struct GNUNET_MessageHeader *inbox = (const struct GNUNET_MessageHeader *) &rb[1];
4314
4315   if (0 == ntohl (rb->ack_countdown))
4316   {
4317     struct TransportReliabilityAckMessage *ack;
4318
4319     /* FIXME: implement cummulative ACKs and ack_countdown,
4320        then setting the avg_ack_delay field below: */
4321     ack = GNUNET_malloc (sizeof (*ack) +
4322                          sizeof (struct GNUNET_ShortHashCode));
4323     ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK);
4324     ack->header.size = htons (sizeof (*ack) +
4325                               sizeof (struct GNUNET_ShortHashCode));
4326     memcpy (&ack[1],
4327             &rb->msg_uuid,
4328             sizeof (struct GNUNET_ShortHashCode));
4329     route_message (&cmc->im.sender,
4330                    &ack->header);
4331   }
4332   /* continue with inner message */
4333   demultiplex_with_cmc (cmc,
4334                         inbox);
4335 }
4336
4337
4338 /**
4339  * Communicator gave us a reliability ack.  Process the request.
4340  *
4341  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
4342  * @param ra the message that was received
4343  */
4344 static void
4345 handle_reliability_ack (void *cls,
4346                         const struct TransportReliabilityAckMessage *ra)
4347 {
4348   struct CommunicatorMessageContext *cmc = cls;
4349   struct Neighbour *n;
4350   unsigned int n_acks;
4351   const struct GNUNET_ShortHashCode *msg_uuids;
4352   struct PendingMessage *nxt;
4353   int matched;
4354
4355   n = GNUNET_CONTAINER_multipeermap_get (neighbours,
4356                                          &cmc->im.sender);
4357   if (NULL == n)
4358   {
4359     struct GNUNET_SERVICE_Client *client = cmc->tc->client;
4360
4361     GNUNET_break (0);
4362     finish_cmc_handling (cmc);
4363     GNUNET_SERVICE_client_drop (client);
4364     return;
4365   }
4366   n_acks = (ntohs (ra->header.size) - sizeof (*ra))
4367     / sizeof (struct GNUNET_ShortHashCode);
4368   msg_uuids = (const struct GNUNET_ShortHashCode *) &ra[1];
4369
4370   /* FIXME-OPTIMIZE: maybe use another hash map here? */
4371   matched = GNUNET_NO;
4372   for (struct PendingMessage *pm = n->pending_msg_head;
4373        NULL != pm;
4374        pm = nxt)
4375   {
4376     int in_list;
4377
4378     nxt = pm->next_neighbour;
4379     in_list = GNUNET_NO;
4380     for (unsigned int i=0;i<n_acks;i++)
4381     {
4382       if (0 !=
4383           GNUNET_memcmp (&msg_uuids[i],
4384                          &pm->msg_uuid))
4385         continue;
4386       in_list = GNUNET_YES;
4387       break;
4388     }
4389     if (GNUNET_NO == in_list)
4390       continue;
4391
4392     /* this pm was acked! */
4393     matched = GNUNET_YES;
4394     free_pending_message (pm);
4395
4396     {
4397       struct GNUNET_TIME_Relative avg_ack_delay
4398         = GNUNET_TIME_relative_ntoh (ra->avg_ack_delay);
4399       // FIXME: update RTT and other reliability data!
4400       // ISSUE: we don't know which of n's queues the message(s)
4401       // took (and in fact the different messages might have gone
4402       // over different queues and possibly over multiple).
4403       // => track queues with PendingMessages, and update RTT only if
4404       //    the queue used is unique?
4405       //    -> how can we get loss rates?
4406       //    -> or, add extra state to MSG and ACKs to identify queue?
4407       //    -> if we do this, might just do the same for the avg_ack_delay!
4408       (void) avg_ack_delay;
4409     }
4410   }
4411   if (GNUNET_NO == matched)
4412   {
4413     GNUNET_STATISTICS_update (GST_stats,
4414                               "# FRAGMENT_ACKS dropped, no matching pending message",
4415                               1,
4416                               GNUNET_NO);
4417   }
4418   finish_cmc_handling (cmc);
4419 }
4420
4421
4422 /**
4423  * Communicator gave us a backchannel encapsulation.  Check the message.
4424  *
4425  * @param cls a `struct CommunicatorMessageContext`
4426  * @param be the send message that was sent
4427  * @return #GNUNET_YES if message is well-formed
4428  */
4429 static int
4430 check_backchannel_encapsulation (void *cls,
4431                                  const struct TransportBackchannelEncapsulationMessage *be)
4432 {
4433   uint16_t size = ntohs (be->header.size);
4434
4435   (void) cls;
4436   if (size - sizeof (*be) <
4437       sizeof (struct TransportBackchannelRequestPayload) +
4438       sizeof (struct GNUNET_MessageHeader) )
4439   {
4440     GNUNET_break_op (0);
4441     return GNUNET_SYSERR;
4442   }
4443   return GNUNET_YES;
4444 }
4445
4446
4447 /**
4448  * Communicator gave us a backchannel encapsulation.  Process the request.
4449  * (We are not the origin of the backchannel here, the communicator simply
4450  * received a backchannel message and we are expected to forward it.)
4451  *
4452  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
4453  * @param be the message that was received
4454  */
4455 static void
4456 handle_backchannel_encapsulation (void *cls,
4457                                   const struct TransportBackchannelEncapsulationMessage *be)
4458 {
4459   struct CommunicatorMessageContext *cmc = cls;
4460   struct BackchannelKeyState key;
4461   struct GNUNET_HashCode hmac;
4462   const char *hdr;
4463   size_t hdr_len;
4464
4465   if (0 != GNUNET_memcmp (&be->target,
4466                           &GST_my_identity))
4467   {
4468     /* not for me, try to route to target */
4469     /* FIXME: someone needs to update be->distance! */
4470     /* FIXME: BE routing can be special, should we put all of this
4471        on 'route_message'? Maybe at least pass some more arguments? */
4472     route_message (&be->target,
4473                    GNUNET_copy_message (&be->header));
4474     finish_cmc_handling (cmc);
4475     return;
4476   }
4477   dh_key_derive_eph_pub (&be->ephemeral_key,
4478                          &be->iv,
4479                          &key);
4480   hdr = (const char *) &be[1];
4481   hdr_len = ntohs (be->header.size) - sizeof (*be);
4482   bc_hmac (&key,
4483            &hmac,
4484            hdr,
4485            hdr_len);
4486   if (0 !=
4487       GNUNET_memcmp (&hmac,
4488                      &be->hmac))
4489   {
4490     /* HMAC missmatch, disard! */
4491     GNUNET_break_op (0);
4492     finish_cmc_handling (cmc);
4493     return;
4494   }
4495   /* begin actual decryption */
4496   {
4497     struct TransportBackchannelRequestPayload ppay;
4498     char body[hdr_len - sizeof (ppay)];
4499
4500     GNUNET_assert (hdr_len >= sizeof (ppay) + sizeof (struct GNUNET_MessageHeader));
4501     bc_decrypt (&key,
4502                 &ppay,
4503                 hdr,
4504                 sizeof (ppay));
4505     bc_decrypt (&key,
4506                 &body,
4507                 &hdr[sizeof (ppay)],
4508                 hdr_len - sizeof (ppay));
4509     bc_key_clean (&key);
4510     // FIXME: verify signatures in ppay!
4511     // => check if ephemeral key is known & valid, if not
4512     // => verify sig, cache ephemeral key
4513     // => update monotonic_time of sender for replay detection
4514
4515     // FIXME: forward to specified communicator!
4516     // (using GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL_INCOMING)
4517   }
4518   finish_cmc_handling (cmc);
4519 }
4520
4521
4522 /**
4523  * Task called when we should check if any of the DV paths
4524  * we have learned to a target are due for garbage collection.
4525  *
4526  * Collects stale paths, and possibly frees the entire DV
4527  * entry if no paths are left. Otherwise re-schedules itself.
4528  *
4529  * @param cls a `struct DistanceVector`
4530  */
4531 static void
4532 path_cleanup_cb (void *cls)
4533 {
4534   struct DistanceVector *dv = cls;
4535   struct DistanceVectorHop *pos;
4536
4537   dv->timeout_task = NULL;
4538   while (NULL != (pos = dv->dv_head))
4539   {
4540     GNUNET_assert (dv == pos->dv);
4541     if (GNUNET_TIME_absolute_get_remaining (pos->timeout).rel_value_us > 0)
4542       break;
4543     free_distance_vector_hop (pos);
4544   }
4545   if (NULL == pos)
4546   {
4547     free_dv_route (dv);
4548     return;
4549   }
4550   dv->timeout_task = GNUNET_SCHEDULER_add_at (pos->timeout,
4551                                               &path_cleanup_cb,
4552                                               dv);
4553 }
4554
4555
4556 /**
4557  * We have learned a @a path through the network to some other peer, add it to
4558  * our DV data structure (returning #GNUNET_YES on success).
4559  *
4560  * We do not add paths if we have a sufficient number of shorter
4561  * paths to this target already (returning #GNUNET_NO).
4562  *
4563  * We also do not add problematic paths, like those where we lack the first
4564  * hop in our neighbour list (i.e. due to a topology change) or where some
4565  * non-first hop is in our neighbour list (returning #GNUNET_SYSERR).
4566  *
4567  * @param path the path we learned, path[0] should be us,
4568  *             and then path contains a valid path from us to `path[path_len-1]`
4569  *             path[1] should be a direct neighbour (we should check!)
4570  * @param path_len number of entries on the @a path, at least three!
4571  * @param network_latency how long does the message take from us to `path[path_len-1]`?
4572  *          set to "forever" if unknown
4573  * @return #GNUNET_YES on success,
4574  *         #GNUNET_NO if we have better path(s) to the target
4575  *         #GNUNET_SYSERR if the path is useless and/or invalid
4576  *                         (i.e. path[1] not a direct neighbour
4577  *                        or path[i+1] is a direct neighbour for i>0)
4578  */
4579 static int
4580 learn_dv_path (const struct GNUNET_PeerIdentity *path,
4581                unsigned int path_len,
4582                struct GNUNET_TIME_Relative network_latency)
4583 {
4584   struct DistanceVectorHop *hop;
4585   struct DistanceVector *dv;
4586   struct Neighbour *next_hop;
4587   unsigned int shorter_distance;
4588
4589   if (path_len < 3)
4590   {
4591     /* what a boring path! not allowed! */
4592     GNUNET_break (0);
4593     return GNUNET_SYSERR;
4594   }
4595   GNUNET_assert (0 ==
4596                  GNUNET_memcmp (&GST_my_identity,
4597                                 &path[0]));
4598   next_hop = GNUNET_CONTAINER_multipeermap_get (neighbours,
4599                                                 &path[1]);
4600   if (NULL == next_hop)
4601   {
4602     /* next hop must be a neighbour, otherwise this whole thing is useless! */
4603     GNUNET_break (0);
4604     return GNUNET_SYSERR;
4605   }
4606   for (unsigned int i=2;i<path_len;i++)
4607     if (NULL !=
4608         GNUNET_CONTAINER_multipeermap_get (neighbours,
4609                                            &path[i]))
4610     {
4611       /* Useless path, we have a direct connection to some hop
4612          in the middle of the path, so this one doesn't even
4613          seem terribly useful for redundancy */
4614       return GNUNET_SYSERR;
4615     }
4616   dv = GNUNET_CONTAINER_multipeermap_get (dv_routes,
4617                                           &path[path_len - 1]);
4618   if (NULL == dv)
4619   {
4620     dv = GNUNET_new (struct DistanceVector);
4621     dv->target = path[path_len - 1];
4622     dv->timeout_task = GNUNET_SCHEDULER_add_delayed (DV_PATH_VALIDITY_TIMEOUT,
4623                                                      &path_cleanup_cb,
4624                                                      dv);
4625     GNUNET_assert (GNUNET_OK ==
4626                    GNUNET_CONTAINER_multipeermap_put (dv_routes,
4627                                                       &dv->target,
4628                                                       dv,
4629                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
4630   }
4631   /* Check if we have this path already! */
4632   shorter_distance = 0;
4633   for (struct DistanceVectorHop *pos = dv->dv_head;
4634        NULL != pos;
4635        pos = pos->next_dv)
4636   {
4637     if (pos->distance < path_len - 2)
4638       shorter_distance++;
4639     /* Note that the distances in 'pos' excludes us (path[0]) and
4640        the next_hop (path[1]), so we need to subtract two
4641        and check next_hop explicitly */
4642     if ( (pos->distance == path_len - 2) &&
4643          (pos->next_hop == next_hop) )
4644     {
4645       int match = GNUNET_YES;
4646
4647       for (unsigned int i=0;i<pos->distance;i++)
4648       {
4649         if (0 !=
4650             GNUNET_memcmp (&pos->path[i],
4651                            &path[i+2]))
4652         {
4653           match = GNUNET_NO;
4654           break;
4655         }
4656       }
4657       if (GNUNET_YES == match)
4658       {
4659         struct GNUNET_TIME_Relative last_timeout;
4660
4661         /* Re-discovered known path, update timeout */
4662         GNUNET_STATISTICS_update (GST_stats,
4663                                   "# Known DV path refreshed",
4664                                   1,
4665                                   GNUNET_NO);
4666         last_timeout = GNUNET_TIME_absolute_get_remaining (pos->timeout);
4667         pos->timeout
4668           = GNUNET_TIME_relative_to_absolute (DV_PATH_VALIDITY_TIMEOUT);
4669         GNUNET_CONTAINER_MDLL_remove (dv,
4670                                       dv->dv_head,
4671                                       dv->dv_tail,
4672                                       pos);
4673         GNUNET_CONTAINER_MDLL_insert (dv,
4674                                       dv->dv_head,
4675                                       dv->dv_tail,
4676                                       pos);
4677         if (last_timeout.rel_value_us <
4678             GNUNET_TIME_relative_subtract (DV_PATH_VALIDITY_TIMEOUT,
4679                                            DV_PATH_DISCOVERY_FREQUENCY).rel_value_us)
4680         {
4681           /* Some peer send DV learn messages too often, we are learning
4682              the same path faster than it would be useful; do not forward! */
4683           return GNUNET_NO;
4684         }
4685         return GNUNET_YES;
4686       }
4687     }
4688   }
4689   /* Count how many shorter paths we have (incl. direct
4690      neighbours) before simply giving up on this one! */
4691   if (shorter_distance >= MAX_DV_PATHS_TO_TARGET)
4692   {
4693     /* We have a shorter path already! */
4694     return GNUNET_NO;
4695   }
4696   /* create new DV path entry */
4697   hop = GNUNET_malloc (sizeof (struct DistanceVectorHop) +
4698                        sizeof (struct GNUNET_PeerIdentity) * (path_len - 2));
4699   hop->next_hop = next_hop;
4700   hop->dv = dv;
4701   hop->path = (const struct GNUNET_PeerIdentity *) &hop[1];
4702   memcpy (&hop[1],
4703           &path[2],
4704           sizeof (struct GNUNET_PeerIdentity) * (path_len - 2));
4705   hop->timeout = GNUNET_TIME_relative_to_absolute (DV_PATH_VALIDITY_TIMEOUT);
4706   hop->distance = path_len - 2;
4707   GNUNET_CONTAINER_MDLL_insert (dv,
4708                                 dv->dv_head,
4709                                 dv->dv_tail,
4710                                 hop);
4711   GNUNET_CONTAINER_MDLL_insert (neighbour,
4712                                 next_hop->dv_head,
4713                                 next_hop->dv_tail,
4714                                 hop);
4715   return GNUNET_YES;
4716 }
4717
4718
4719 /**
4720  * Communicator gave us a DV learn message.  Check the message.
4721  *
4722  * @param cls a `struct CommunicatorMessageContext`
4723  * @param dvl the send message that was sent
4724  * @return #GNUNET_YES if message is well-formed
4725  */
4726 static int
4727 check_dv_learn (void *cls,
4728                 const struct TransportDVLearn *dvl)
4729 {
4730   uint16_t size = ntohs (dvl->header.size);
4731   uint16_t num_hops = ntohs (dvl->num_hops);
4732   const struct DVPathEntryP *hops = (const struct DVPathEntryP *) &dvl[1];
4733
4734   (void) cls;
4735   if (size != sizeof (*dvl) + num_hops * sizeof (struct DVPathEntryP))
4736   {
4737     GNUNET_break_op (0);
4738     return GNUNET_SYSERR;
4739   }
4740   if (num_hops > MAX_DV_HOPS_ALLOWED)
4741   {
4742     GNUNET_break_op (0);
4743     return GNUNET_SYSERR;
4744   }
4745   for (unsigned int i=0;i<num_hops;i++)
4746   {
4747     if (0 == GNUNET_memcmp (&dvl->initiator,
4748                             &hops[i].hop))
4749     {
4750       GNUNET_break_op (0);
4751       return GNUNET_SYSERR;
4752     }
4753     if (0 == GNUNET_memcmp (&GST_my_identity,
4754                             &hops[i].hop))
4755     {
4756       GNUNET_break_op (0);
4757       return GNUNET_SYSERR;
4758     }
4759   }
4760   return GNUNET_YES;
4761 }
4762
4763
4764 /**
4765  * Build and forward a DV learn message to @a next_hop.
4766  *
4767  * @param next_hop peer to send the message to
4768  * @param msg message received
4769  * @param bi_history bitmask specifying hops on path that were bidirectional
4770  * @param nhops length of the @a hops array
4771  * @param hops path the message traversed so far
4772  * @param in_time when did we receive the message, used to calculate network delay
4773  */
4774 static void
4775 forward_dv_learn (const struct GNUNET_PeerIdentity *next_hop,
4776                   const struct TransportDVLearn *msg,
4777                   uint16_t bi_history,
4778                   uint16_t nhops,
4779                   const struct DVPathEntryP *hops,
4780                   struct GNUNET_TIME_Absolute in_time)
4781 {
4782   struct DVPathEntryP *dhops;
4783   struct TransportDVLearn *fwd;
4784   struct GNUNET_TIME_Relative nnd;
4785
4786   /* compute message for forwarding */
4787   GNUNET_assert (nhops < MAX_DV_HOPS_ALLOWED);
4788   fwd = GNUNET_malloc (sizeof (struct TransportDVLearn) +
4789                        (nhops + 1) * sizeof (struct DVPathEntryP));
4790   fwd->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN);
4791   fwd->header.size = htons (sizeof (struct TransportDVLearn) +
4792                             (nhops + 1) * sizeof (struct DVPathEntryP));
4793   fwd->num_hops = htons (nhops + 1);
4794   fwd->bidirectional = htons (bi_history);
4795   nnd = GNUNET_TIME_relative_add (GNUNET_TIME_absolute_get_duration (in_time),
4796                                   GNUNET_TIME_relative_ntoh (msg->non_network_delay));
4797   fwd->non_network_delay = GNUNET_TIME_relative_hton (nnd);
4798   fwd->init_sig = msg->init_sig;
4799   fwd->initiator = msg->initiator;
4800   fwd->challenge = msg->challenge;
4801   dhops = (struct DVPathEntryP *) &fwd[1];
4802   GNUNET_memcpy (dhops,
4803                  hops,
4804                  sizeof (struct DVPathEntryP) * nhops);
4805   dhops[nhops].hop = GST_my_identity;
4806   {
4807     struct DvHopPS dhp = {
4808       .purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_HOP),
4809       .purpose.size = htonl (sizeof (dhp)),
4810       .pred = dhops[nhops-1].hop,
4811       .succ = *next_hop,
4812       .challenge = msg->challenge
4813     };
4814
4815     GNUNET_assert (GNUNET_OK ==
4816                    GNUNET_CRYPTO_eddsa_sign (GST_my_private_key,
4817                                              &dhp.purpose,
4818                                              &dhops[nhops].hop_sig));
4819   }
4820   route_message (next_hop,
4821                  &fwd->header);
4822 }
4823
4824
4825 /**
4826  * Check signature of type #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_INITIATOR
4827  *
4828  * @param init the signer
4829  * @param challenge the challenge that was signed
4830  * @param init_sig signature presumably by @a init
4831  * @return #GNUNET_OK if the signature is valid
4832  */
4833 static int
4834 validate_dv_initiator_signature (const struct GNUNET_PeerIdentity *init,
4835                                  const struct GNUNET_ShortHashCode *challenge,
4836                                  const struct GNUNET_CRYPTO_EddsaSignature *init_sig)
4837 {
4838   struct DvInitPS ip = {
4839     .purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_INITIATOR),
4840     .purpose.size = htonl (sizeof (ip)),
4841     .challenge = *challenge
4842   };
4843
4844   if (GNUNET_OK !=
4845       GNUNET_CRYPTO_eddsa_verify (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_INITIATOR,
4846                                   &ip.purpose,
4847                                   init_sig,
4848                                   &init->public_key))
4849   {
4850     GNUNET_break_op (0);
4851     return GNUNET_SYSERR;
4852   }
4853   return GNUNET_OK;
4854 }
4855
4856
4857 /**
4858  * Communicator gave us a DV learn message.  Process the request.
4859  *
4860  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
4861  * @param dvl the message that was received
4862  */
4863 static void
4864 handle_dv_learn (void *cls,
4865                  const struct TransportDVLearn *dvl)
4866 {
4867   struct CommunicatorMessageContext *cmc = cls;
4868   enum GNUNET_TRANSPORT_CommunicatorCharacteristics cc;
4869   int bi_hop;
4870   uint16_t nhops;
4871   uint16_t bi_history;
4872   const struct DVPathEntryP *hops;
4873   int do_fwd;
4874   int did_initiator;
4875   struct GNUNET_TIME_Absolute in_time;
4876
4877   nhops = ntohs (dvl->bidirectional); /* 0 = sender is initiator */
4878   bi_history = ntohs (dvl->bidirectional);
4879   hops = (const struct DVPathEntryP *) &dvl[1];
4880   if (0 == nhops)
4881   {
4882     /* sanity check */
4883     if (0 != GNUNET_memcmp (&dvl->initiator,
4884                             &cmc->im.sender))
4885     {
4886       GNUNET_break (0);
4887       finish_cmc_handling (cmc);
4888       return;
4889     }
4890   }
4891   else
4892   {
4893     /* sanity check */
4894     if (0 != GNUNET_memcmp (&hops[nhops - 1].hop,
4895                             &cmc->im.sender))
4896     {
4897       GNUNET_break (0);
4898       finish_cmc_handling (cmc);
4899       return;
4900     }
4901   }
4902
4903   GNUNET_assert (CT_COMMUNICATOR == cmc->tc->type);
4904   cc = cmc->tc->details.communicator.cc;
4905   bi_hop = (GNUNET_TRANSPORT_CC_RELIABLE == cc); // FIXME: add bi-directional flag to cc?
4906   in_time = GNUNET_TIME_absolute_get ();
4907
4908   /* continue communicator here, everything else can happen asynchronous! */
4909   finish_cmc_handling (cmc);
4910
4911   /* OPTIMIZE-FIXME: Technically, we only need to bother checking
4912      the initiator signature if we send the message back to the initiator... */
4913   if (GNUNET_OK !=
4914       validate_dv_initiator_signature (&dvl->initiator,
4915                                        &dvl->challenge,
4916                                        &dvl->init_sig))
4917   {
4918     GNUNET_break_op (0);
4919     return;
4920   }
4921   // FIXME: asynchronously (!) verify hop-by-hop signatures!
4922   // => if signature verification load too high, implement random drop strategy!?
4923
4924   do_fwd = GNUNET_YES;
4925   if (0 == GNUNET_memcmp (&GST_my_identity,
4926                           &dvl->initiator))
4927   {
4928     struct GNUNET_PeerIdentity path[nhops + 1];
4929     struct GNUNET_TIME_Relative host_latency_sum;
4930     struct GNUNET_TIME_Relative latency;
4931     struct GNUNET_TIME_Relative network_latency;
4932
4933     /* We initiated this, learn the forward path! */
4934     path[0] = GST_my_identity;
4935     path[1] = hops[0].hop;
4936     host_latency_sum = GNUNET_TIME_relative_ntoh (dvl->non_network_delay);
4937
4938     // Need also something to lookup initiation time
4939     // to compute RTT! -> add RTT argument here?
4940     latency = GNUNET_TIME_UNIT_FOREVER_REL; // FIXME: initialize properly
4941     // (based on dvl->challenge, we can identify time of origin!)
4942
4943     network_latency = GNUNET_TIME_relative_subtract (latency,
4944                                                      host_latency_sum);
4945     /* assumption: latency on all links is the same */
4946     network_latency = GNUNET_TIME_relative_divide (network_latency,
4947                                                    nhops);
4948
4949     for (unsigned int i=2;i<=nhops;i++)
4950     {
4951       struct GNUNET_TIME_Relative ilat;
4952
4953       /* assumption: linear latency increase per hop */
4954       ilat = GNUNET_TIME_relative_multiply (network_latency,
4955                                             i);
4956       path[i] = hops[i-1].hop;
4957       // FIXME: mark ALL of these as *confirmed* (with what timeout?)
4958       // -- and schedule a job for the confirmation to time out! --
4959       // and possibly do #cores_send_connect_info() if
4960       // the respective neighbour is NOT confirmed yet!
4961       learn_dv_path (path,
4962                      i,
4963                      ilat);
4964     }
4965     /* as we initiated, do not forward again (would be circular!) */
4966     do_fwd = GNUNET_NO;
4967     return;
4968   }
4969   else if (bi_hop)
4970   {
4971     /* last hop was bi-directional, we could learn something here! */
4972     struct GNUNET_PeerIdentity path[nhops + 2];
4973
4974     path[0] = GST_my_identity;
4975     path[1] = hops[nhops - 1].hop; /* direct neighbour == predecessor! */
4976     for (unsigned int i=0;i<nhops;i++)
4977     {
4978       int iret;
4979
4980       if (0 == (bi_history & (1 << i)))
4981         break; /* i-th hop not bi-directional, stop learning! */
4982       if (i == nhops)
4983       {
4984         path[i + 2] = dvl->initiator;
4985       }
4986       else
4987       {
4988         path[i + 2] = hops[nhops - i - 2].hop;
4989       }
4990
4991       iret = learn_dv_path (path,
4992                             i + 2,
4993                             GNUNET_TIME_UNIT_FOREVER_REL);
4994       if (GNUNET_SYSERR == iret)
4995       {
4996         /* path invalid or too long to be interesting for US, thus should also
4997            not be interesting to our neighbours, cut path when forwarding to
4998            'i' hops, except of course for the one that goes back to the
4999            initiator */
5000         GNUNET_STATISTICS_update (GST_stats,
5001                                   "# DV learn not forwarded due invalidity of path",
5002                                   1,
5003                                   GNUNET_NO);
5004         do_fwd = GNUNET_NO;
5005         break;
5006       }
5007       if ( (GNUNET_NO == iret) &&
5008            (nhops == i + 1) )
5009       {
5010         /* we have better paths, and this is the longest target,
5011            so there cannot be anything interesting later */
5012         GNUNET_STATISTICS_update (GST_stats,
5013                                   "# DV learn not forwarded, got better paths",
5014                                   1,
5015                                   GNUNET_NO);
5016         do_fwd = GNUNET_NO;
5017         break;
5018       }
5019     }
5020   }
5021
5022   if (MAX_DV_HOPS_ALLOWED == nhops)
5023   {
5024     /* At limit, we're out of here! */
5025     finish_cmc_handling (cmc);
5026     return;
5027   }
5028
5029   /* Forward to initiator, if path non-trivial and possible */
5030   bi_history = (bi_history << 1) | (bi_hop ? 1 : 0);
5031   did_initiator = GNUNET_NO;
5032   if ( (1 < nhops) &&
5033        (GNUNET_YES ==
5034         GNUNET_CONTAINER_multipeermap_contains (neighbours,
5035                                                 &dvl->initiator)) )
5036   {
5037     /* send back to origin! */
5038     forward_dv_learn (&dvl->initiator,
5039                       dvl,
5040                       bi_history,
5041                       nhops,
5042                       hops,
5043                       in_time);
5044     did_initiator = GNUNET_YES;
5045   }
5046   /* We forward under two conditions: either we still learned something
5047      ourselves (do_fwd), or the path was darn short and thus the initiator is
5048      likely to still be very interested in this (and we did NOT already
5049      send it back to the initiator) */
5050   if ( (do_fwd) ||
5051        ( (nhops < MIN_DV_PATH_LENGTH_FOR_INITIATOR) &&
5052          (GNUNET_NO == did_initiator) ) )
5053   {
5054     /* FIXME: loop over all neighbours, pick those with low
5055        queues AND that are not yet on the path; possibly
5056        adapt threshold to nhops! */
5057 #if FIXME
5058     forward_dv_learn (NULL, // fill in peer from iterator here!
5059                       dvl,
5060                       bi_history,
5061                       nhops,
5062                       hops,
5063                       in_time);
5064 #endif
5065   }
5066 }
5067
5068
5069 /**
5070  * Communicator gave us a DV box.  Check the message.
5071  *
5072  * @param cls a `struct CommunicatorMessageContext`
5073  * @param dvb the send message that was sent
5074  * @return #GNUNET_YES if message is well-formed
5075  */
5076 static int
5077 check_dv_box (void *cls,
5078               const struct TransportDVBox *dvb)
5079 {
5080   uint16_t size = ntohs (dvb->header.size);
5081   uint16_t num_hops = ntohs (dvb->num_hops);
5082   const struct GNUNET_PeerIdentity *hops = (const struct GNUNET_PeerIdentity *) &dvb[1];
5083   const struct GNUNET_MessageHeader *inbox = (const struct GNUNET_MessageHeader *) &hops[num_hops];
5084   uint16_t isize;
5085   uint16_t itype;
5086
5087   (void) cls;
5088   if (size < sizeof (*dvb) + num_hops * sizeof (struct GNUNET_PeerIdentity) + sizeof (struct GNUNET_MessageHeader))
5089   {
5090     GNUNET_break_op (0);
5091     return GNUNET_SYSERR;
5092   }
5093   isize = ntohs (inbox->size);
5094   if (size != sizeof (*dvb) + num_hops * sizeof (struct GNUNET_PeerIdentity) + isize)
5095   {
5096     GNUNET_break_op (0);
5097     return GNUNET_SYSERR;
5098   }
5099   itype = ntohs (inbox->type);
5100   if ( (GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX == itype) ||
5101        (GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN == itype) )
5102   {
5103     GNUNET_break_op (0);
5104     return GNUNET_SYSERR;
5105   }
5106   if (0 ==
5107       GNUNET_memcmp (&dvb->origin,
5108                      &GST_my_identity))
5109   {
5110     GNUNET_break_op (0);
5111     return GNUNET_SYSERR;
5112   }
5113   return GNUNET_YES;
5114 }
5115
5116
5117 /**
5118  * Create a DV Box message and queue it for transmission to
5119  * @ea next_hop.
5120  *
5121  * @param next_hop peer to receive the message next
5122  * @param total_hops how many hops did the message take so far
5123  * @param num_hops length of the @a hops array
5124  * @param origin origin of the message
5125  * @param hops next peer(s) to the destination, including destination
5126  * @param payload payload of the box
5127  * @param payload_size number of bytes in @a payload
5128  */
5129 static void
5130 forward_dv_box (struct Neighbour *next_hop,
5131                 uint16_t total_hops,
5132                 uint16_t num_hops,
5133                 const struct GNUNET_PeerIdentity *origin,
5134                 const struct GNUNET_PeerIdentity *hops,
5135                 const void *payload,
5136                 uint16_t payload_size)
5137 {
5138   struct TransportDVBox *dvb;
5139   struct GNUNET_PeerIdentity *dhops;
5140
5141   GNUNET_assert (UINT16_MAX <
5142                  sizeof (struct TransportDVBox) +
5143                  sizeof (struct GNUNET_PeerIdentity) * num_hops +
5144                  payload_size);
5145   dvb = GNUNET_malloc (sizeof (struct TransportDVBox) +
5146                        sizeof (struct GNUNET_PeerIdentity) * num_hops +
5147                        payload_size);
5148   dvb->header.size = htons (sizeof (struct TransportDVBox) +
5149                             sizeof (struct GNUNET_PeerIdentity) * num_hops +
5150                             payload_size);
5151   dvb->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX);
5152   dvb->total_hops = htons (total_hops);
5153   dvb->num_hops = htons (num_hops);
5154   dvb->origin = *origin;
5155   dhops = (struct GNUNET_PeerIdentity *) &dvb[1];
5156   memcpy (dhops,
5157           hops,
5158           num_hops * sizeof (struct GNUNET_PeerIdentity));
5159   memcpy (&dhops[num_hops],
5160           payload,
5161           payload_size);
5162   route_message (&next_hop->pid,
5163                  &dvb->header);
5164 }
5165
5166
5167 /**
5168  * Communicator gave us a DV box.  Process the request.
5169  *
5170  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
5171  * @param dvb the message that was received
5172  */
5173 static void
5174 handle_dv_box (void *cls,
5175                const struct TransportDVBox *dvb)
5176 {
5177   struct CommunicatorMessageContext *cmc = cls;
5178   uint16_t size = ntohs (dvb->header.size) - sizeof (*dvb);
5179   uint16_t num_hops = ntohs (dvb->num_hops);
5180   const struct GNUNET_PeerIdentity *hops = (const struct GNUNET_PeerIdentity *) &dvb[1];
5181   const struct GNUNET_MessageHeader *inbox = (const struct GNUNET_MessageHeader *) &hops[num_hops];
5182
5183   if (num_hops > 0)
5184   {
5185     /* We're trying from the end of the hops array, as we may be
5186        able to find a shortcut unknown to the origin that way */
5187     for (int i=num_hops-1;i>=0;i--)
5188     {
5189       struct Neighbour *n;
5190
5191       if (0 ==
5192           GNUNET_memcmp (&hops[i],
5193                          &GST_my_identity))
5194       {
5195         GNUNET_break_op (0);
5196         finish_cmc_handling (cmc);
5197         return;
5198       }
5199       n = GNUNET_CONTAINER_multipeermap_get (neighbours,
5200                                              &hops[i]);
5201       if (NULL == n)
5202         continue;
5203       forward_dv_box (n,
5204                       ntohs (dvb->total_hops) + 1,
5205                       num_hops - i - 1, /* number of hops left */
5206                       &dvb->origin,
5207                       &hops[i+1], /* remaining hops */
5208                       (const void *) &dvb[1],
5209                       size);
5210       finish_cmc_handling (cmc);
5211       return;
5212     }
5213     /* Woopsie, next hop not in neighbours, drop! */
5214     GNUNET_STATISTICS_update (GST_stats,
5215                               "# DV Boxes dropped: next hop unknown",
5216                               1,
5217                               GNUNET_NO);
5218     finish_cmc_handling (cmc);
5219     return;
5220   }
5221   /* We are the target. Unbox and handle message. */
5222   cmc->im.sender = dvb->origin;
5223   cmc->total_hops = ntohs (dvb->total_hops);
5224   demultiplex_with_cmc (cmc,
5225                         inbox);
5226 }
5227
5228
5229 /**
5230  * Client notified us about transmission from a peer.  Process the request.
5231  *
5232  * @param cls a `struct TransportClient` which sent us the message
5233  * @param obm the send message that was sent
5234  * @return #GNUNET_YES if message is well-formed
5235  */
5236 static int
5237 check_incoming_msg (void *cls,
5238                     const struct GNUNET_TRANSPORT_IncomingMessage *im)
5239 {
5240   struct TransportClient *tc = cls;
5241
5242   if (CT_COMMUNICATOR != tc->type)
5243   {
5244     GNUNET_break (0);
5245     return GNUNET_SYSERR;
5246   }
5247   GNUNET_MQ_check_boxed_message (im);
5248   return GNUNET_OK;
5249 }
5250
5251
5252 /**
5253  * Communicator gave us a transport address validation challenge.  Process the request.
5254  *
5255  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
5256  * @param tvc the message that was received
5257  */
5258 static void
5259 handle_validation_challenge (void *cls,
5260                              const struct TransportValidationChallenge *tvc)
5261 {
5262   struct CommunicatorMessageContext *cmc = cls;
5263   struct TransportValidationResponse *tvr;
5264
5265   if (cmc->total_hops > 0)
5266   {
5267     /* DV routing is not allowed for validation challenges! */
5268     GNUNET_break_op (0);
5269     finish_cmc_handling (cmc);
5270     return;
5271   }
5272   tvr = GNUNET_new (struct TransportValidationResponse);
5273   tvr->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_RESPONSE);
5274   tvr->header.size = htons (sizeof (*tvr));
5275   tvr->challenge = tvc->challenge;
5276   tvr->origin_time = tvc->sender_time;
5277   tvr->validity_duration = cmc->im.expected_address_validity;
5278   {
5279     /* create signature */
5280     struct TransportValidationPS tvp = {
5281       .purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_CHALLENGE),
5282       .purpose.size = htonl (sizeof (tvp)),
5283       .validity_duration = tvr->validity_duration,
5284       .challenge = tvc->challenge
5285     };
5286
5287     GNUNET_assert (GNUNET_OK ==
5288                    GNUNET_CRYPTO_eddsa_sign (GST_my_private_key,
5289                                              &tvp.purpose,
5290                                              &tvr->signature));
5291   }
5292   route_message (&cmc->im.sender,
5293                  &tvr->header);
5294   finish_cmc_handling (cmc);
5295 }
5296
5297
5298 /**
5299  * Closure for #check_known_challenge.
5300  */
5301 struct CheckKnownChallengeContext
5302 {
5303   /**
5304    * Set to the challenge we are looking for.
5305    */
5306   const struct GNUNET_ShortHashCode *challenge;
5307
5308   /**
5309    * Set to a matching validation state, if one was found.
5310    */
5311   struct ValidationState *vs;
5312 };
5313
5314
5315 /**
5316  * Test if the validation state in @a value matches the
5317  * challenge from @a cls.
5318  *
5319  * @param cls a `struct CheckKnownChallengeContext`
5320  * @param pid unused (must match though)
5321  * @param value a `struct ValidationState`
5322  * @return #GNUNET_OK if not matching, #GNUNET_NO if match found
5323  */
5324 static int
5325 check_known_challenge (void *cls,
5326                        const struct GNUNET_PeerIdentity *pid,
5327                        void *value)
5328 {
5329   struct CheckKnownChallengeContext *ckac = cls;
5330   struct ValidationState *vs = value;
5331
5332   (void) pid;
5333   if (0 != GNUNET_memcmp (&vs->challenge,
5334                           ckac->challenge))
5335     return GNUNET_OK;
5336   ckac->vs = vs;
5337   return GNUNET_NO;
5338 }
5339
5340
5341 /**
5342  * Function called when peerstore is done storing a
5343  * validated address.
5344  *
5345  * @param cls a `struct ValidationState`
5346  * @param success #GNUNET_YES on success
5347  */
5348 static void
5349 peerstore_store_validation_cb (void *cls,
5350                                int success)
5351 {
5352   struct ValidationState *vs = cls;
5353
5354   vs->sc = NULL;
5355   if (GNUNET_YES == success)
5356     return;
5357   GNUNET_STATISTICS_update (GST_stats,
5358                             "# Peerstore failed to store foreign address",
5359                             1,
5360                             GNUNET_NO);
5361 }
5362
5363
5364 /**
5365  * Task run periodically to validate some address based on #validation_heap.
5366  *
5367  * @param cls NULL
5368  */
5369 static void
5370 validation_start_cb (void *cls);
5371
5372
5373 /**
5374  * Set the time for next_challenge of @a vs to @a new_time.
5375  * Updates the heap and if necessary reschedules the job.
5376  *
5377  * @param vs validation state to update
5378  * @param new_time new time for revalidation
5379  */
5380 static void
5381 update_next_challenge_time (struct ValidationState *vs,
5382                             struct GNUNET_TIME_Absolute new_time)
5383 {
5384   struct GNUNET_TIME_Relative delta;
5385
5386   if (new_time.abs_value_us == vs->next_challenge.abs_value_us)
5387     return; /* be lazy */
5388   vs->next_challenge = new_time;
5389   if (NULL == vs->hn)
5390     vs->hn = GNUNET_CONTAINER_heap_insert (validation_heap,
5391                                            vs,
5392                                            new_time.abs_value_us);
5393   else
5394     GNUNET_CONTAINER_heap_update_cost (vs->hn,
5395                                        new_time.abs_value_us);
5396   if ( (vs != GNUNET_CONTAINER_heap_peek (validation_heap)) &&
5397        (NULL != validation_task) )
5398     return;
5399   if (NULL != validation_task)
5400     GNUNET_SCHEDULER_cancel (validation_task);
5401   /* randomize a bit */
5402   delta.rel_value_us = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
5403                                                  MIN_DELAY_ADDRESS_VALIDATION.rel_value_us);
5404   new_time = GNUNET_TIME_absolute_add (new_time,
5405                                        delta);
5406   validation_task = GNUNET_SCHEDULER_add_at (new_time,
5407                                              &validation_start_cb,
5408                                              NULL);
5409 }
5410
5411
5412 /**
5413  * Find the queue matching @a pid and @a address.
5414  *
5415  * @param pid peer the queue must go to
5416  * @param address address the queue must use
5417  * @return NULL if no such queue exists
5418  */
5419 static struct Queue *
5420 find_queue (const struct GNUNET_PeerIdentity *pid,
5421             const char *address)
5422 {
5423   struct Neighbour *n;
5424
5425   n = GNUNET_CONTAINER_multipeermap_get (neighbours,
5426                                          pid);
5427   if (NULL == n)
5428     return NULL;
5429   for (struct Queue *pos = n->queue_head;
5430        NULL != pos;
5431        pos = pos->next_neighbour)
5432   {
5433     if (0 == strcmp (pos->address,
5434                      address))
5435       return pos;
5436   }
5437   return NULL;
5438 }
5439
5440
5441 /**
5442  * Task run periodically to check whether the validity of the given queue has
5443  * run its course. If so, finds either another queue to take over, or clears
5444  * the neighbour's `core_visible` flag. In the latter case, gives DV routes a
5445  * chance to take over, and if that fails, notifies CORE about the disconnect.
5446  *
5447  * @param cls a `struct Queue`
5448  */
5449 static void
5450 core_queue_visibility_check (void *cls)
5451 {
5452   struct Queue *q = cls;
5453
5454   q->visibility_task = NULL;
5455   if (0 != GNUNET_TIME_absolute_get_remaining (q->validated_until).rel_value_us)
5456   {
5457     q->visibility_task
5458       = GNUNET_SCHEDULER_add_at (q->validated_until,
5459                                  &core_queue_visibility_check,
5460                                  q);
5461     return;
5462   }
5463   update_neighbour_core_visibility (q->neighbour);
5464 }
5465
5466
5467 /**
5468  * Check whether the CORE visibility of @a n should change.  Finds either a
5469  * queue to preserve the visibility, or clears the neighbour's `core_visible`
5470  * flag. In the latter case, gives DV routes a chance to take over, and if
5471  * that fails, notifies CORE about the disconnect.  If so, check whether we
5472  * need to notify CORE.
5473  *
5474  * @param n neighbour to perform the check for
5475  */
5476 static void
5477 update_neighbour_core_visibility (struct Neighbour *n)
5478 {
5479   struct DistanceVector *dv;
5480
5481   GNUNET_assert (GNUNET_YES == n->core_visible);
5482   /* Check if _any_ queue of this neighbour is still valid, if so, schedule
5483      the #core_queue_visibility_check() task for that queue */
5484   for (struct Queue *q = n->queue_head;
5485        NULL != q;
5486        q = q->next_neighbour)
5487   {
5488     if (0 != GNUNET_TIME_absolute_get_remaining (q->validated_until).rel_value_us)
5489     {
5490       /* found a valid queue, use this one */
5491       q->visibility_task
5492         = GNUNET_SCHEDULER_add_at (q->validated_until,
5493                                    &core_queue_visibility_check,
5494                                    q);
5495       return;
5496     }
5497   }
5498   n->core_visible = GNUNET_NO;
5499
5500   /* Check if _any_ DV route to this neighbour is currently
5501      valid, if so, do NOT tell core about the loss of direct
5502      connectivity (DV still counts!) */
5503   dv = GNUNET_CONTAINER_multipeermap_get (dv_routes,
5504                                           &n->pid);
5505   if (GNUNET_YES == dv->core_visible)
5506     return;
5507   /* Nothing works anymore, need to tell CORE about the loss of
5508      connectivity! */
5509   cores_send_disconnect_info (&n->pid);
5510 }
5511
5512
5513 /**
5514  * Communicator gave us a transport address validation response.  Process the request.
5515  *
5516  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
5517  * @param tvr the message that was received
5518  */
5519 static void
5520 handle_validation_response (void *cls,
5521                             const struct TransportValidationResponse *tvr)
5522 {
5523   struct CommunicatorMessageContext *cmc = cls;
5524   struct ValidationState *vs;
5525   struct CheckKnownChallengeContext ckac = {
5526     .challenge = &tvr->challenge,
5527     .vs = NULL
5528   };
5529   struct GNUNET_TIME_Absolute origin_time;
5530   struct Queue *q;
5531   struct DistanceVector *dv;
5532
5533   /* check this is one of our challenges */
5534   (void) GNUNET_CONTAINER_multipeermap_get_multiple (validation_map,
5535                                                      &cmc->im.sender,
5536                                                      &check_known_challenge,
5537                                                      &ckac);
5538   if (NULL == (vs = ckac.vs))
5539   {
5540     /* This can happen simply if we 'forgot' the challenge by now,
5541        i.e. because we received the validation response twice */
5542     GNUNET_STATISTICS_update (GST_stats,
5543                               "# Validations dropped, challenge unknown",
5544                               1,
5545                               GNUNET_NO);
5546     finish_cmc_handling (cmc);
5547     return;
5548   }
5549
5550   /* sanity check on origin time */
5551   origin_time = GNUNET_TIME_absolute_ntoh (tvr->origin_time);
5552   if ( (origin_time.abs_value_us < vs->first_challenge_use.abs_value_us) ||
5553        (origin_time.abs_value_us > vs->last_challenge_use.abs_value_us) )
5554   {
5555     GNUNET_break_op (0);
5556     finish_cmc_handling (cmc);
5557     return;
5558   }
5559
5560   {
5561     /* check signature */
5562     struct TransportValidationPS tvp = {
5563       .purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_CHALLENGE),
5564       .purpose.size = htonl (sizeof (tvp)),
5565       .validity_duration = tvr->validity_duration,
5566       .challenge = tvr->challenge
5567     };
5568
5569     if (GNUNET_OK !=
5570         GNUNET_CRYPTO_eddsa_verify (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_CHALLENGE,
5571                                     &tvp.purpose,
5572                                     &tvr->signature,
5573                                     &cmc->im.sender.public_key))
5574     {
5575       GNUNET_break_op (0);
5576       finish_cmc_handling (cmc);
5577       return;
5578     }
5579   }
5580
5581   /* validity is capped by our willingness to keep track of the
5582      validation entry and the maximum the other peer allows */
5583   vs->valid_until
5584     = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_min (GNUNET_TIME_relative_ntoh (tvr->validity_duration),
5585                                                                   MAX_ADDRESS_VALID_UNTIL));
5586   vs->validated_until
5587     = GNUNET_TIME_absolute_min (vs->valid_until,
5588                                 GNUNET_TIME_relative_to_absolute (ADDRESS_VALIDATION_LIFETIME));
5589   vs->validation_rtt = GNUNET_TIME_absolute_get_duration (origin_time);
5590   vs->challenge_backoff = GNUNET_TIME_UNIT_ZERO;
5591   GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
5592                               &vs->challenge,
5593                               sizeof (vs->challenge));
5594   vs->first_challenge_use = GNUNET_TIME_absolute_subtract (vs->validated_until,
5595                                                            GNUNET_TIME_relative_multiply (vs->validation_rtt,
5596                                                                                           VALIDATION_RTT_BUFFER_FACTOR));
5597   vs->last_challenge_use = GNUNET_TIME_UNIT_ZERO_ABS; /* challenge was not yet used */
5598   update_next_challenge_time (vs,
5599                               vs->first_challenge_use);
5600   vs->sc = GNUNET_PEERSTORE_store (peerstore,
5601                                    "transport",
5602                                    &cmc->im.sender,
5603                                    GNUNET_PEERSTORE_TRANSPORT_URLADDRESS_KEY,
5604                                    vs->address,
5605                                    strlen (vs->address) + 1,
5606                                    vs->valid_until,
5607                                    GNUNET_PEERSTORE_STOREOPTION_MULTIPLE,
5608                                    &peerstore_store_validation_cb,
5609                                    vs);
5610   finish_cmc_handling (cmc);
5611
5612   /* Finally, we now possibly have a confirmed (!) working queue,
5613      update queue status (if queue still is around) */
5614   q = find_queue (&vs->pid,
5615                   vs->address);
5616   if (NULL == q)
5617   {
5618     GNUNET_STATISTICS_update (GST_stats,
5619                               "# Queues lost at time of successful validation",
5620                               1,
5621                               GNUNET_NO);
5622     return;
5623   }
5624   q->validated_until = vs->validated_until;
5625   q->rtt = vs->validation_rtt;
5626   if (GNUNET_NO != q->neighbour->core_visible)
5627     return; /* nothing changed, we are done here */
5628   q->neighbour->core_visible = GNUNET_YES;
5629   q->visibility_task
5630     = GNUNET_SCHEDULER_add_at (q->validated_until,
5631                                &core_queue_visibility_check,
5632                                q);
5633   /* Check if _any_ DV route to this neighbour is
5634      currently valid, if so, do NOT tell core anything! */
5635   dv = GNUNET_CONTAINER_multipeermap_get (dv_routes,
5636                                           &q->neighbour->pid);
5637   if (GNUNET_YES == dv->core_visible)
5638     return; /* nothing changed, done */
5639   /* We lacked a confirmed connection to the neighbour
5640      before, so tell CORE about it (finally!) */
5641   cores_send_connect_info (&q->neighbour->pid,
5642                            GNUNET_BANDWIDTH_ZERO);
5643 }
5644
5645
5646 /**
5647  * Incoming meessage.  Process the request.
5648  *
5649  * @param im the send message that was received
5650  */
5651 static void
5652 handle_incoming_msg (void *cls,
5653                      const struct GNUNET_TRANSPORT_IncomingMessage *im)
5654 {
5655   struct TransportClient *tc = cls;
5656   struct CommunicatorMessageContext *cmc = GNUNET_new (struct CommunicatorMessageContext);
5657
5658   cmc->tc = tc;
5659   cmc->im = *im;
5660   demultiplex_with_cmc (cmc,
5661                         (const struct GNUNET_MessageHeader *) &im[1]);
5662 }
5663
5664
5665 /**
5666  * Given an inbound message @a msg from a communicator @a cmc,
5667  * demultiplex it based on the type calling the right handler.
5668  *
5669  * @param cmc context for demultiplexing
5670  * @param msg message to demultiplex
5671  */
5672 static void
5673 demultiplex_with_cmc (struct CommunicatorMessageContext *cmc,
5674                       const struct GNUNET_MessageHeader *msg)
5675 {
5676   struct GNUNET_MQ_MessageHandler handlers[] = {
5677     GNUNET_MQ_hd_var_size (fragment_box,
5678                            GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT,
5679                            struct TransportFragmentBox,
5680                            &cmc),
5681     GNUNET_MQ_hd_fixed_size (fragment_ack,
5682                              GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK,
5683                              struct TransportFragmentAckMessage,
5684                              &cmc),
5685     GNUNET_MQ_hd_var_size (reliability_box,
5686                            GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX,
5687                            struct TransportReliabilityBox,
5688                            &cmc),
5689     GNUNET_MQ_hd_fixed_size (reliability_ack,
5690                              GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK,
5691                              struct TransportReliabilityAckMessage,
5692                              &cmc),
5693     GNUNET_MQ_hd_var_size (backchannel_encapsulation,
5694                            GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION,
5695                            struct TransportBackchannelEncapsulationMessage,
5696                            &cmc),
5697     GNUNET_MQ_hd_var_size (dv_learn,
5698                            GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN,
5699                            struct TransportDVLearn,
5700                            &cmc),
5701     GNUNET_MQ_hd_var_size (dv_box,
5702                            GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX,
5703                            struct TransportDVBox,
5704                            &cmc),
5705     GNUNET_MQ_hd_fixed_size (validation_challenge,
5706                              GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_CHALLENGE,
5707                              struct TransportValidationChallenge,
5708                              &cmc),
5709     GNUNET_MQ_hd_fixed_size (validation_response,
5710                              GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_RESPONSE,
5711                              struct TransportValidationResponse,
5712                              &cmc),
5713     GNUNET_MQ_handler_end()
5714   };
5715   int ret;
5716
5717   ret = GNUNET_MQ_handle_message (handlers,
5718                                   msg);
5719   if (GNUNET_SYSERR == ret)
5720   {
5721     GNUNET_break (0);
5722     GNUNET_SERVICE_client_drop (cmc->tc->client);
5723     GNUNET_free (cmc);
5724     return;
5725   }
5726   if (GNUNET_NO == ret)
5727   {
5728     /* unencapsulated 'raw' message */
5729     handle_raw_message (&cmc,
5730                         msg);
5731   }
5732 }
5733
5734
5735 /**
5736  * New queue became available.  Check message.
5737  *
5738  * @param cls the client
5739  * @param aqm the send message that was sent
5740  */
5741 static int
5742 check_add_queue_message (void *cls,
5743                          const struct GNUNET_TRANSPORT_AddQueueMessage *aqm)
5744 {
5745   struct TransportClient *tc = cls;
5746
5747   if (CT_COMMUNICATOR != tc->type)
5748   {
5749     GNUNET_break (0);
5750     return GNUNET_SYSERR;
5751   }
5752   GNUNET_MQ_check_zero_termination (aqm);
5753   return GNUNET_OK;
5754 }
5755
5756
5757 /**
5758  * Bandwidth tracker informs us that the delay until we should receive
5759  * more has changed.
5760  *
5761  * @param cls a `struct Queue` for which the delay changed
5762  */
5763 static void
5764 tracker_update_in_cb (void *cls)
5765 {
5766   struct Queue *queue = cls;
5767   struct GNUNET_TIME_Relative in_delay;
5768   unsigned int rsize;
5769
5770   rsize = (0 == queue->mtu) ? IN_PACKET_SIZE_WITHOUT_MTU : queue->mtu;
5771   in_delay = GNUNET_BANDWIDTH_tracker_get_delay (&queue->tracker_in,
5772                                                  rsize);
5773   // FIXME: how exactly do we do inbound flow control?
5774 }
5775
5776
5777 /**
5778  * If necessary, generates the UUID for a @a pm
5779  *
5780  * @param pm pending message to generate UUID for.
5781  */
5782 static void
5783 set_pending_message_uuid (struct PendingMessage *pm)
5784 {
5785   if (pm->msg_uuid_set)
5786     return;
5787   GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
5788                               &pm->msg_uuid,
5789                               sizeof (pm->msg_uuid));
5790   pm->msg_uuid_set = GNUNET_YES;
5791 }
5792
5793
5794 /**
5795  * Fragment the given @a pm to the given @a mtu.  Adds
5796  * additional fragments to the neighbour as well. If the
5797  * @a mtu is too small, generates and error for the @a pm
5798  * and returns NULL.
5799  *
5800  * @param pm pending message to fragment for transmission
5801  * @param mtu MTU to apply
5802  * @return new message to transmit
5803  */
5804 static struct PendingMessage *
5805 fragment_message (struct PendingMessage *pm,
5806                   uint16_t mtu)
5807 {
5808   struct PendingMessage *ff;
5809
5810   set_pending_message_uuid (pm);
5811
5812   /* This invariant is established in #handle_add_queue_message() */
5813   GNUNET_assert (mtu > sizeof (struct TransportFragmentBox));
5814
5815   /* select fragment for transmission, descending the tree if it has
5816      been expanded until we are at a leaf or at a fragment that is small enough */
5817   ff = pm;
5818   while ( ( (ff->bytes_msg > mtu) ||
5819             (pm == ff) ) &&
5820           (ff->frag_off == ff->bytes_msg) &&
5821           (NULL != ff->head_frag) )
5822   {
5823     ff = ff->head_frag; /* descent into fragmented fragments */
5824   }
5825
5826   if ( ( (ff->bytes_msg > mtu) ||
5827          (pm == ff) ) &&
5828        (pm->frag_off < pm->bytes_msg) )
5829   {
5830     /* Did not yet calculate all fragments, calculate next fragment */
5831     struct PendingMessage *frag;
5832     struct TransportFragmentBox tfb;
5833     const char *orig;
5834     char *msg;
5835     uint16_t fragmax;
5836     uint16_t fragsize;
5837     uint16_t msize;
5838     uint16_t xoff = 0;
5839
5840     orig = (const char *) &ff[1];
5841     msize = ff->bytes_msg;
5842     if (pm != ff)
5843     {
5844       const struct TransportFragmentBox *tfbo;
5845
5846       tfbo = (const struct TransportFragmentBox *) orig;
5847       orig += sizeof (struct TransportFragmentBox);
5848       msize -= sizeof (struct TransportFragmentBox);
5849       xoff = ntohs (tfbo->frag_off);
5850     }
5851     fragmax = mtu - sizeof (struct TransportFragmentBox);
5852     fragsize = GNUNET_MIN (msize - ff->frag_off,
5853                            fragmax);
5854     frag = GNUNET_malloc (sizeof (struct PendingMessage) +
5855                           sizeof (struct TransportFragmentBox) +
5856                           fragsize);
5857     frag->target = pm->target;
5858     frag->frag_parent = ff;
5859     frag->timeout = pm->timeout;
5860     frag->bytes_msg = sizeof (struct TransportFragmentBox) + fragsize;
5861     frag->pmt = PMT_FRAGMENT_BOX;
5862     msg = (char *) &frag[1];
5863     tfb.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT);
5864     tfb.header.size = htons (sizeof (struct TransportFragmentBox) +
5865                              fragsize);
5866     tfb.frag_uuid = htonl (pm->frag_uuidgen++);
5867     tfb.msg_uuid = pm->msg_uuid;
5868     tfb.frag_off = htons (ff->frag_off + xoff);
5869     tfb.msg_size = htons (pm->bytes_msg);
5870     memcpy (msg,
5871             &tfb,
5872             sizeof (tfb));
5873     memcpy (&msg[sizeof (tfb)],
5874             &orig[ff->frag_off],
5875             fragsize);
5876     GNUNET_CONTAINER_MDLL_insert (frag,
5877                                   ff->head_frag,
5878                                   ff->tail_frag,
5879                                   frag);
5880     ff->frag_off += fragsize;
5881     ff = frag;
5882   }
5883
5884   /* Move head to the tail and return it */
5885   GNUNET_CONTAINER_MDLL_remove (frag,
5886                                 ff->frag_parent->head_frag,
5887                                 ff->frag_parent->tail_frag,
5888                                 ff);
5889   GNUNET_CONTAINER_MDLL_insert_tail (frag,
5890                                      ff->frag_parent->head_frag,
5891                                      ff->frag_parent->tail_frag,
5892                                      ff);
5893   return ff;
5894 }
5895
5896
5897 /**
5898  * Reliability-box the given @a pm. On error (can there be any), NULL
5899  * may be returned, otherwise the "replacement" for @a pm (which
5900  * should then be added to the respective neighbour's queue instead of
5901  * @a pm).  If the @a pm is already fragmented or reliability boxed,
5902  * or itself an ACK, this function simply returns @a pm.
5903  *
5904  * @param pm pending message to box for transmission over unreliabile queue
5905  * @return new message to transmit
5906  */
5907 static struct PendingMessage *
5908 reliability_box_message (struct PendingMessage *pm)
5909 {
5910   struct TransportReliabilityBox rbox;
5911   struct PendingMessage *bpm;
5912   char *msg;
5913
5914   if (PMT_CORE != pm->pmt)
5915     return pm;  /* already fragmented or reliability boxed, or control message: do nothing */
5916   if (NULL != pm->bpm)
5917     return pm->bpm; /* already computed earlier: do nothing */
5918   GNUNET_assert (NULL == pm->head_frag);
5919   if (pm->bytes_msg + sizeof (rbox) > UINT16_MAX)
5920   {
5921     /* failed hard */
5922     GNUNET_break (0);
5923     client_send_response (pm,
5924                           GNUNET_NO,
5925                           0);
5926     return NULL;
5927   }
5928   bpm = GNUNET_malloc (sizeof (struct PendingMessage) +
5929                        sizeof (rbox) +
5930                        pm->bytes_msg);
5931   bpm->target = pm->target;
5932   bpm->frag_parent = pm;
5933   GNUNET_CONTAINER_MDLL_insert (frag,
5934                                 pm->head_frag,
5935                                 pm->tail_frag,
5936                                 bpm);
5937   bpm->timeout = pm->timeout;
5938   bpm->pmt = PMT_RELIABILITY_BOX;
5939   bpm->bytes_msg = pm->bytes_msg + sizeof (rbox);
5940   set_pending_message_uuid (bpm);
5941   rbox.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX);
5942   rbox.header.size = htons (sizeof (rbox) + pm->bytes_msg);
5943   rbox.ack_countdown = htonl (0); // FIXME: implement ACK countdown support
5944   rbox.msg_uuid = pm->msg_uuid;
5945   msg = (char *) &bpm[1];
5946   memcpy (msg,
5947           &rbox,
5948           sizeof (rbox));
5949   memcpy (&msg[sizeof (rbox)],
5950           &pm[1],
5951           pm->bytes_msg);
5952   pm->bpm = bpm;
5953   return bpm;
5954 }
5955
5956
5957 /**
5958  * We believe we are ready to transmit a message on a queue. Double-checks
5959  * with the queue's "tracker_out" and then gives the message to the
5960  * communicator for transmission (updating the tracker, and re-scheduling
5961  * itself if applicable).
5962  *
5963  * @param cls the `struct Queue` to process transmissions for
5964  */
5965 static void
5966 transmit_on_queue (void *cls)
5967 {
5968   struct Queue *queue = cls;
5969   struct Neighbour *n = queue->neighbour;
5970   struct PendingMessage *pm;
5971   struct PendingMessage *s;
5972   uint32_t overhead;
5973
5974   queue->transmit_task = NULL;
5975   if (NULL == (pm = n->pending_msg_head))
5976   {
5977     /* no message pending, nothing to do here! */
5978     return;
5979   }
5980   schedule_transmit_on_queue (queue);
5981   if (NULL != queue->transmit_task)
5982     return; /* do it later */
5983   overhead = 0;
5984   if (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc)
5985     overhead += sizeof (struct TransportReliabilityBox);
5986   s = pm;
5987   if ( ( (0 != queue->mtu) &&
5988          (pm->bytes_msg + overhead > queue->mtu) ) ||
5989        (pm->bytes_msg > UINT16_MAX - sizeof (struct GNUNET_TRANSPORT_SendMessageTo)) ||
5990        (NULL != pm->head_frag /* fragments already exist, should
5991                                  respect that even if MTU is 0 for
5992                                  this queue */) )
5993     s = fragment_message (s,
5994                           (0 == queue->mtu)
5995                           ? UINT16_MAX - sizeof (struct GNUNET_TRANSPORT_SendMessageTo)
5996                           : queue->mtu);
5997   if (NULL == s)
5998   {
5999     /* Fragmentation failed, try next message... */
6000     schedule_transmit_on_queue (queue);
6001     return;
6002   }
6003   if (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc)
6004     s = reliability_box_message (s);
6005   if (NULL == s)
6006   {
6007     /* Reliability boxing failed, try next message... */
6008     schedule_transmit_on_queue (queue);
6009     return;
6010   }
6011
6012   /* Pass 's' for transission to the communicator */
6013   queue_send_msg (queue,
6014                   s,
6015                   &s[1],
6016                   s->bytes_msg);
6017   // FIXME: do something similar to the logic below
6018   // in defragmentation / reliability ACK handling!
6019
6020   /* Check if this transmission somehow conclusively finished handing 'pm'
6021      even without any explicit ACKs */
6022   if ( (PMT_CORE == s->pmt) &&
6023        (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc) )
6024   {
6025     /* Full message sent, and over reliabile channel */
6026     client_send_response (pm,
6027                           GNUNET_YES,
6028                           pm->bytes_msg);
6029   }
6030   else if ( (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc) &&
6031             (PMT_FRAGMENT_BOX == s->pmt) )
6032   {
6033     struct PendingMessage *pos;
6034
6035     /* Fragment sent over reliabile channel */
6036     free_fragment_tree (s);
6037     pos = s->frag_parent;
6038     GNUNET_CONTAINER_MDLL_remove (frag,
6039                                   pos->head_frag,
6040                                   pos->tail_frag,
6041                                   s);
6042     GNUNET_free (s);
6043     /* check if subtree is done */
6044     while ( (NULL == pos->head_frag) &&
6045             (pos->frag_off == pos->bytes_msg) &&
6046             (pos != pm) )
6047     {
6048       s = pos;
6049       pos = s->frag_parent;
6050       GNUNET_CONTAINER_MDLL_remove (frag,
6051                                     pos->head_frag,
6052                                     pos->tail_frag,
6053                                     s);
6054       GNUNET_free (s);
6055     }
6056
6057     /* Was this the last applicable fragmment? */
6058     if ( (NULL == pm->head_frag) &&
6059          (pm->frag_off == pm->bytes_msg) )
6060       client_send_response (pm,
6061                             GNUNET_YES,
6062                             pm->bytes_msg /* FIXME: calculate and add overheads! */);
6063   }
6064   else if (PMT_CORE != pm->pmt)
6065   {
6066     /* This was an acknowledgement of some type, always free */
6067     free_pending_message (pm);
6068   }
6069   else
6070   {
6071     /* message not finished, waiting for acknowledgement */
6072     struct Neighbour *neighbour = pm->target;
6073     /* Update time by which we might retransmit 's' based on queue
6074        characteristics (i.e. RTT); it takes one RTT for the message to
6075        arrive and the ACK to come back in the best case; but the other
6076        side is allowed to delay ACKs by 2 RTTs, so we use 4 RTT before
6077        retransmitting.  Note that in the future this heuristic should
6078        likely be improved further (measure RTT stability, consider
6079        message urgency and size when delaying ACKs, etc.) */
6080     s->next_attempt = GNUNET_TIME_relative_to_absolute
6081       (GNUNET_TIME_relative_multiply (queue->rtt,
6082                                       4));
6083     if (s == pm)
6084     {
6085       struct PendingMessage *pos;
6086
6087       /* re-insert sort in neighbour list */
6088       GNUNET_CONTAINER_MDLL_remove (neighbour,
6089                                     neighbour->pending_msg_head,
6090                                     neighbour->pending_msg_tail,
6091                                     pm);
6092       pos = neighbour->pending_msg_tail;
6093       while ( (NULL != pos) &&
6094               (pm->next_attempt.abs_value_us > pos->next_attempt.abs_value_us) )
6095         pos = pos->prev_neighbour;
6096       GNUNET_CONTAINER_MDLL_insert_after (neighbour,
6097                                           neighbour->pending_msg_head,
6098                                           neighbour->pending_msg_tail,
6099                                           pos,
6100                                           pm);
6101     }
6102     else
6103     {
6104       /* re-insert sort in fragment list */
6105       struct PendingMessage *fp = s->frag_parent;
6106       struct PendingMessage *pos;
6107
6108       GNUNET_CONTAINER_MDLL_remove (frag,
6109                                     fp->head_frag,
6110                                     fp->tail_frag,
6111                                     s);
6112       pos = fp->tail_frag;
6113       while ( (NULL != pos) &&
6114               (s->next_attempt.abs_value_us > pos->next_attempt.abs_value_us) )
6115         pos = pos->prev_frag;
6116       GNUNET_CONTAINER_MDLL_insert_after (frag,
6117                                           fp->head_frag,
6118                                           fp->tail_frag,
6119                                           pos,
6120                                           s);
6121     }
6122   }
6123
6124   /* finally, re-schedule queue transmission task itself */
6125   schedule_transmit_on_queue (queue);
6126 }
6127
6128
6129 /**
6130  * Bandwidth tracker informs us that the delay until we
6131  * can transmit again changed.
6132  *
6133  * @param cls a `struct Queue` for which the delay changed
6134  */
6135 static void
6136 tracker_update_out_cb (void *cls)
6137 {
6138   struct Queue *queue = cls;
6139   struct Neighbour *n = queue->neighbour;
6140
6141   if (NULL == n->pending_msg_head)
6142   {
6143     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
6144                 "Bandwidth allocation updated for empty transmission queue `%s'\n",
6145                 queue->address);
6146     return; /* no message pending, nothing to do here! */
6147   }
6148   GNUNET_SCHEDULER_cancel (queue->transmit_task);
6149   queue->transmit_task = NULL;
6150   schedule_transmit_on_queue (queue);
6151 }
6152
6153
6154 /**
6155  * Bandwidth tracker informs us that excessive outbound bandwidth was
6156  * allocated which is not being used.
6157  *
6158  * @param cls a `struct Queue` for which the excess was noted
6159  */
6160 static void
6161 tracker_excess_out_cb (void *cls)
6162 {
6163   (void) cls;
6164
6165   /* FIXME: trigger excess bandwidth report to core? Right now,
6166      this is done internally within transport_api2_core already,
6167      but we probably want to change the logic and trigger it
6168      from here via a message instead! */
6169   /* TODO: maybe inform someone at this point? */
6170   GNUNET_STATISTICS_update (GST_stats,
6171                             "# Excess outbound bandwidth reported",
6172                             1,
6173                             GNUNET_NO);
6174 }
6175
6176
6177
6178 /**
6179  * Bandwidth tracker informs us that excessive inbound bandwidth was allocated
6180  * which is not being used.
6181  *
6182  * @param cls a `struct Queue` for which the excess was noted
6183  */
6184 static void
6185 tracker_excess_in_cb (void *cls)
6186 {
6187   (void) cls;
6188
6189   /* TODO: maybe inform somone at this point? */
6190   GNUNET_STATISTICS_update (GST_stats,
6191                             "# Excess inbound bandwidth reported",
6192                             1,
6193                             GNUNET_NO);
6194 }
6195
6196
6197 /**
6198  * Queue to a peer went down.  Process the request.
6199  *
6200  * @param cls the client
6201  * @param dqm the send message that was sent
6202  */
6203 static void
6204 handle_del_queue_message (void *cls,
6205                           const struct GNUNET_TRANSPORT_DelQueueMessage *dqm)
6206 {
6207   struct TransportClient *tc = cls;
6208
6209   if (CT_COMMUNICATOR != tc->type)
6210   {
6211     GNUNET_break (0);
6212     GNUNET_SERVICE_client_drop (tc->client);
6213     return;
6214   }
6215   for (struct Queue *queue = tc->details.communicator.queue_head;
6216        NULL != queue;
6217        queue = queue->next_client)
6218   {
6219     struct Neighbour *neighbour = queue->neighbour;
6220
6221     if ( (dqm->qid != queue->qid) ||
6222          (0 != GNUNET_memcmp (&dqm->receiver,
6223                               &neighbour->pid)) )
6224       continue;
6225     free_queue (queue);
6226     GNUNET_SERVICE_client_continue (tc->client);
6227     return;
6228   }
6229   GNUNET_break (0);
6230   GNUNET_SERVICE_client_drop (tc->client);
6231 }
6232
6233
6234 /**
6235  * Message was transmitted.  Process the request.
6236  *
6237  * @param cls the client
6238  * @param sma the send message that was sent
6239  */
6240 static void
6241 handle_send_message_ack (void *cls,
6242                          const struct GNUNET_TRANSPORT_SendMessageToAck *sma)
6243 {
6244   struct TransportClient *tc = cls;
6245   struct QueueEntry *qe;
6246
6247   if (CT_COMMUNICATOR != tc->type)
6248   {
6249     GNUNET_break (0);
6250     GNUNET_SERVICE_client_drop (tc->client);
6251     return;
6252   }
6253
6254   /* find our queue entry matching the ACK */
6255   qe = NULL;
6256   for (struct Queue *queue = tc->details.communicator.queue_head;
6257        NULL != queue;
6258        queue = queue->next_client)
6259   {
6260     if (0 != GNUNET_memcmp (&queue->neighbour->pid,
6261                             &sma->receiver))
6262       continue;
6263     for (struct QueueEntry *qep = queue->queue_head;
6264          NULL != qep;
6265          qep = qep->next)
6266     {
6267       if (qep->mid != sma->mid)
6268         continue;
6269       qe = qep;
6270       break;
6271     }
6272     break;
6273   }
6274   if (NULL == qe)
6275   {
6276     /* this should never happen */
6277     GNUNET_break (0);
6278     GNUNET_SERVICE_client_drop (tc->client);
6279     return;
6280   }
6281   GNUNET_CONTAINER_DLL_remove (qe->queue->queue_head,
6282                                qe->queue->queue_tail,
6283                                qe);
6284   qe->queue->queue_length--;
6285   tc->details.communicator.total_queue_length--;
6286   GNUNET_SERVICE_client_continue (tc->client);
6287
6288   /* if applicable, resume transmissions that waited on ACK */
6289   if (COMMUNICATOR_TOTAL_QUEUE_LIMIT - 1 == tc->details.communicator.total_queue_length)
6290   {
6291     /* Communicator dropped below threshold, resume all queues */
6292     GNUNET_STATISTICS_update (GST_stats,
6293                               "# Transmission throttled due to communicator queue limit",
6294                               -1,
6295                               GNUNET_NO);
6296     for (struct Queue *queue = tc->details.communicator.queue_head;
6297          NULL != queue;
6298          queue = queue->next_client)
6299       schedule_transmit_on_queue (queue);
6300   }
6301   else if (QUEUE_LENGTH_LIMIT - 1 == qe->queue->queue_length)
6302   {
6303     /* queue dropped below threshold; only resume this one queue */
6304     GNUNET_STATISTICS_update (GST_stats,
6305                               "# Transmission throttled due to queue queue limit",
6306                               -1,
6307                               GNUNET_NO);
6308     schedule_transmit_on_queue (qe->queue);
6309   }
6310
6311   /* TODO: we also should react on the status! */
6312   // FIXME: this probably requires queue->pm = s assignment!
6313   // FIXME: react to communicator status about transmission request. We got:
6314   sma->status; // OK success, SYSERR failure
6315
6316   GNUNET_free (qe);
6317 }
6318
6319
6320 /**
6321  * Iterator telling new MONITOR client about all existing
6322  * queues to peers.
6323  *
6324  * @param cls the new `struct TransportClient`
6325  * @param pid a connected peer
6326  * @param value the `struct Neighbour` with more information
6327  * @return #GNUNET_OK (continue to iterate)
6328  */
6329 static int
6330 notify_client_queues (void *cls,
6331                       const struct GNUNET_PeerIdentity *pid,
6332                       void *value)
6333 {
6334   struct TransportClient *tc = cls;
6335   struct Neighbour *neighbour = value;
6336
6337   GNUNET_assert (CT_MONITOR == tc->type);
6338   for (struct Queue *q = neighbour->queue_head;
6339        NULL != q;
6340        q = q->next_neighbour)
6341   {
6342     struct MonitorEvent me = {
6343       .rtt = q->rtt,
6344       .cs = q->cs,
6345       .num_msg_pending = q->num_msg_pending,
6346       .num_bytes_pending = q->num_bytes_pending
6347     };
6348
6349     notify_monitor (tc,
6350                     pid,
6351                     q->address,
6352                     q->nt,
6353                     &me);
6354   }
6355   return GNUNET_OK;
6356 }
6357
6358
6359 /**
6360  * Initialize a monitor client.
6361  *
6362  * @param cls the client
6363  * @param start the start message that was sent
6364  */
6365 static void
6366 handle_monitor_start (void *cls,
6367                       const struct GNUNET_TRANSPORT_MonitorStart *start)
6368 {
6369   struct TransportClient *tc = cls;
6370
6371   if (CT_NONE != tc->type)
6372   {
6373     GNUNET_break (0);
6374     GNUNET_SERVICE_client_drop (tc->client);
6375     return;
6376   }
6377   tc->type = CT_MONITOR;
6378   tc->details.monitor.peer = start->peer;
6379   tc->details.monitor.one_shot = ntohl (start->one_shot);
6380   GNUNET_CONTAINER_multipeermap_iterate (neighbours,
6381                                          &notify_client_queues,
6382                                          tc);
6383   GNUNET_SERVICE_client_mark_monitor (tc->client);
6384   GNUNET_SERVICE_client_continue (tc->client);
6385 }
6386
6387
6388 /**
6389  * Find transport client providing communication service
6390  * for the protocol @a prefix.
6391  *
6392  * @param prefix communicator name
6393  * @return NULL if no such transport client is available
6394  */
6395 static struct TransportClient *
6396 lookup_communicator (const char *prefix)
6397 {
6398   for (struct TransportClient *tc = clients_head;
6399        NULL != tc;
6400        tc = tc->next)
6401   {
6402     if (CT_COMMUNICATOR != tc->type)
6403       continue;
6404     if (0 == strcmp (prefix,
6405                      tc->details.communicator.address_prefix))
6406       return tc;
6407   }
6408   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
6409               "Somone suggested use of communicator for `%s', but we do not have such a communicator!\n",
6410               prefix);
6411   return NULL;
6412 }
6413
6414
6415 /**
6416  * Signature of a function called with a communicator @a address of a peer
6417  * @a pid that an application wants us to connect to.
6418  *
6419  * @param pid target peer
6420  * @param address the address to try
6421  */
6422 static void
6423 suggest_to_connect (const struct GNUNET_PeerIdentity *pid,
6424                     const char *address)
6425 {
6426   static uint32_t idgen;
6427   struct TransportClient *tc;
6428   char *prefix;
6429   struct GNUNET_TRANSPORT_CreateQueue *cqm;
6430   struct GNUNET_MQ_Envelope *env;
6431   size_t alen;
6432
6433   prefix = GNUNET_HELLO_address_to_prefix (address);
6434   if (NULL == prefix)
6435   {
6436     GNUNET_break (0); /* We got an invalid address!? */
6437     return;
6438   }
6439   tc = lookup_communicator (prefix);
6440   if (NULL == tc)
6441   {
6442     GNUNET_STATISTICS_update (GST_stats,
6443                               "# Suggestions ignored due to missing communicator",
6444                               1,
6445                               GNUNET_NO);
6446     return;
6447   }
6448   /* forward suggestion for queue creation to communicator */
6449   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
6450               "Request #%u for `%s' communicator to create queue to `%s'\n",
6451               (unsigned int) idgen,
6452               prefix,
6453               address);
6454   alen = strlen (address) + 1;
6455   env = GNUNET_MQ_msg_extra (cqm,
6456                              alen,
6457                              GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE);
6458   cqm->request_id = htonl (idgen++);
6459   cqm->receiver = *pid;
6460   memcpy (&cqm[1],
6461           address,
6462           alen);
6463   GNUNET_MQ_send (tc->mq,
6464                   env);
6465 }
6466
6467
6468 /**
6469  * The queue @a q (which matches the peer and address in @a vs) is
6470  * ready for queueing. We should now queue the validation request.
6471  *
6472  * @param q queue to send on
6473  * @param vs state to derive validation challenge from
6474  */
6475 static void
6476 validation_transmit_on_queue (struct Queue *q,
6477                               struct ValidationState *vs)
6478 {
6479   struct TransportValidationChallenge tvc;
6480
6481   vs->last_challenge_use = GNUNET_TIME_absolute_get ();
6482   tvc.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_CHALLENGE);
6483   tvc.header.size = htons (sizeof (tvc));
6484   tvc.reserved = htonl (0);
6485   tvc.challenge = vs->challenge;
6486   tvc.sender_time = GNUNET_TIME_absolute_hton (vs->last_challenge_use);
6487   queue_send_msg (q,
6488                   NULL,
6489                   &tvc,
6490                   sizeof (tvc));
6491 }
6492
6493
6494 /**
6495  * Task run periodically to validate some address based on #validation_heap.
6496  *
6497  * @param cls NULL
6498  */
6499 static void
6500 validation_start_cb (void *cls)
6501 {
6502   struct ValidationState *vs;
6503   struct Queue *q;
6504
6505   (void) cls;
6506   validation_task = NULL;
6507   vs = GNUNET_CONTAINER_heap_peek (validation_heap);
6508   /* drop validations past their expiration */
6509   while ( (NULL != vs) &&
6510           (0 == GNUNET_TIME_absolute_get_remaining (vs->valid_until).rel_value_us) )
6511   {
6512     free_validation_state (vs);
6513     vs = GNUNET_CONTAINER_heap_peek (validation_heap);
6514   }
6515   if (NULL == vs)
6516     return; /* woopsie, no more addresses known, should only
6517                happen if we're really a lonely peer */
6518   q = find_queue (&vs->pid,
6519                   vs->address);
6520   if (NULL == q)
6521   {
6522     vs->awaiting_queue = GNUNET_YES;
6523     suggest_to_connect (&vs->pid,
6524                         vs->address);
6525   }
6526   else
6527     validation_transmit_on_queue (q,
6528                                   vs);
6529   /* Finally, reschedule next attempt */
6530   vs->challenge_backoff = GNUNET_TIME_randomized_backoff (vs->challenge_backoff,
6531                                                           MAX_VALIDATION_CHALLENGE_FREQ);
6532   update_next_challenge_time (vs,
6533                               GNUNET_TIME_relative_to_absolute (vs->challenge_backoff));
6534 }
6535
6536
6537 /**
6538  * Closure for #check_connection_quality.
6539  */
6540 struct QueueQualityContext
6541 {
6542   /**
6543    * Set to the @e k'th queue encountered.
6544    */
6545   struct Queue *q;
6546
6547   /**
6548    * Set to the number of quality queues encountered.
6549    */
6550   unsigned int quality_count;
6551
6552   /**
6553    * Set to the total number of queues encountered.
6554    */
6555   unsigned int num_queues;
6556
6557   /**
6558    * Decremented for each queue, for selection of the
6559    * k-th queue in @e q.
6560    */
6561   unsigned int k;
6562
6563 };
6564
6565
6566 /**
6567  * Check whether any queue to the given neighbour is
6568  * of a good "quality" and if so, increment the counter.
6569  * Also counts the total number of queues, and returns
6570  * the k-th queue found.
6571  *
6572  * @param cls a `struct QueueQualityContext *` with counters
6573  * @param pid peer this is about
6574  * @param value a `struct Neighbour`
6575  * @return #GNUNET_OK (continue to iterate)
6576  */
6577 static int
6578 check_connection_quality (void *cls,
6579                           const struct GNUNET_PeerIdentity *pid,
6580                           void *value)
6581 {
6582   struct QueueQualityContext *ctx = cls;
6583   struct Neighbour *n = value;
6584   int do_inc;
6585
6586   (void) pid;
6587   do_inc = GNUNET_NO;
6588   for (struct Queue *q = n->queue_head;
6589        NULL != q;
6590        q = q->next_neighbour)
6591   {
6592     if (0 != q->distance)
6593       continue; /* DV does not count */
6594     ctx->num_queues++;
6595     if (0 == ctx->k--)
6596       ctx->q = q;
6597     /* OPTIMIZE-FIXME: in the future, add reliability / goodput
6598        statistics and consider those as well here? */
6599     if (q->rtt.rel_value_us < DV_QUALITY_RTT_THRESHOLD.rel_value_us)
6600       do_inc = GNUNET_YES;
6601   }
6602   if (GNUNET_YES == do_inc)
6603     ctx->quality_count++;
6604   return GNUNET_OK;
6605 }
6606
6607
6608 /**
6609  * Task run when we CONSIDER initiating a DV learn
6610  * process. We first check that sending out a message is
6611  * even possible (queues exist), then that it is desirable
6612  * (if not, reschedule the task for later), and finally
6613  * we may then begin the job.  If there are too many
6614  * entries in the #dvlearn_map, we purge the oldest entry
6615  * using #lle_tail.
6616  *
6617  * @param cls NULL
6618  */
6619 static void
6620 start_dv_learn (void *cls)
6621 {
6622   struct LearnLaunchEntry *lle;
6623   struct QueueQualityContext qqc;
6624   struct TransportDVLearn dvl;
6625
6626   (void) cls;
6627   dvlearn_task = NULL;
6628   if (0 ==
6629       GNUNET_CONTAINER_multipeermap_size (neighbours))
6630     return; /* lost all connectivity, cannot do learning */
6631   qqc.quality_count = 0;
6632   qqc.num_queues = 0;
6633   GNUNET_CONTAINER_multipeermap_iterate (neighbours,
6634                                          &check_connection_quality,
6635                                          &qqc);
6636   if (qqc.quality_count > DV_LEARN_QUALITY_THRESHOLD)
6637   {
6638     struct GNUNET_TIME_Relative delay;
6639     unsigned int factor;
6640
6641     /* scale our retries by how far we are above the threshold */
6642     factor = qqc.quality_count / DV_LEARN_QUALITY_THRESHOLD;
6643     delay = GNUNET_TIME_relative_multiply (DV_LEARN_BASE_FREQUENCY,
6644                                            factor);
6645     dvlearn_task = GNUNET_SCHEDULER_add_delayed (delay,
6646                                                  &start_dv_learn,
6647                                                  NULL);
6648     return;
6649   }
6650   /* remove old entries in #dvlearn_map if it has grown too big */
6651   while (MAX_DV_LEARN_PENDING >=
6652          GNUNET_CONTAINER_multishortmap_size (dvlearn_map))
6653   {
6654     lle = lle_tail;
6655     GNUNET_assert (GNUNET_YES ==
6656                    GNUNET_CONTAINER_multishortmap_remove (dvlearn_map,
6657                                                           &lle->challenge,
6658                                                           lle));
6659     GNUNET_CONTAINER_DLL_remove (lle_head,
6660                                  lle_tail,
6661                                  lle);
6662     GNUNET_free (lle);
6663   }
6664   /* setup data structure for learning */
6665   lle = GNUNET_new (struct LearnLaunchEntry);
6666   GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
6667                               &lle->challenge,
6668                               sizeof (lle->challenge));
6669   GNUNET_CONTAINER_DLL_insert (lle_head,
6670                                lle_tail,
6671                                lle);
6672   GNUNET_break (GNUNET_YES ==
6673                 GNUNET_CONTAINER_multishortmap_put (dvlearn_map,
6674                                                     &lle->challenge,
6675                                                     lle,
6676                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
6677   dvl.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN);
6678   dvl.header.size = htons (sizeof (dvl));
6679   dvl.num_hops = htons (0);
6680   dvl.bidirectional = htons (0);
6681   dvl.non_network_delay = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO);
6682   {
6683     struct DvInitPS dvip = {
6684       .purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_INITIATOR),
6685       .purpose.size = htonl (sizeof (dvip)),
6686       .challenge = lle->challenge
6687     };
6688
6689     GNUNET_assert (GNUNET_OK ==
6690                    GNUNET_CRYPTO_eddsa_sign (GST_my_private_key,
6691                                              &dvip.purpose,
6692                                              &dvl.init_sig));
6693   }
6694   dvl.initiator = GST_my_identity;
6695   dvl.challenge = lle->challenge;
6696
6697   qqc.quality_count = 0;
6698   qqc.k = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
6699                                     qqc.num_queues);
6700   qqc.num_queues = 0;
6701   qqc.q = NULL;
6702   GNUNET_CONTAINER_multipeermap_iterate (neighbours,
6703                                          &check_connection_quality,
6704                                          &qqc);
6705   GNUNET_assert (NULL != qqc.q);
6706
6707   /* Do this as close to transmission time as possible! */
6708   lle->launch_time = GNUNET_TIME_absolute_get ();
6709
6710   queue_send_msg (qqc.q,
6711                   NULL,
6712                   &dvl,
6713                   sizeof (dvl));
6714   /* reschedule this job, randomizing the time it runs (but no
6715      actual backoff!) */
6716   dvlearn_task
6717     = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_randomize (DV_LEARN_BASE_FREQUENCY),
6718                                     &start_dv_learn,
6719                                     NULL);
6720 }
6721
6722
6723 /**
6724  * A new queue has been created, check if any address validation
6725  * requests have been waiting for it.
6726  *
6727  * @param cls a `struct Queue`
6728  * @param pid peer concerned (unused)
6729  * @param value a `struct ValidationState`
6730  * @return #GNUNET_NO if a match was found and we can stop looking
6731  */
6732 static int
6733 check_validation_request_pending (void *cls,
6734                                   const struct GNUNET_PeerIdentity *pid,
6735                                   void *value)
6736 {
6737   struct Queue *q = cls;
6738   struct ValidationState *vs = value;
6739
6740   (void) pid;
6741   if ( (GNUNET_YES == vs->awaiting_queue) &&
6742        (0 == strcmp (vs->address,
6743                      q->address)) )
6744   {
6745     vs->awaiting_queue = GNUNET_NO;
6746     validation_transmit_on_queue (q,
6747                                   vs);
6748     return GNUNET_NO;
6749   }
6750   return GNUNET_OK;
6751 }
6752
6753
6754 /**
6755  * New queue became available.  Process the request.
6756  *
6757  * @param cls the client
6758  * @param aqm the send message that was sent
6759  */
6760 static void
6761 handle_add_queue_message (void *cls,
6762                           const struct GNUNET_TRANSPORT_AddQueueMessage *aqm)
6763 {
6764   struct TransportClient *tc = cls;
6765   struct Queue *queue;
6766   struct Neighbour *neighbour;
6767   const char *addr;
6768   uint16_t addr_len;
6769
6770   if (ntohl (aqm->mtu) <= sizeof (struct TransportFragmentBox))
6771   {
6772     /* MTU so small as to be useless for transmissions,
6773        required for #fragment_message()! */
6774     GNUNET_break_op (0);
6775     GNUNET_SERVICE_client_drop (tc->client);
6776     return;
6777   }
6778   neighbour = lookup_neighbour (&aqm->receiver);
6779   if (NULL == neighbour)
6780   {
6781     neighbour = GNUNET_new (struct Neighbour);
6782     neighbour->earliest_timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
6783     neighbour->pid = aqm->receiver;
6784     GNUNET_assert (GNUNET_OK ==
6785                    GNUNET_CONTAINER_multipeermap_put (neighbours,
6786                                                       &neighbour->pid,
6787                                                       neighbour,
6788                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
6789   }
6790   addr_len = ntohs (aqm->header.size) - sizeof (*aqm);
6791   addr = (const char *) &aqm[1];
6792
6793   queue = GNUNET_malloc (sizeof (struct Queue) + addr_len);
6794   queue->tc = tc;
6795   queue->address = (const char *) &queue[1];
6796   queue->rtt = GNUNET_TIME_UNIT_FOREVER_REL;
6797   queue->qid = aqm->qid;
6798   queue->mtu = ntohl (aqm->mtu);
6799   queue->nt = (enum GNUNET_NetworkType) ntohl (aqm->nt);
6800   queue->cs = (enum GNUNET_TRANSPORT_ConnectionStatus) ntohl (aqm->cs);
6801   queue->neighbour = neighbour;
6802   GNUNET_BANDWIDTH_tracker_init2 (&queue->tracker_in,
6803                                   &tracker_update_in_cb,
6804                                   queue,
6805                                   GNUNET_BANDWIDTH_ZERO,
6806                                   GNUNET_CONSTANTS_MAX_BANDWIDTH_CARRY_S,
6807                                   &tracker_excess_in_cb,
6808                                   queue);
6809   GNUNET_BANDWIDTH_tracker_init2 (&queue->tracker_out,
6810                                   &tracker_update_out_cb,
6811                                   queue,
6812                                   GNUNET_BANDWIDTH_ZERO,
6813                                   GNUNET_CONSTANTS_MAX_BANDWIDTH_CARRY_S,
6814                                   &tracker_excess_out_cb,
6815                                   queue);
6816   memcpy (&queue[1],
6817           addr,
6818           addr_len);
6819   /* notify monitors about new queue */
6820   {
6821     struct MonitorEvent me = {
6822       .rtt = queue->rtt,
6823       .cs = queue->cs
6824     };
6825
6826     notify_monitors (&neighbour->pid,
6827                      queue->address,
6828                      queue->nt,
6829                      &me);
6830   }
6831   GNUNET_CONTAINER_MDLL_insert (neighbour,
6832                                 neighbour->queue_head,
6833                                 neighbour->queue_tail,
6834                                 queue);
6835   GNUNET_CONTAINER_MDLL_insert (client,
6836                                 tc->details.communicator.queue_head,
6837                                 tc->details.communicator.queue_tail,
6838                                 queue);
6839   /* check if valdiations are waiting for the queue */
6840   (void) GNUNET_CONTAINER_multipeermap_get_multiple (validation_map,
6841                                                      &aqm->receiver,
6842                                                      &check_validation_request_pending,
6843                                                      queue);
6844   /* might be our first queue, try launching DV learning */
6845   if (NULL == dvlearn_task)
6846     dvlearn_task = GNUNET_SCHEDULER_add_now (&start_dv_learn,
6847                                              NULL);
6848   GNUNET_SERVICE_client_continue (tc->client);
6849 }
6850
6851
6852 /**
6853  * Communicator tells us that our request to create a queue "worked", that
6854  * is setting up the queue is now in process.
6855  *
6856  * @param cls the `struct TransportClient`
6857  * @param cqr confirmation message
6858  */
6859 static void
6860 handle_queue_create_ok (void *cls,
6861                         const struct GNUNET_TRANSPORT_CreateQueueResponse *cqr)
6862 {
6863   struct TransportClient *tc = cls;
6864
6865   if (CT_COMMUNICATOR != tc->type)
6866   {
6867     GNUNET_break (0);
6868     GNUNET_SERVICE_client_drop (tc->client);
6869     return;
6870   }
6871   GNUNET_STATISTICS_update (GST_stats,
6872                             "# Suggestions succeeded at communicator",
6873                             1,
6874                             GNUNET_NO);
6875   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
6876               "Request #%u for communicator to create queue succeeded\n",
6877               (unsigned int) ntohs (cqr->request_id));
6878   GNUNET_SERVICE_client_continue (tc->client);
6879 }
6880
6881
6882 /**
6883  * Communicator tells us that our request to create a queue failed. This usually
6884  * indicates that the provided address is simply invalid or that the communicator's
6885  * resources are exhausted.
6886  *
6887  * @param cls the `struct TransportClient`
6888  * @param cqr failure message
6889  */
6890 static void
6891 handle_queue_create_fail (void *cls,
6892                           const struct GNUNET_TRANSPORT_CreateQueueResponse *cqr)
6893 {
6894   struct TransportClient *tc = cls;
6895
6896   if (CT_COMMUNICATOR != tc->type)
6897   {
6898     GNUNET_break (0);
6899     GNUNET_SERVICE_client_drop (tc->client);
6900     return;
6901   }
6902   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
6903               "Request #%u for communicator to create queue failed\n",
6904               (unsigned int) ntohs (cqr->request_id));
6905   GNUNET_STATISTICS_update (GST_stats,
6906                             "# Suggestions failed in queue creation at communicator",
6907                             1,
6908                             GNUNET_NO);
6909   GNUNET_SERVICE_client_continue (tc->client);
6910 }
6911
6912
6913 /**
6914  * We have received a `struct ExpressPreferenceMessage` from an application client.
6915  *
6916  * @param cls handle to the client
6917  * @param msg the start message
6918  */
6919 static void
6920 handle_suggest_cancel (void *cls,
6921                        const struct ExpressPreferenceMessage *msg)
6922 {
6923   struct TransportClient *tc = cls;
6924   struct PeerRequest *pr;
6925
6926   if (CT_APPLICATION != tc->type)
6927   {
6928     GNUNET_break (0);
6929     GNUNET_SERVICE_client_drop (tc->client);
6930     return;
6931   }
6932   pr = GNUNET_CONTAINER_multipeermap_get (tc->details.application.requests,
6933                                           &msg->peer);
6934   if (NULL == pr)
6935   {
6936     GNUNET_break (0);
6937     GNUNET_SERVICE_client_drop (tc->client);
6938     return;
6939   }
6940   (void) stop_peer_request (tc,
6941                             &pr->pid,
6942                             pr);
6943   GNUNET_SERVICE_client_continue (tc->client);
6944 }
6945
6946
6947 /**
6948  * Check #GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_CONSIDER_VERIFY
6949  * messages. We do nothing here, real verification is done later.
6950  *
6951  * @param cls a `struct TransportClient *`
6952  * @param msg message to verify
6953  * @return #GNUNET_OK
6954  */
6955 static int
6956 check_address_consider_verify (void *cls,
6957                                const struct GNUNET_TRANSPORT_AddressToVerify *hdr)
6958 {
6959   (void) cls;
6960   (void) hdr;
6961   return GNUNET_OK;
6962 }
6963
6964
6965 /**
6966  * Closure for #check_known_address.
6967  */
6968 struct CheckKnownAddressContext
6969 {
6970   /**
6971    * Set to the address we are looking for.
6972    */
6973   const char *address;
6974
6975   /**
6976    * Set to a matching validation state, if one was found.
6977    */
6978   struct ValidationState *vs;
6979 };
6980
6981
6982 /**
6983  * Test if the validation state in @a value matches the
6984  * address from @a cls.
6985  *
6986  * @param cls a `struct CheckKnownAddressContext`
6987  * @param pid unused (must match though)
6988  * @param value a `struct ValidationState`
6989  * @return #GNUNET_OK if not matching, #GNUNET_NO if match found
6990  */
6991 static int
6992 check_known_address (void *cls,
6993                      const struct GNUNET_PeerIdentity *pid,
6994                      void *value)
6995 {
6996   struct CheckKnownAddressContext *ckac = cls;
6997   struct ValidationState *vs = value;
6998
6999   (void) pid;
7000   if (0 != strcmp (vs->address,
7001                    ckac->address))
7002     return GNUNET_OK;
7003   ckac->vs = vs;
7004   return GNUNET_NO;
7005 }
7006
7007
7008 /**
7009  * Start address validation.
7010  *
7011  * @param pid peer the @a address is for
7012  * @param address an address to reach @a pid (presumably)
7013  * @param expiration when did @a pid claim @a address will become invalid
7014  */
7015 static void
7016 start_address_validation (const struct GNUNET_PeerIdentity *pid,
7017                           const char *address,
7018                           struct GNUNET_TIME_Absolute expiration)
7019 {
7020   struct GNUNET_TIME_Absolute now;
7021   struct ValidationState *vs;
7022   struct CheckKnownAddressContext ckac = {
7023     .address = address,
7024     .vs = NULL
7025   };
7026
7027   if (0 == GNUNET_TIME_absolute_get_remaining (expiration).rel_value_us)
7028     return; /* expired */
7029   (void) GNUNET_CONTAINER_multipeermap_get_multiple (validation_map,
7030                                                      pid,
7031                                                      &check_known_address,
7032                                                      &ckac);
7033   if (NULL != (vs = ckac.vs))
7034   {
7035     /* if 'vs' is not currently valid, we need to speed up retrying the validation */
7036     if (vs->validated_until.abs_value_us < vs->next_challenge.abs_value_us)
7037     {
7038       /* reduce backoff as we got a fresh advertisement */
7039       vs->challenge_backoff = GNUNET_TIME_relative_min (FAST_VALIDATION_CHALLENGE_FREQ,
7040                                                         GNUNET_TIME_relative_divide (vs->challenge_backoff,
7041                                                                                      2));
7042       update_next_challenge_time (vs,
7043                                   GNUNET_TIME_relative_to_absolute (vs->challenge_backoff));
7044     }
7045     return;
7046   }
7047   now = GNUNET_TIME_absolute_get();
7048   vs = GNUNET_new (struct ValidationState);
7049   vs->pid = *pid;
7050   vs->valid_until = expiration;
7051   vs->first_challenge_use = now;
7052   vs->validation_rtt = GNUNET_TIME_UNIT_FOREVER_REL;
7053   GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
7054                               &vs->challenge,
7055                               sizeof (vs->challenge));
7056   vs->address = GNUNET_strdup (address);
7057   GNUNET_assert (GNUNET_YES ==
7058                  GNUNET_CONTAINER_multipeermap_put (validation_map,
7059                                                     &vs->pid,
7060                                                     vs,
7061                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
7062   update_next_challenge_time (vs,
7063                               now);
7064 }
7065
7066
7067 /**
7068  * Function called by PEERSTORE for each matching record.
7069  *
7070  * @param cls closure
7071  * @param record peerstore record information
7072  * @param emsg error message, or NULL if no errors
7073  */
7074 static void
7075 handle_hello (void *cls,
7076               const struct GNUNET_PEERSTORE_Record *record,
7077               const char *emsg)
7078 {
7079   struct PeerRequest *pr = cls;
7080   const char *val;
7081
7082   if (NULL != emsg)
7083   {
7084     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
7085                 "Got failure from PEERSTORE: %s\n",
7086                 emsg);
7087     return;
7088   }
7089   val = record->value;
7090   if ( (0 == record->value_size) ||
7091        ('\0' != val[record->value_size - 1]) )
7092   {
7093     GNUNET_break (0);
7094     return;
7095   }
7096   start_address_validation (&pr->pid,
7097                             (const char *) record->value,
7098                             record->expiry);
7099 }
7100
7101
7102 /**
7103  * We have received a `struct ExpressPreferenceMessage` from an application client.
7104  *
7105  * @param cls handle to the client
7106  * @param msg the start message
7107  */
7108 static void
7109 handle_suggest (void *cls,
7110                 const struct ExpressPreferenceMessage *msg)
7111 {
7112   struct TransportClient *tc = cls;
7113   struct PeerRequest *pr;
7114
7115   if (CT_NONE == tc->type)
7116   {
7117     tc->type = CT_APPLICATION;
7118     tc->details.application.requests
7119       = GNUNET_CONTAINER_multipeermap_create (16,
7120                                               GNUNET_YES);
7121   }
7122   if (CT_APPLICATION != tc->type)
7123   {
7124     GNUNET_break (0);
7125     GNUNET_SERVICE_client_drop (tc->client);
7126     return;
7127   }
7128   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
7129               "Client suggested we talk to %s with preference %d at rate %u\n",
7130               GNUNET_i2s (&msg->peer),
7131               (int) ntohl (msg->pk),
7132               (int) ntohl (msg->bw.value__));
7133   pr = GNUNET_new (struct PeerRequest);
7134   pr->tc = tc;
7135   pr->pid = msg->peer;
7136   pr->bw = msg->bw;
7137   pr->pk = (enum GNUNET_MQ_PreferenceKind) ntohl (msg->pk);
7138   if (GNUNET_YES !=
7139       GNUNET_CONTAINER_multipeermap_put (tc->details.application.requests,
7140                                          &pr->pid,
7141                                          pr,
7142                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY))
7143   {
7144     GNUNET_break (0);
7145     GNUNET_free (pr);
7146     GNUNET_SERVICE_client_drop (tc->client);
7147     return;
7148   }
7149   pr->wc = GNUNET_PEERSTORE_watch (peerstore,
7150                                    "transport",
7151                                    &pr->pid,
7152                                    GNUNET_PEERSTORE_TRANSPORT_URLADDRESS_KEY,
7153                                    &handle_hello,
7154                                    pr);
7155   GNUNET_SERVICE_client_continue (tc->client);
7156 }
7157
7158
7159 /**
7160  * Given another peers address, consider checking it for validity
7161  * and then adding it to the Peerstore.
7162  *
7163  * @param cls a `struct TransportClient`
7164  * @param hdr message containing the raw address data and
7165  *        signature in the body, see #GNUNET_HELLO_extract_address()
7166  */
7167 static void
7168 handle_address_consider_verify (void *cls,
7169                                 const struct GNUNET_TRANSPORT_AddressToVerify *hdr)
7170 {
7171   struct TransportClient *tc = cls;
7172   char *address;
7173   enum GNUNET_NetworkType nt;
7174   struct GNUNET_TIME_Absolute expiration;
7175
7176   (void) cls;
7177   // OPTIMIZE-FIXME: checking that we know this address already should
7178   //        be done BEFORE checking the signature => HELLO API change!
7179   // OPTIMIZE-FIXME: pre-check: rate-limit signature verification / validation?!
7180   address = GNUNET_HELLO_extract_address (&hdr[1],
7181                                           ntohs (hdr->header.size) - sizeof (*hdr),
7182                                           &hdr->peer,
7183                                           &nt,
7184                                           &expiration);
7185   if (NULL == address)
7186   {
7187     GNUNET_break_op (0);
7188     return;
7189   }
7190   start_address_validation (&hdr->peer,
7191                             address,
7192                             expiration);
7193   GNUNET_free (address);
7194   GNUNET_SERVICE_client_continue (tc->client);
7195 }
7196
7197
7198 /**
7199  * Check #GNUNET_MESSAGE_TYPE_TRANSPORT_REQUEST_HELLO_VALIDATION
7200  * messages.
7201  *
7202  * @param cls a `struct TransportClient *`
7203  * @param m message to verify
7204  * @return #GNUNET_OK on success
7205  */
7206 static int
7207 check_request_hello_validation (void *cls,
7208                                 const struct RequestHelloValidationMessage *m)
7209 {
7210   (void) cls;
7211   GNUNET_MQ_check_zero_termination (m);
7212   return GNUNET_OK;
7213 }
7214
7215
7216 /**
7217  * A client encountered an address of another peer. Consider validating it,
7218  * and if validation succeeds, persist it to PEERSTORE.
7219  *
7220  * @param cls a `struct TransportClient *`
7221  * @param m message to verify
7222  */
7223 static void
7224 handle_request_hello_validation (void *cls,
7225                                  const struct RequestHelloValidationMessage *m)
7226 {
7227   struct TransportClient *tc = cls;
7228
7229   start_address_validation (&m->peer,
7230                             (const char *) &m[1],
7231                             GNUNET_TIME_absolute_ntoh (m->expiration));
7232   GNUNET_SERVICE_client_continue (tc->client);
7233 }
7234
7235
7236 /**
7237  * Free neighbour entry.
7238  *
7239  * @param cls NULL
7240  * @param pid unused
7241  * @param value a `struct Neighbour`
7242  * @return #GNUNET_OK (always)
7243  */
7244 static int
7245 free_neighbour_cb (void *cls,
7246                    const struct GNUNET_PeerIdentity *pid,
7247                    void *value)
7248 {
7249   struct Neighbour *neighbour = value;
7250
7251   (void) cls;
7252   (void) pid;
7253   GNUNET_break (0); // should this ever happen?
7254   free_neighbour (neighbour);
7255
7256   return GNUNET_OK;
7257 }
7258
7259
7260 /**
7261  * Free DV route entry.
7262  *
7263  * @param cls NULL
7264  * @param pid unused
7265  * @param value a `struct DistanceVector`
7266  * @return #GNUNET_OK (always)
7267  */
7268 static int
7269 free_dv_routes_cb (void *cls,
7270                    const struct GNUNET_PeerIdentity *pid,
7271                    void *value)
7272 {
7273   struct DistanceVector *dv = value;
7274
7275   (void) cls;
7276   (void) pid;
7277   free_dv_route (dv);
7278
7279   return GNUNET_OK;
7280 }
7281
7282
7283 /**
7284  * Free ephemeral entry.
7285  *
7286  * @param cls NULL
7287  * @param pid unused
7288  * @param value a `struct EphemeralCacheEntry`
7289  * @return #GNUNET_OK (always)
7290  */
7291 static int
7292 free_ephemeral_cb (void *cls,
7293                    const struct GNUNET_PeerIdentity *pid,
7294                    void *value)
7295 {
7296   struct EphemeralCacheEntry *ece = value;
7297
7298   (void) cls;
7299   (void) pid;
7300   free_ephemeral (ece);
7301   return GNUNET_OK;
7302 }
7303
7304
7305 /**
7306  * Free validation state.
7307  *
7308  * @param cls NULL
7309  * @param pid unused
7310  * @param value a `struct ValidationState`
7311  * @return #GNUNET_OK (always)
7312  */
7313 static int
7314 free_validation_state_cb (void *cls,
7315                           const struct GNUNET_PeerIdentity *pid,
7316                           void *value)
7317 {
7318   struct ValidationState *vs = value;
7319
7320   (void) cls;
7321   (void) pid;
7322   free_validation_state (vs);
7323   return GNUNET_OK;
7324 }
7325
7326
7327 /**
7328  * Function called when the service shuts down.  Unloads our plugins
7329  * and cancels pending validations.
7330  *
7331  * @param cls closure, unused
7332  */
7333 static void
7334 do_shutdown (void *cls)
7335 {
7336   struct LearnLaunchEntry *lle;
7337   (void) cls;
7338
7339   if (NULL != ephemeral_task)
7340   {
7341     GNUNET_SCHEDULER_cancel (ephemeral_task);
7342     ephemeral_task = NULL;
7343   }
7344   GNUNET_CONTAINER_multipeermap_iterate (neighbours,
7345                                          &free_neighbour_cb,
7346                                          NULL);
7347   if (NULL != peerstore)
7348   {
7349     GNUNET_PEERSTORE_disconnect (peerstore,
7350                                  GNUNET_NO);
7351     peerstore = NULL;
7352   }
7353   if (NULL != GST_stats)
7354   {
7355     GNUNET_STATISTICS_destroy (GST_stats,
7356                                GNUNET_NO);
7357     GST_stats = NULL;
7358   }
7359   if (NULL != GST_my_private_key)
7360   {
7361     GNUNET_free (GST_my_private_key);
7362     GST_my_private_key = NULL;
7363   }
7364   GNUNET_CONTAINER_multipeermap_destroy (neighbours);
7365   neighbours = NULL;
7366   GNUNET_CONTAINER_multipeermap_iterate (validation_map,
7367                                          &free_validation_state_cb,
7368                                          NULL);
7369   GNUNET_CONTAINER_multipeermap_destroy (validation_map);
7370   validation_map = NULL;
7371   while (NULL != (lle = lle_head))
7372   {
7373     GNUNET_CONTAINER_DLL_remove (lle_head,
7374                                  lle_tail,
7375                                  lle);
7376     GNUNET_free (lle);
7377   }
7378   GNUNET_CONTAINER_multishortmap_destroy (dvlearn_map);
7379   dvlearn_map = NULL;
7380   GNUNET_CONTAINER_heap_destroy (validation_heap);
7381   validation_heap = NULL;
7382   GNUNET_CONTAINER_multipeermap_iterate (dv_routes,
7383                                          &free_dv_routes_cb,
7384                                          NULL);
7385   GNUNET_CONTAINER_multipeermap_destroy (dv_routes);
7386   dv_routes = NULL;
7387   GNUNET_CONTAINER_multipeermap_iterate (ephemeral_map,
7388                                          &free_ephemeral_cb,
7389                                          NULL);
7390   GNUNET_CONTAINER_multipeermap_destroy (ephemeral_map);
7391   ephemeral_map = NULL;
7392   GNUNET_CONTAINER_heap_destroy (ephemeral_heap);
7393   ephemeral_heap = NULL;
7394 }
7395
7396
7397 /**
7398  * Initiate transport service.
7399  *
7400  * @param cls closure
7401  * @param c configuration to use
7402  * @param service the initialized service
7403  */
7404 static void
7405 run (void *cls,
7406      const struct GNUNET_CONFIGURATION_Handle *c,
7407      struct GNUNET_SERVICE_Handle *service)
7408 {
7409   (void) cls;
7410   (void) service;
7411   /* setup globals */
7412   GST_cfg = c;
7413   neighbours = GNUNET_CONTAINER_multipeermap_create (1024,
7414                                                      GNUNET_YES);
7415   dv_routes = GNUNET_CONTAINER_multipeermap_create (1024,
7416                                                     GNUNET_YES);
7417   ephemeral_map = GNUNET_CONTAINER_multipeermap_create (32,
7418                                                         GNUNET_YES);
7419   ephemeral_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
7420   dvlearn_map = GNUNET_CONTAINER_multishortmap_create (2 * MAX_DV_LEARN_PENDING,
7421                                                        GNUNET_YES);
7422   validation_map = GNUNET_CONTAINER_multipeermap_create (1024,
7423                                                          GNUNET_YES);
7424   validation_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
7425   GST_my_private_key = GNUNET_CRYPTO_eddsa_key_create_from_configuration (GST_cfg);
7426   if (NULL == GST_my_private_key)
7427   {
7428     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
7429                 _("Transport service is lacking key configuration settings. Exiting.\n"));
7430     GNUNET_SCHEDULER_shutdown ();
7431     return;
7432   }
7433   GNUNET_CRYPTO_eddsa_key_get_public (GST_my_private_key,
7434                                       &GST_my_identity.public_key);
7435   GNUNET_log(GNUNET_ERROR_TYPE_INFO,
7436              "My identity is `%s'\n",
7437              GNUNET_i2s_full (&GST_my_identity));
7438   GST_stats = GNUNET_STATISTICS_create ("transport",
7439                                         GST_cfg);
7440   GNUNET_SCHEDULER_add_shutdown (&do_shutdown,
7441                                  NULL);
7442   peerstore = GNUNET_PEERSTORE_connect (GST_cfg);
7443   if (NULL == peerstore)
7444   {
7445     GNUNET_break (0);
7446     GNUNET_SCHEDULER_shutdown ();
7447     return;
7448   }
7449 }
7450
7451
7452 /**
7453  * Define "main" method using service macro.
7454  */
7455 GNUNET_SERVICE_MAIN
7456 ("transport",
7457  GNUNET_SERVICE_OPTION_SOFT_SHUTDOWN,
7458  &run,
7459  &client_connect_cb,
7460  &client_disconnect_cb,
7461  NULL,
7462  /* communication with applications */
7463  GNUNET_MQ_hd_fixed_size (suggest,
7464                           GNUNET_MESSAGE_TYPE_TRANSPORT_SUGGEST,
7465                           struct ExpressPreferenceMessage,
7466                           NULL),
7467  GNUNET_MQ_hd_fixed_size (suggest_cancel,
7468                           GNUNET_MESSAGE_TYPE_TRANSPORT_SUGGEST_CANCEL,
7469                           struct ExpressPreferenceMessage,
7470                           NULL),
7471  GNUNET_MQ_hd_var_size (request_hello_validation,
7472                         GNUNET_MESSAGE_TYPE_TRANSPORT_REQUEST_HELLO_VALIDATION,
7473                         struct RequestHelloValidationMessage,
7474                         NULL),
7475  /* communication with core */
7476  GNUNET_MQ_hd_fixed_size (client_start,
7477                           GNUNET_MESSAGE_TYPE_TRANSPORT_START,
7478                           struct StartMessage,
7479                           NULL),
7480  GNUNET_MQ_hd_var_size (client_send,
7481                         GNUNET_MESSAGE_TYPE_TRANSPORT_SEND,
7482                         struct OutboundMessage,
7483                         NULL),
7484  /* communication with communicators */
7485  GNUNET_MQ_hd_var_size (communicator_available,
7486                         GNUNET_MESSAGE_TYPE_TRANSPORT_NEW_COMMUNICATOR,
7487                         struct GNUNET_TRANSPORT_CommunicatorAvailableMessage,
7488                         NULL),
7489  GNUNET_MQ_hd_var_size (communicator_backchannel,
7490                         GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL,
7491                         struct GNUNET_TRANSPORT_CommunicatorBackchannel,
7492                         NULL),
7493  GNUNET_MQ_hd_var_size (add_address,
7494                         GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS,
7495                         struct GNUNET_TRANSPORT_AddAddressMessage,
7496                         NULL),
7497  GNUNET_MQ_hd_fixed_size (del_address,
7498                           GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_ADDRESS,
7499                           struct GNUNET_TRANSPORT_DelAddressMessage,
7500                           NULL),
7501  GNUNET_MQ_hd_var_size (incoming_msg,
7502                         GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG,
7503                         struct GNUNET_TRANSPORT_IncomingMessage,
7504                         NULL),
7505  GNUNET_MQ_hd_fixed_size (queue_create_ok,
7506                           GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_OK,
7507                           struct GNUNET_TRANSPORT_CreateQueueResponse,
7508                           NULL),
7509  GNUNET_MQ_hd_fixed_size (queue_create_fail,
7510                           GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_FAIL,
7511                           struct GNUNET_TRANSPORT_CreateQueueResponse,
7512                           NULL),
7513  GNUNET_MQ_hd_var_size (add_queue_message,
7514                         GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP,
7515                         struct GNUNET_TRANSPORT_AddQueueMessage,
7516                         NULL),
7517  GNUNET_MQ_hd_var_size (address_consider_verify,
7518                         GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_CONSIDER_VERIFY,
7519                         struct GNUNET_TRANSPORT_AddressToVerify,
7520                         NULL),
7521  GNUNET_MQ_hd_fixed_size (del_queue_message,
7522                           GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN,
7523                           struct GNUNET_TRANSPORT_DelQueueMessage,
7524                           NULL),
7525  GNUNET_MQ_hd_fixed_size (send_message_ack,
7526                           GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK,
7527                           struct GNUNET_TRANSPORT_SendMessageToAck,
7528                           NULL),
7529  /* communication with monitors */
7530  GNUNET_MQ_hd_fixed_size (monitor_start,
7531                           GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_START,
7532                           struct GNUNET_TRANSPORT_MonitorStart,
7533                           NULL),
7534  GNUNET_MQ_handler_end ());
7535
7536
7537 /* end of file gnunet-service-transport.c */