Merge branch 'master' of git+ssh://gnunet.org/gnunet
[oweals/gnunet.git] / src / transport / gnunet-service-tng.c
1 /*
2  This file is part of GNUnet.
3  Copyright (C) 2010-2016, 2018, 2019 GNUnet e.V.
4
5  GNUnet is free software: you can redistribute it and/or modify it
6  under the terms of the GNU Affero General Public License as published
7  by the Free Software Foundation, either version 3 of the License,
8  or (at your option) any later version.
9
10  GNUnet is distributed in the hope that it will be useful, but
11  WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  Affero General Public License for more details.
14
15  You should have received a copy of the GNU Affero General Public License
16  along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19  */
20 /**
21  * @file transport/gnunet-service-tng.c
22  * @brief main for gnunet-service-tng
23  * @author Christian Grothoff
24  *
25  * TODO:
26  * - figure out how to transmit (selective) ACKs in case of uni-directional
27  *   communicators (with/without core? DV-only?) When do we use ACKs?
28  *   => communicators use selective ACKs for flow control
29  *   => transport uses message-level ACKs for RTT, fragment confirmation
30  *   => integrate DV into transport, use neither core nor communicators
31  *      but rather give communicators transport-encapsulated messages
32  *      (which could be core-data, background-channel traffic, or
33  *       transport-to-transport traffic)
34  *
35  * Implement next:
36  * - address validation: what is our plan here?
37  *   #1 Peerstore only gets 'validated' addresses
38  *   #2 transport needs another API to "trigger" validation!
39  *      API may be used by core/application or communicators;
40  *      => use yet another lib/MQ/connection?
41  *   #3 transport should use validation to also establish
42  *      effective flow control (for uni-directional transports!)
43  *   #4 UDP broadcasting logic must be extended to use the new API
44  *   #5 only validated addresses are selected for scheduling; that
45  *      also ensures we know the RTT
46  *   #6 to ensure flow control and RTT are OK, we always do the
47  *      'validation', even if address comes from PEERSTORE
48  *   #7
49  * - ACK handling / retransmission
50  * - address verification
51  * - track RTT, distance, loss, etc.
52  * - DV data structures:
53  *   + learning
54  *   + forgetting
55  *   + using them!
56  * - routing of messages (using DV data structures!)
57  * - handling of DV-boxed messages that need to be forwarded
58  * - backchannel message encryption & decryption
59  * -
60  *
61  * Easy:
62  * - figure out how to call XXX_suggestion_cb!
63  *
64  * Later:
65  * - change transport-core API to provide proper flow control in both
66  *   directions, allow multiple messages per peer simultaneously (tag
67  *   confirmations with unique message ID), and replace quota-out with
68  *   proper flow control;
69  * - if messages are below MTU, consider adding ACKs and other stuff
70  *   (requires planning at receiver, and additional MST-style demultiplex
71  *    at receiver!)
72  * - could avoid copying body of message into each fragment and keep
73  *   fragments as just pointers into the original message and only
74  *   fully build fragments just before transmission (optimization, should
75  *   reduce CPU and memory use)
76  *
77  * Design realizations / discussion:
78  * - communicators do flow control by calling MQ "notify sent"
79  *   when 'ready'. They determine flow implicitly (i.e. TCP blocking)
80  *   or explicitly via background channel FC ACKs.  As long as the
81  *   channel is not full, they may 'notify sent' even if the other
82  *   peer has not yet confirmed receipt. The other peer confirming
83  *   is _only_ for FC, not for more reliable transmission; reliable
84  *   transmission (i.e. of fragments) is left to _transport_.
85  * - ACKs sent back in uni-directional communicators are done via
86  *   the background channel API; here transport _may_ initially
87  *   broadcast (with bounded # hops) if no path is known;
88  * - transport should _integrate_ DV-routing and build a view of
89  *   the network; then background channel traffic can be
90  *   routed via DV as well as explicit "DV" traffic.
91  * - background channel is also used for ACKs and NAT traversal support
92  * - transport service is responsible for AEAD'ing the background
93  *   channel, timestamps and monotonic time are used against replay
94  *   of old messages -> peerstore needs to be supplied with
95  *   "latest timestamps seen" data
96  * - if transport implements DV, we likely need a 3rd peermap
97  *   in addition to ephemerals and (direct) neighbours
98  *   ==> check if stuff needs to be moved out of "Neighbour"
99  * - transport should encapsualte core-level messages and do its
100  *   own ACKing for RTT/goodput/loss measurements _and_ fragment
101  *   for retransmission
102  */
103 #include "platform.h"
104 #include "gnunet_util_lib.h"
105 #include "gnunet_statistics_service.h"
106 #include "gnunet_transport_monitor_service.h"
107 #include "gnunet_peerstore_service.h"
108 #include "gnunet_hello_lib.h"
109 #include "gnunet_signatures.h"
110 #include "transport.h"
111
112
113 /**
114  * What is the size we assume for a read operation in the
115  * absence of an MTU for the purpose of flow control?
116  */
117 #define IN_PACKET_SIZE_WITHOUT_MTU 128
118
119 /**
120  * If a queue delays the next message by more than this number
121  * of seconds we log a warning. Note: this is for testing,
122  * the value chosen here might be too aggressively low!
123  */
124 #define DELAY_WARN_THRESHOLD GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
125
126 /**
127  * How long are ephemeral keys valid?
128  */
129 #define EPHEMERAL_VALIDITY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 4)
130
131 /**
132  * How long do we keep partially reassembled messages around before giving up?
133  */
134 #define REASSEMBLY_EXPIRATION GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 4)
135
136 /**
137  * How many messages can we have pending for a given communicator
138  * process before we start to throttle that communicator?
139  *
140  * Used if a communicator might be CPU-bound and cannot handle the traffic.
141  */
142 #define COMMUNICATOR_TOTAL_QUEUE_LIMIT 512
143
144 /**
145  * How many messages can we have pending for a given queue (queue to
146  * a particular peer via a communicator) process before we start to
147  * throttle that queue?
148  */
149 #define QUEUE_LENGTH_LIMIT 32
150
151
152 GNUNET_NETWORK_STRUCT_BEGIN
153
154 /**
155  * Outer layer of an encapsulated backchannel message.
156  */
157 struct TransportBackchannelEncapsulationMessage
158 {
159   /**
160    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION.
161    */
162   struct GNUNET_MessageHeader header;
163
164   /**
165    * Distance the backchannel message has traveled, to be updated at
166    * each hop.  Used to bound the number of hops in case a backchannel
167    * message is broadcast and thus travels without routing
168    * information (during initial backchannel discovery).
169    */
170   uint32_t distance;
171
172   /**
173    * Target's peer identity (as backchannels may be transmitted
174    * indirectly, or even be broadcast).
175    */
176   struct GNUNET_PeerIdentity target;
177
178   /**
179    * Ephemeral key setup by the sender for @e target, used
180    * to encrypt the payload.
181    */
182   struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;
183
184   // FIXME: probably should add random IV here as well,
185   // especially if we re-use ephemeral keys!
186
187   /**
188    * HMAC over the ciphertext of the encrypted, variable-size
189    * body that follows.  Verified via DH of @e target and
190    * @e ephemeral_key
191    */
192   struct GNUNET_HashCode hmac;
193
194   /* Followed by encrypted, variable-size payload */
195 };
196
197
198 /**
199  * Body by which a peer confirms that it is using an ephemeral key.
200  */
201 struct EphemeralConfirmation
202 {
203
204   /**
205    * Purpose is #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL
206    */
207   struct GNUNET_CRYPTO_EccSignaturePurpose purpose;
208
209   /**
210    * How long is this signature over the ephemeral key valid?
211    * Note that the receiver MUST IGNORE the absolute time, and
212    * only interpret the value as a mononic time and reject
213    * "older" values than the last one observed.  Even with this,
214    * there is no real guarantee against replay achieved here,
215    * as the latest timestamp is not persisted.  This is
216    * necessary as we do not want to require synchronized
217    * clocks and may not have a bidirectional communication
218    * channel.  Communicators must protect against replay
219    * attacks when using backchannel communication!
220    */
221   struct GNUNET_TIME_AbsoluteNBO ephemeral_validity;
222
223   /**
224    * Target's peer identity.
225    */
226   struct GNUNET_PeerIdentity target;
227
228   /**
229    * Ephemeral key setup by the sender for @e target, used
230    * to encrypt the payload.
231    */
232   struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;
233
234 };
235
236
237 /**
238  * Plaintext of the variable-size payload that is encrypted
239  * within a `struct TransportBackchannelEncapsulationMessage`
240  */
241 struct TransportBackchannelRequestPayload
242 {
243
244   /**
245    * Sender's peer identity.
246    */
247   struct GNUNET_PeerIdentity sender;
248
249   /**
250    * Signature of the sender over an
251    * #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL.
252    */
253   struct GNUNET_CRYPTO_EddsaSignature sender_sig;
254
255   /**
256    * How long is this signature over the ephemeral key
257    * valid?
258    */
259   struct GNUNET_TIME_AbsoluteNBO ephemeral_validity;
260
261   /**
262    * Current monotonic time of the sending transport service.  Used to
263    * detect replayed messages.  Note that the receiver should remember
264    * a list of the recently seen timestamps and only reject messages
265    * if the timestamp is in the list, or the list is "full" and the
266    * timestamp is smaller than the lowest in the list.  This list of
267    * timestamps per peer should be persisted to guard against replays
268    * after restarts.
269    */
270   struct GNUNET_TIME_AbsoluteNBO monotonic_time;
271
272   /* Followed by a `struct GNUNET_MessageHeader` with a message
273      for a communicator */
274
275   /* Followed by a 0-termianted string specifying the name of
276      the communicator which is to receive the message */
277
278 };
279
280
281 /**
282  * Outer layer of an encapsulated unfragmented application message sent
283  * over an unreliable channel.
284  */
285 struct TransportReliabilityBox
286 {
287   /**
288    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX
289    */
290   struct GNUNET_MessageHeader header;
291
292   /**
293    * Number of messages still to be sent before a commulative
294    * ACK is requested.  Zero if an ACK is requested immediately.
295    * In NBO.  Note that the receiver may send the ACK faster
296    * if it believes that is reasonable.
297    */
298   uint32_t ack_countdown GNUNET_PACKED;
299
300   /**
301    * Unique ID of the message used for signalling receipt of
302    * messages sent over possibly unreliable channels.  Should
303    * be a random.
304    */
305   struct GNUNET_ShortHashCode msg_uuid;
306 };
307
308
309 /**
310  * Confirmation that the receiver got a
311  * #GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX. Note that the
312  * confirmation may be transmitted over a completely different queue,
313  * so ACKs are identified by a combination of PID of sender and
314  * message UUID, without the queue playing any role!
315  */
316 struct TransportReliabilityAckMessage
317 {
318   /**
319    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK
320    */
321   struct GNUNET_MessageHeader header;
322
323   /**
324    * Reserved. Zero.
325    */
326   uint32_t reserved GNUNET_PACKED;
327
328   /**
329    * How long was the ACK delayed relative to the average time of
330    * receipt of the messages being acknowledged?  Used to calculate
331    * the average RTT by taking the receipt time of the ack minus the
332    * average transmission time of the sender minus this value.
333    */
334   struct GNUNET_TIME_RelativeNBO avg_ack_delay;
335
336   /* followed by any number of `struct GNUNET_ShortHashCode`
337      messages providing ACKs */
338 };
339
340
341 /**
342  * Outer layer of an encapsulated fragmented application message.
343  */
344 struct TransportFragmentBox
345 {
346   /**
347    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT
348    */
349   struct GNUNET_MessageHeader header;
350
351   /**
352    * Unique ID of this fragment (and fragment transmission!). Will
353    * change even if a fragement is retransmitted to make each
354    * transmission attempt unique! Should be incremented by one for
355    * each fragment transmission. If a client receives a duplicate
356    * fragment (same @e frag_off), it must send
357    * #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK immediately.
358    */
359   uint32_t frag_uuid GNUNET_PACKED;
360
361   /**
362    * Original message ID for of the message that all the1
363    * fragments belong to.  Must be the same for all fragments.
364    */
365   struct GNUNET_ShortHashCode msg_uuid;
366
367   /**
368    * Offset of this fragment in the overall message.
369    */
370   uint16_t frag_off GNUNET_PACKED;
371
372   /**
373    * Total size of the message that is being fragmented.
374    */
375   uint16_t msg_size GNUNET_PACKED;
376
377 };
378
379
380 /**
381  * Outer layer of an fragmented application message sent over a queue
382  * with finite MTU.  When a #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT is
383  * received, the receiver has two RTTs or 64 further fragments with
384  * the same basic message time to send an acknowledgement, possibly
385  * acknowledging up to 65 fragments in one ACK.  ACKs must also be
386  * sent immediately once all fragments were sent.
387  */
388 struct TransportFragmentAckMessage
389 {
390   /**
391    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK
392    */
393   struct GNUNET_MessageHeader header;
394
395   /**
396    * Unique ID of the lowest fragment UUID being acknowledged.
397    */
398   uint32_t frag_uuid GNUNET_PACKED;
399
400   /**
401    * Bitfield of up to 64 additional fragments following the
402    * @e msg_uuid being acknowledged by this message.
403    */
404   uint64_t extra_acks GNUNET_PACKED;
405
406   /**
407    * Original message ID for of the message that all the
408    * fragments belong to.
409    */
410   struct GNUNET_ShortHashCode msg_uuid;
411
412   /**
413    * How long was the ACK delayed relative to the average time of
414    * receipt of the fragments being acknowledged?  Used to calculate
415    * the average RTT by taking the receipt time of the ack minus the
416    * average transmission time of the sender minus this value.
417    */
418   struct GNUNET_TIME_RelativeNBO avg_ack_delay;
419
420   /**
421    * How long until the receiver will stop trying reassembly
422    * of this message?
423    */
424   struct GNUNET_TIME_RelativeNBO reassembly_timeout;
425 };
426
427
428 /**
429  * Internal message used by transport for distance vector learning.
430  * If @e num_hops does not exceed the threshold, peers should append
431  * themselves to the peer list and flood the message (possibly only
432  * to a subset of their neighbours to limit discoverability of the
433  * network topology).  To the extend that the @e bidirectional bits
434  * are set, peers may learn the inverse paths even if they did not
435  * initiate.
436  *
437  * Unless received on a bidirectional queue and @e num_hops just
438  * zero, peers that can forward to the initator should always try to
439  * forward to the initiator.
440  */
441 struct TransportDVLearn
442 {
443   /**
444    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN
445    */
446   struct GNUNET_MessageHeader header;
447
448   /**
449    * Number of hops this messages has travelled, in NBO. Zero if
450    * sent by initiator.
451    */
452   uint16_t num_hops GNUNET_PACKED;
453
454   /**
455    * Bitmask of the last 16 hops indicating whether they are confirmed
456    * available (without DV) in both directions or not, in NBO.  Used
457    * to possibly instantly learn a path in both directions.  Each peer
458    * should shift this value by one to the left, and then set the
459    * lowest bit IF the current sender can be reached from it (without
460    * DV routing).
461    */
462   uint16_t bidirectional GNUNET_PACKED;
463
464   /**
465    * Peers receiving this message and delaying forwarding to other
466    * peers for any reason should increment this value such as to
467    * enable the origin to determine the actual network-only delay
468    * in addition to the real-time delay (assuming the message loops
469    * back to the origin).
470    */
471   struct GNUNET_TIME_Relative cummulative_non_network_delay;
472
473   /**
474    * Identity of the peer that started this learning activity.
475    */
476   struct GNUNET_PeerIdentity initiator;
477
478   /* Followed by @e num_hops `struct GNUNET_PeerIdentity` values,
479      excluding the initiator of the DV trace; the last entry is the
480      current sender; the current peer must not be included. */
481
482 };
483
484
485 /**
486  * Outer layer of an encapsulated message send over multiple hops.
487  * The path given only includes the identities of the subsequent
488  * peers, i.e. it will be empty if we are the receiver. Each
489  * forwarding peer should scan the list from the end, and if it can,
490  * forward to the respective peer. The list should then be shortened
491  * by all the entries up to and including that peer.  Each hop should
492  * also increment @e total_hops to allow the receiver to get a precise
493  * estimate on the number of hops the message travelled.  Senders must
494  * provide a learned path that thus should work, but intermediaries
495  * know of a shortcut, they are allowed to send the message via that
496  * shortcut.
497  *
498  * If a peer finds itself still on the list, it must drop the message.
499  */
500 struct TransportDVBox
501 {
502   /**
503    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX
504    */
505   struct GNUNET_MessageHeader header;
506
507   /**
508    * Number of total hops this messages travelled. In NBO.
509    * @e origin sets this to zero, to be incremented at
510    * each hop.
511    */
512   uint16_t total_hops GNUNET_PACKED;
513
514   /**
515    * Number of hops this messages includes. In NBO.
516    */
517   uint16_t num_hops GNUNET_PACKED;
518
519   /**
520    * Identity of the peer that originated the message.
521    */
522   struct GNUNET_PeerIdentity origin;
523
524   /* Followed by @e num_hops `struct GNUNET_PeerIdentity` values;
525      excluding the @e origin and the current peer, the last must be
526      the ultimate target; if @e num_hops is zero, the receiver of this
527      message is the ultimate target. */
528
529   /* Followed by the actual message, which itself may be
530      another box, but not a DV_LEARN or DV_BOX message! */
531 };
532
533
534 GNUNET_NETWORK_STRUCT_END
535
536
537 /**
538  * What type of client is the `struct TransportClient` about?
539  */
540 enum ClientType
541 {
542   /**
543    * We do not know yet (client is fresh).
544    */
545   CT_NONE = 0,
546
547   /**
548    * Is the CORE service, we need to forward traffic to it.
549    */
550   CT_CORE = 1,
551
552   /**
553    * It is a monitor, forward monitor data.
554    */
555   CT_MONITOR = 2,
556
557   /**
558    * It is a communicator, use for communication.
559    */
560   CT_COMMUNICATOR = 3,
561
562   /**
563    * "Application" telling us where to connect (i.e. TOPOLOGY, DHT or CADET).
564    */
565   CT_APPLICATION = 4
566 };
567
568
569 /**
570  * Entry in our cache of ephemeral keys we currently use.
571  * This way, we only sign an ephemeral once per @e target,
572  * and then can re-use it over multiple
573  * #GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION
574  * messages (as signing is expensive).
575  */
576 struct EphemeralCacheEntry
577 {
578
579   /**
580    * Target's peer identity (we don't re-use ephemerals
581    * to limit linkability of messages).
582    */
583   struct GNUNET_PeerIdentity target;
584
585   /**
586    * Signature affirming @e ephemeral_key of type
587    * #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL
588    */
589   struct GNUNET_CRYPTO_EddsaSignature sender_sig;
590
591   /**
592    * How long is @e sender_sig valid
593    */
594   struct GNUNET_TIME_Absolute ephemeral_validity;
595
596   /**
597    * Our ephemeral key.
598    */
599   struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;
600
601   /**
602    * Our private ephemeral key.
603    */
604   struct GNUNET_CRYPTO_EcdhePrivateKey private_key;
605
606   /**
607    * Node in the ephemeral cache for this entry.
608    * Used for expiration.
609    */
610   struct GNUNET_CONTAINER_HeapNode *hn;
611 };
612
613
614 /**
615  * Client connected to the transport service.
616  */
617 struct TransportClient;
618
619
620 /**
621  * A neighbour that at least one communicator is connected to.
622  */
623 struct Neighbour;
624
625
626 /**
627  * Entry in our #dv_routes table, representing a (set of) distance
628  * vector routes to a particular peer.
629  */
630 struct DistanceVector;
631
632 /**
633  * One possible hop towards a DV target.
634  */
635 struct DistanceVectorHop
636 {
637
638   /**
639    * Kept in a MDLL, sorted by @e timeout.
640    */
641   struct DistanceVectorHop *next_dv;
642
643   /**
644    * Kept in a MDLL, sorted by @e timeout.
645    */
646   struct DistanceVectorHop *prev_dv;
647
648   /**
649    * Kept in a MDLL.
650    */
651   struct DistanceVectorHop *next_neighbour;
652
653   /**
654    * Kept in a MDLL.
655    */
656   struct DistanceVectorHop *prev_neighbour;
657
658   /**
659    * What would be the next hop to @e target?
660    */
661   struct Neighbour *next_hop;
662
663   /**
664    * Distance vector entry this hop belongs with.
665    */
666   struct DistanceVector *dv;
667
668   /**
669    * Array of @e distance hops to the target, excluding @e next_hop.
670    * NULL if the entire path is us to @e next_hop to `target`. Allocated
671    * at the end of this struct.
672    */
673   const struct GNUNET_PeerIdentity *path;
674
675   /**
676    * At what time do we forget about this path unless we see it again
677    * while learning?
678    */
679   struct GNUNET_TIME_Absolute timeout;
680
681   /**
682    * How many hops in total to the `target` (excluding @e next_hop and `target` itself),
683    * thus 0 still means a distance of 2 hops (to @e next_hop and then to `target`)?
684    */
685   unsigned int distance;
686 };
687
688
689 /**
690  * Entry in our #dv_routes table, representing a (set of) distance
691  * vector routes to a particular peer.
692  */
693 struct DistanceVector
694 {
695
696   /**
697    * To which peer is this a route?
698    */
699   struct GNUNET_PeerIdentity target;
700
701   /**
702    * Known paths to @e target.
703    */
704   struct DistanceVectorHop *dv_head;
705
706   /**
707    * Known paths to @e target.
708    */
709   struct DistanceVectorHop *dv_tail;
710
711   /**
712    * Task scheduled to purge expired paths from @e dv_head MDLL.
713    */
714   struct GNUNET_SCHEDULER_Task *timeout_task;
715 };
716
717
718 /**
719  * A queue is a message queue provided by a communicator
720  * via which we can reach a particular neighbour.
721  */
722 struct Queue;
723
724
725 /**
726  * Entry identifying transmission in one of our `struct
727  * Queue` which still awaits an ACK.  This is used to
728  * ensure we do not overwhelm a communicator and limit the number of
729  * messages outstanding per communicator (say in case communicator is
730  * CPU bound) and per queue (in case bandwidth allocation exceeds
731  * what the communicator can actually provide towards a particular
732  * peer/target).
733  */
734 struct QueueEntry
735 {
736
737   /**
738    * Kept as a DLL.
739    */
740   struct QueueEntry *next;
741
742   /**
743    * Kept as a DLL.
744    */
745   struct QueueEntry *prev;
746
747   /**
748    * Queue this entry is queued with.
749    */
750   struct Queue *queue;
751
752   /**
753    * Message ID used for this message with the queue used for transmission.
754    */
755   uint64_t mid;
756 };
757
758
759 /**
760  * A queue is a message queue provided by a communicator
761  * via which we can reach a particular neighbour.
762  */
763 struct Queue
764 {
765   /**
766    * Kept in a MDLL.
767    */
768   struct Queue *next_neighbour;
769
770   /**
771    * Kept in a MDLL.
772    */
773   struct Queue *prev_neighbour;
774
775   /**
776    * Kept in a MDLL.
777    */
778   struct Queue *prev_client;
779
780   /**
781    * Kept in a MDLL.
782    */
783   struct Queue *next_client;
784
785   /**
786    * Head of DLL of unacked transmission requests.
787    */
788   struct QueueEntry *queue_head;
789
790   /**
791    * End of DLL of unacked transmission requests.
792    */
793   struct QueueEntry *queue_tail;
794
795   /**
796    * Which neighbour is this queue for?
797    */
798   struct Neighbour *neighbour;
799
800   /**
801    * Which communicator offers this queue?
802    */
803   struct TransportClient *tc;
804
805   /**
806    * Address served by the queue.
807    */
808   const char *address;
809
810   /**
811    * Task scheduled for the time when this queue can (likely) transmit the
812    * next message. Still needs to check with the @e tracker_out to be sure.
813    */
814   struct GNUNET_SCHEDULER_Task *transmit_task;
815
816   /**
817    * Our current RTT estimate for this queue.
818    */
819   struct GNUNET_TIME_Relative rtt;
820
821   /**
822    * Message ID generator for transmissions on this queue.
823    */
824   uint64_t mid_gen;
825
826   /**
827    * Unique identifier of this queue with the communicator.
828    */
829   uint32_t qid;
830
831   /**
832    * Maximum transmission unit supported by this queue.
833    */
834   uint32_t mtu;
835
836   /**
837    * Distance to the target of this queue.
838    */
839   uint32_t distance;
840
841   /**
842    * Messages pending.
843    */
844   uint32_t num_msg_pending;
845
846   /**
847    * Bytes pending.
848    */
849   uint32_t num_bytes_pending;
850
851   /**
852    * Length of the DLL starting at @e queue_head.
853    */
854   unsigned int queue_length;
855
856   /**
857    * Network type offered by this queue.
858    */
859   enum GNUNET_NetworkType nt;
860
861   /**
862    * Connection status for this queue.
863    */
864   enum GNUNET_TRANSPORT_ConnectionStatus cs;
865
866   /**
867    * How much outbound bandwidth do we have available for this queue?
868    */
869   struct GNUNET_BANDWIDTH_Tracker tracker_out;
870
871   /**
872    * How much inbound bandwidth do we have available for this queue?
873    */
874   struct GNUNET_BANDWIDTH_Tracker tracker_in;
875 };
876
877
878 /**
879  * Information we keep for a message that we are reassembling.
880  */
881 struct ReassemblyContext
882 {
883
884   /**
885    * Original message ID for of the message that all the
886    * fragments belong to.
887    */
888   struct GNUNET_ShortHashCode msg_uuid;
889
890   /**
891    * Which neighbour is this context for?
892    */
893   struct Neighbour *neighbour;
894
895   /**
896    * Entry in the reassembly heap (sorted by expiration).
897    */
898   struct GNUNET_CONTAINER_HeapNode *hn;
899
900   /**
901    * Bitfield with @e msg_size bits representing the positions
902    * where we have received fragments.  When we receive a fragment,
903    * we check the bits in @e bitfield before incrementing @e msg_missing.
904    *
905    * Allocated after the reassembled message.
906    */
907   uint8_t *bitfield;
908
909   /**
910    * Task for sending ACK. We may send ACKs either because of hitting
911    * the @e extra_acks limit, or based on time and @e num_acks.  This
912    * task is for the latter case.
913    */
914   struct GNUNET_SCHEDULER_Task *ack_task;
915
916   /**
917    * At what time will we give up reassembly of this message?
918    */
919   struct GNUNET_TIME_Absolute reassembly_timeout;
920
921   /**
922    * Average delay of all acks in @e extra_acks and @e frag_uuid.
923    * Should be reset to zero when @e num_acks is set to 0.
924    */
925   struct GNUNET_TIME_Relative avg_ack_delay;
926
927   /**
928    * Time we received the last fragment.  @e avg_ack_delay must be
929    * incremented by now - @e last_frag multiplied by @e num_acks.
930    */
931   struct GNUNET_TIME_Absolute last_frag;
932
933   /**
934    * Bitfield of up to 64 additional fragments following @e frag_uuid
935    * to be acknowledged in the next cummulative ACK.
936    */
937   uint64_t extra_acks;
938
939   /**
940    * Unique ID of the lowest fragment UUID to be acknowledged in the
941    * next cummulative ACK.  Only valid if @e num_acks > 0.
942    */
943   uint32_t frag_uuid;
944
945   /**
946    * Number of ACKs we have accumulated so far.  Reset to 0
947    * whenever we send a #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK.
948    */
949   unsigned int num_acks;
950
951   /**
952    * How big is the message we are reassembling in total?
953    */
954   uint16_t msg_size;
955
956   /**
957    * How many bytes of the message are still missing?  Defragmentation
958    * is complete when @e msg_missing == 0.
959    */
960   uint16_t msg_missing;
961
962   /* Followed by @e msg_size bytes of the (partially) defragmented original message */
963
964   /* Followed by @e bitfield data */
965 };
966
967
968 /**
969  * A neighbour that at least one communicator is connected to.
970  */
971 struct Neighbour
972 {
973
974   /**
975    * Which peer is this about?
976    */
977   struct GNUNET_PeerIdentity pid;
978
979   /**
980    * Map with `struct ReassemblyContext` structs for fragments under
981    * reassembly. May be NULL if we currently have no fragments from
982    * this @e pid (lazy initialization).
983    */
984   struct GNUNET_CONTAINER_MultiShortmap *reassembly_map;
985
986   /**
987    * Heap with `struct ReassemblyContext` structs for fragments under
988    * reassembly. May be NULL if we currently have no fragments from
989    * this @e pid (lazy initialization).
990    */
991   struct GNUNET_CONTAINER_Heap *reassembly_heap;
992
993   /**
994    * Task to free old entries from the @e reassembly_heap and @e reassembly_map.
995    */
996   struct GNUNET_SCHEDULER_Task *reassembly_timeout_task;
997
998   /**
999    * Head of list of messages pending for this neighbour.
1000    */
1001   struct PendingMessage *pending_msg_head;
1002
1003   /**
1004    * Tail of list of messages pending for this neighbour.
1005    */
1006   struct PendingMessage *pending_msg_tail;
1007
1008   /**
1009    * Head of MDLL of DV hops that have this neighbour as next hop. Must be
1010    * purged if this neighbour goes down.
1011    */
1012   struct DistanceVectorHop *dv_head;
1013
1014   /**
1015    * Tail of MDLL of DV hops that have this neighbour as next hop. Must be
1016    * purged if this neighbour goes down.
1017    */
1018   struct DistanceVectorHop *dv_tail;
1019
1020   /**
1021    * Head of DLL of queues to this peer.
1022    */
1023   struct Queue *queue_head;
1024
1025   /**
1026    * Tail of DLL of queues to this peer.
1027    */
1028   struct Queue *queue_tail;
1029
1030   /**
1031    * Task run to cleanup pending messages that have exceeded their timeout.
1032    */
1033   struct GNUNET_SCHEDULER_Task *timeout_task;
1034
1035   /**
1036    * Quota at which CORE is allowed to transmit to this peer.
1037    *
1038    * FIXME: not yet used, tricky to get right given multiple queues!
1039    *        (=> Idea: measure???)
1040    * FIXME: how do we set this value initially when we tell CORE?
1041    *    Options: start at a minimum value or at literally zero?
1042    *         (=> Current thought: clean would be zero!)
1043    */
1044   struct GNUNET_BANDWIDTH_Value32NBO quota_out;
1045
1046   /**
1047    * What is the earliest timeout of any message in @e pending_msg_tail?
1048    */
1049   struct GNUNET_TIME_Absolute earliest_timeout;
1050
1051 };
1052
1053
1054 /**
1055  * A peer that an application (client) would like us to talk to directly.
1056  */
1057 struct PeerRequest
1058 {
1059
1060   /**
1061    * Which peer is this about?
1062    */
1063   struct GNUNET_PeerIdentity pid;
1064
1065   /**
1066    * Client responsible for the request.
1067    */
1068   struct TransportClient *tc;
1069
1070   /**
1071    * Handle for watching the peerstore for HELLOs for this peer.
1072    */
1073   struct GNUNET_PEERSTORE_WatchContext *wc;
1074
1075   /**
1076    * What kind of performance preference does this @e tc have?
1077    */
1078   enum GNUNET_MQ_PreferenceKind pk;
1079
1080   /**
1081    * How much bandwidth would this @e tc like to see?
1082    */
1083   struct GNUNET_BANDWIDTH_Value32NBO bw;
1084
1085 };
1086
1087
1088 /**
1089  * Types of different pending messages.
1090  */
1091 enum PendingMessageType
1092 {
1093
1094   /**
1095    * Ordinary message received from the CORE service.
1096    */
1097   PMT_CORE = 0,
1098
1099   /**
1100    * Fragment box.
1101    */
1102   PMT_FRAGMENT_BOX = 1,
1103
1104   /**
1105    * Reliability box.
1106    */
1107   PMT_RELIABILITY_BOX = 2,
1108
1109   /**
1110    * Any type of acknowledgement.
1111    */
1112   PMT_ACKNOWLEDGEMENT = 3
1113
1114
1115 };
1116
1117
1118 /**
1119  * Transmission request that is awaiting delivery.  The original
1120  * transmission requests from CORE may be too big for some queues.
1121  * In this case, a *tree* of fragments is created.  At each
1122  * level of the tree, fragments are kept in a DLL ordered by which
1123  * fragment should be sent next (at the head).  The tree is searched
1124  * top-down, with the original message at the root.
1125  *
1126  * To select a node for transmission, first it is checked if the
1127  * current node's message fits with the MTU.  If it does not, we
1128  * either calculate the next fragment (based on @e frag_off) from the
1129  * current node, or, if all fragments have already been created,
1130  * descend to the @e head_frag.  Even though the node was already
1131  * fragmented, the fragment may be too big if the fragment was
1132  * generated for a queue with a larger MTU. In this case, the node
1133  * may be fragmented again, thus creating a tree.
1134  *
1135  * When acknowledgements for fragments are received, the tree
1136  * must be pruned, removing those parts that were already
1137  * acknowledged.  When fragments are sent over a reliable
1138  * channel, they can be immediately removed.
1139  *
1140  * If a message is ever fragmented, then the original "full" message
1141  * is never again transmitted (even if it fits below the MTU), and
1142  * only (remaining) fragments are sent.
1143  */
1144 struct PendingMessage
1145 {
1146   /**
1147    * Kept in a MDLL of messages for this @a target.
1148    */
1149   struct PendingMessage *next_neighbour;
1150
1151   /**
1152    * Kept in a MDLL of messages for this @a target.
1153    */
1154   struct PendingMessage *prev_neighbour;
1155
1156   /**
1157    * Kept in a MDLL of messages from this @a client (if @e pmt is #PMT_CORE)
1158    */
1159   struct PendingMessage *next_client;
1160
1161   /**
1162    * Kept in a MDLL of messages from this @a client  (if @e pmt is #PMT_CORE)
1163    */
1164   struct PendingMessage *prev_client;
1165
1166   /**
1167    * Kept in a MDLL of messages from this @a cpm (if @e pmt is #PMT_FRAGMENT_BOx)
1168    */
1169   struct PendingMessage *next_frag;
1170
1171   /**
1172    * Kept in a MDLL of messages from this @a cpm  (if @e pmt is #PMT_FRAGMENT_BOX)
1173    */
1174   struct PendingMessage *prev_frag;
1175
1176   /**
1177    * This message, reliability boxed. Only possibly available if @e pmt is #PMT_CORE.
1178    */
1179   struct PendingMessage *bpm;
1180
1181   /**
1182    * Target of the request.
1183    */
1184   struct Neighbour *target;
1185
1186   /**
1187    * Client that issued the transmission request, if @e pmt is #PMT_CORE.
1188    */
1189   struct TransportClient *client;
1190
1191   /**
1192    * Head of a MDLL of fragments created for this core message.
1193    */
1194   struct PendingMessage *head_frag;
1195
1196   /**
1197    * Tail of a MDLL of fragments created for this core message.
1198    */
1199   struct PendingMessage *tail_frag;
1200
1201   /**
1202    * Our parent in the fragmentation tree.
1203    */
1204   struct PendingMessage *frag_parent;
1205
1206   /**
1207    * At what time should we give up on the transmission (and no longer retry)?
1208    */
1209   struct GNUNET_TIME_Absolute timeout;
1210
1211   /**
1212    * What is the earliest time for us to retry transmission of this message?
1213    */
1214   struct GNUNET_TIME_Absolute next_attempt;
1215
1216   /**
1217    * UUID to use for this message (used for reassembly of fragments, only
1218    * initialized if @e msg_uuid_set is #GNUNET_YES).
1219    */
1220   struct GNUNET_ShortHashCode msg_uuid;
1221
1222   /**
1223    * Counter incremented per generated fragment.
1224    */
1225   uint32_t frag_uuidgen;
1226
1227   /**
1228    * Type of the pending message.
1229    */
1230   enum PendingMessageType pmt;
1231
1232   /**
1233    * Size of the original message.
1234    */
1235   uint16_t bytes_msg;
1236
1237   /**
1238    * Offset at which we should generate the next fragment.
1239    */
1240   uint16_t frag_off;
1241
1242   /**
1243    * #GNUNET_YES once @e msg_uuid was initialized
1244    */
1245   int16_t msg_uuid_set;
1246
1247   /* Followed by @e bytes_msg to transmit */
1248 };
1249
1250
1251 /**
1252  * One of the addresses of this peer.
1253  */
1254 struct AddressListEntry
1255 {
1256
1257   /**
1258    * Kept in a DLL.
1259    */
1260   struct AddressListEntry *next;
1261
1262   /**
1263    * Kept in a DLL.
1264    */
1265   struct AddressListEntry *prev;
1266
1267   /**
1268    * Which communicator provides this address?
1269    */
1270   struct TransportClient *tc;
1271
1272   /**
1273    * The actual address.
1274    */
1275   const char *address;
1276
1277   /**
1278    * Current context for storing this address in the peerstore.
1279    */
1280   struct GNUNET_PEERSTORE_StoreContext *sc;
1281
1282   /**
1283    * Task to periodically do @e st operation.
1284    */
1285   struct GNUNET_SCHEDULER_Task *st;
1286
1287   /**
1288    * What is a typical lifetime the communicator expects this
1289    * address to have? (Always from now.)
1290    */
1291   struct GNUNET_TIME_Relative expiration;
1292
1293   /**
1294    * Address identifier used by the communicator.
1295    */
1296   uint32_t aid;
1297
1298   /**
1299    * Network type offered by this address.
1300    */
1301   enum GNUNET_NetworkType nt;
1302
1303 };
1304
1305
1306 /**
1307  * Client connected to the transport service.
1308  */
1309 struct TransportClient
1310 {
1311
1312   /**
1313    * Kept in a DLL.
1314    */
1315   struct TransportClient *next;
1316
1317   /**
1318    * Kept in a DLL.
1319    */
1320   struct TransportClient *prev;
1321
1322   /**
1323    * Handle to the client.
1324    */
1325   struct GNUNET_SERVICE_Client *client;
1326
1327   /**
1328    * Message queue to the client.
1329    */
1330   struct GNUNET_MQ_Handle *mq;
1331
1332   /**
1333    * What type of client is this?
1334    */
1335   enum ClientType type;
1336
1337   union
1338   {
1339
1340     /**
1341      * Information for @e type #CT_CORE.
1342      */
1343     struct {
1344
1345       /**
1346        * Head of list of messages pending for this client, sorted by
1347        * transmission time ("next_attempt" + possibly internal prioritization).
1348        */
1349       struct PendingMessage *pending_msg_head;
1350
1351       /**
1352        * Tail of list of messages pending for this client.
1353        */
1354       struct PendingMessage *pending_msg_tail;
1355
1356     } core;
1357
1358     /**
1359      * Information for @e type #CT_MONITOR.
1360      */
1361     struct {
1362
1363       /**
1364        * Peer identity to monitor the addresses of.
1365        * Zero to monitor all neighbours.  Valid if
1366        * @e type is #CT_MONITOR.
1367        */
1368       struct GNUNET_PeerIdentity peer;
1369
1370       /**
1371        * Is this a one-shot monitor?
1372        */
1373       int one_shot;
1374
1375     } monitor;
1376
1377
1378     /**
1379      * Information for @e type #CT_COMMUNICATOR.
1380      */
1381     struct {
1382       /**
1383        * If @e type is #CT_COMMUNICATOR, this communicator
1384        * supports communicating using these addresses.
1385        */
1386       char *address_prefix;
1387
1388       /**
1389        * Head of DLL of queues offered by this communicator.
1390        */
1391       struct Queue *queue_head;
1392
1393       /**
1394        * Tail of DLL of queues offered by this communicator.
1395        */
1396       struct Queue *queue_tail;
1397
1398       /**
1399        * Head of list of the addresses of this peer offered by this communicator.
1400        */
1401       struct AddressListEntry *addr_head;
1402
1403       /**
1404        * Tail of list of the addresses of this peer offered by this communicator.
1405        */
1406       struct AddressListEntry *addr_tail;
1407
1408       /**
1409        * Number of queue entries in all queues to this communicator. Used
1410        * throttle sending to a communicator if we see that the communicator
1411        * is globally unable to keep up.
1412        */
1413       unsigned int total_queue_length;
1414
1415       /**
1416        * Characteristics of this communicator.
1417        */
1418       enum GNUNET_TRANSPORT_CommunicatorCharacteristics cc;
1419
1420     } communicator;
1421
1422     /**
1423      * Information for @e type #CT_APPLICATION
1424      */
1425     struct {
1426
1427       /**
1428        * Map of requests for peers the given client application would like to
1429        * see connections for.  Maps from PIDs to `struct PeerRequest`.
1430        */
1431       struct GNUNET_CONTAINER_MultiPeerMap *requests;
1432
1433     } application;
1434
1435   } details;
1436
1437 };
1438
1439
1440 /**
1441  * Head of linked list of all clients to this service.
1442  */
1443 static struct TransportClient *clients_head;
1444
1445 /**
1446  * Tail of linked list of all clients to this service.
1447  */
1448 static struct TransportClient *clients_tail;
1449
1450 /**
1451  * Statistics handle.
1452  */
1453 static struct GNUNET_STATISTICS_Handle *GST_stats;
1454
1455 /**
1456  * Configuration handle.
1457  */
1458 static const struct GNUNET_CONFIGURATION_Handle *GST_cfg;
1459
1460 /**
1461  * Our public key.
1462  */
1463 static struct GNUNET_PeerIdentity GST_my_identity;
1464
1465 /**
1466  * Our private key.
1467  */
1468 static struct GNUNET_CRYPTO_EddsaPrivateKey *GST_my_private_key;
1469
1470 /**
1471  * Map from PIDs to `struct Neighbour` entries.  A peer is
1472  * a neighbour if we have an MQ to it from some communicator.
1473  */
1474 static struct GNUNET_CONTAINER_MultiPeerMap *neighbours;
1475
1476 /**
1477  * Map from PIDs to `struct DistanceVector` entries describing
1478  * known paths to the peer.
1479  */
1480 static struct GNUNET_CONTAINER_MultiPeerMap *dv_routes;
1481
1482 /**
1483  * Database for peer's HELLOs.
1484  */
1485 static struct GNUNET_PEERSTORE_Handle *peerstore;
1486
1487 /**
1488  * Heap sorting `struct EphemeralCacheEntry` by their
1489  * key/signature validity.
1490  */
1491 static struct GNUNET_CONTAINER_Heap *ephemeral_heap;
1492
1493 /**
1494  * Hash map for looking up `struct EphemeralCacheEntry`s
1495  * by peer identity. (We may have ephemerals in our
1496  * cache for which we do not have a neighbour entry,
1497  * and similar many neighbours may not need ephemerals,
1498  * so we use a second map.)
1499  */
1500 static struct GNUNET_CONTAINER_MultiPeerMap *ephemeral_map;
1501
1502 /**
1503  * Task to free expired ephemerals.
1504  */
1505 static struct GNUNET_SCHEDULER_Task *ephemeral_task;
1506
1507
1508 /**
1509  * Free cached ephemeral key.
1510  *
1511  * @param ece cached signature to free
1512  */
1513 static void
1514 free_ephemeral (struct EphemeralCacheEntry *ece)
1515 {
1516   GNUNET_CONTAINER_multipeermap_remove (ephemeral_map,
1517                                         &ece->target,
1518                                         ece);
1519   GNUNET_CONTAINER_heap_remove_node (ece->hn);
1520   GNUNET_free (ece);
1521 }
1522
1523
1524 /**
1525  * Lookup neighbour record for peer @a pid.
1526  *
1527  * @param pid neighbour to look for
1528  * @return NULL if we do not have this peer as a neighbour
1529  */
1530 static struct Neighbour *
1531 lookup_neighbour (const struct GNUNET_PeerIdentity *pid)
1532 {
1533   return GNUNET_CONTAINER_multipeermap_get (neighbours,
1534                                             pid);
1535 }
1536
1537
1538 /**
1539  * Details about what to notify monitors about.
1540  */
1541 struct MonitorEvent
1542 {
1543   /**
1544    * @deprecated To be discussed if we keep these...
1545    */
1546   struct GNUNET_TIME_Absolute last_validation;
1547   struct GNUNET_TIME_Absolute valid_until;
1548   struct GNUNET_TIME_Absolute next_validation;
1549
1550   /**
1551    * Current round-trip time estimate.
1552    */
1553   struct GNUNET_TIME_Relative rtt;
1554
1555   /**
1556    * Connection status.
1557    */
1558   enum GNUNET_TRANSPORT_ConnectionStatus cs;
1559
1560   /**
1561    * Messages pending.
1562    */
1563   uint32_t num_msg_pending;
1564
1565   /**
1566    * Bytes pending.
1567    */
1568   uint32_t num_bytes_pending;
1569
1570
1571 };
1572
1573
1574 /**
1575  * Free a @dvh, and if it is the last path to the `target`,also
1576  * free the associated DV entry in #dv_routes.
1577  *
1578  * @param dvh hop to free
1579  */
1580 static void
1581 free_distance_vector_hop (struct DistanceVectorHop *dvh)
1582 {
1583   struct Neighbour *n = dvh->next_hop;
1584   struct DistanceVector *dv = dvh->dv;
1585
1586   GNUNET_CONTAINER_MDLL_remove (neighbour,
1587                                 n->dv_head,
1588                                 n->dv_tail,
1589                                 dvh);
1590   GNUNET_CONTAINER_MDLL_remove (dv,
1591                                 dv->dv_head,
1592                                 dv->dv_tail,
1593                                 dvh);
1594   GNUNET_free (dvh);
1595   if (NULL == dv->dv_head)
1596   {
1597     GNUNET_assert (GNUNET_YES ==
1598                    GNUNET_CONTAINER_multipeermap_remove (dv_routes,
1599                                                          &dv->target,
1600                                                          dv));
1601     if (NULL != dv->timeout_task)
1602       GNUNET_SCHEDULER_cancel (dv->timeout_task);
1603     GNUNET_free (dv);
1604   }
1605 }
1606
1607
1608 /**
1609  * Free entry in #dv_routes.  First frees all hops to the target, and
1610  * the last target will implicitly free @a dv as well.
1611  *
1612  * @param dv route to free
1613  */
1614 static void
1615 free_dv_route (struct DistanceVector *dv)
1616 {
1617   struct DistanceVectorHop *dvh;
1618
1619   while (NULL != (dvh = dv->dv_head))
1620     free_distance_vector_hop (dvh);
1621 }
1622
1623
1624 /**
1625  * Notify monitor @a tc about an event.  That @a tc
1626  * cares about the event has already been checked.
1627  *
1628  * Send @a tc information in @a me about a @a peer's status with
1629  * respect to some @a address to all monitors that care.
1630  *
1631  * @param tc monitor to inform
1632  * @param peer peer the information is about
1633  * @param address address the information is about
1634  * @param nt network type associated with @a address
1635  * @param me detailed information to transmit
1636  */
1637 static void
1638 notify_monitor (struct TransportClient *tc,
1639                 const struct GNUNET_PeerIdentity *peer,
1640                 const char *address,
1641                 enum GNUNET_NetworkType nt,
1642                 const struct MonitorEvent *me)
1643 {
1644   struct GNUNET_MQ_Envelope *env;
1645   struct GNUNET_TRANSPORT_MonitorData *md;
1646   size_t addr_len = strlen (address) + 1;
1647
1648   env = GNUNET_MQ_msg_extra (md,
1649                              addr_len,
1650                              GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_DATA);
1651   md->nt = htonl ((uint32_t) nt);
1652   md->peer = *peer;
1653   md->last_validation = GNUNET_TIME_absolute_hton (me->last_validation);
1654   md->valid_until = GNUNET_TIME_absolute_hton (me->valid_until);
1655   md->next_validation = GNUNET_TIME_absolute_hton (me->next_validation);
1656   md->rtt = GNUNET_TIME_relative_hton (me->rtt);
1657   md->cs = htonl ((uint32_t) me->cs);
1658   md->num_msg_pending = htonl (me->num_msg_pending);
1659   md->num_bytes_pending = htonl (me->num_bytes_pending);
1660   memcpy (&md[1],
1661           address,
1662           addr_len);
1663   GNUNET_MQ_send (tc->mq,
1664                   env);
1665 }
1666
1667
1668 /**
1669  * Send information in @a me about a @a peer's status with respect
1670  * to some @a address to all monitors that care.
1671  *
1672  * @param peer peer the information is about
1673  * @param address address the information is about
1674  * @param nt network type associated with @a address
1675  * @param me detailed information to transmit
1676  */
1677 static void
1678 notify_monitors (const struct GNUNET_PeerIdentity *peer,
1679                  const char *address,
1680                  enum GNUNET_NetworkType nt,
1681                  const struct MonitorEvent *me)
1682 {
1683   static struct GNUNET_PeerIdentity zero;
1684
1685   for (struct TransportClient *tc = clients_head;
1686        NULL != tc;
1687        tc = tc->next)
1688   {
1689     if (CT_MONITOR != tc->type)
1690       continue;
1691     if (tc->details.monitor.one_shot)
1692       continue;
1693     if ( (0 != memcmp (&tc->details.monitor.peer,
1694                        &zero,
1695                        sizeof (zero))) &&
1696          (0 != memcmp (&tc->details.monitor.peer,
1697                        peer,
1698                        sizeof (*peer))) )
1699       continue;
1700     notify_monitor (tc,
1701                     peer,
1702                     address,
1703                     nt,
1704                     me);
1705   }
1706 }
1707
1708
1709 /**
1710  * Called whenever a client connects.  Allocates our
1711  * data structures associated with that client.
1712  *
1713  * @param cls closure, NULL
1714  * @param client identification of the client
1715  * @param mq message queue for the client
1716  * @return our `struct TransportClient`
1717  */
1718 static void *
1719 client_connect_cb (void *cls,
1720                    struct GNUNET_SERVICE_Client *client,
1721                    struct GNUNET_MQ_Handle *mq)
1722 {
1723   struct TransportClient *tc;
1724
1725   tc = GNUNET_new (struct TransportClient);
1726   tc->client = client;
1727   tc->mq = mq;
1728   GNUNET_CONTAINER_DLL_insert (clients_head,
1729                                clients_tail,
1730                                tc);
1731   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1732               "Client %p connected\n",
1733               tc);
1734   return tc;
1735 }
1736
1737
1738 /**
1739  * Free @a rc
1740  *
1741  * @param rc data structure to free
1742  */
1743 static void
1744 free_reassembly_context (struct ReassemblyContext *rc)
1745 {
1746   struct Neighbour *n = rc->neighbour;
1747
1748   GNUNET_assert (rc ==
1749                  GNUNET_CONTAINER_heap_remove_node (rc->hn));
1750   GNUNET_assert (GNUNET_OK ==
1751                  GNUNET_CONTAINER_multishortmap_remove (n->reassembly_map,
1752                                                         &rc->msg_uuid,
1753                                                         rc));
1754   GNUNET_free (rc);
1755 }
1756
1757
1758 /**
1759  * Task run to clean up reassembly context of a neighbour that have expired.
1760  *
1761  * @param cls a `struct Neighbour`
1762  */
1763 static void
1764 reassembly_cleanup_task (void *cls)
1765 {
1766   struct Neighbour *n = cls;
1767   struct ReassemblyContext *rc;
1768
1769   n->reassembly_timeout_task = NULL;
1770   while (NULL != (rc = GNUNET_CONTAINER_heap_peek (n->reassembly_heap)))
1771   {
1772     if (0 == GNUNET_TIME_absolute_get_remaining (rc->reassembly_timeout).rel_value_us)
1773     {
1774       free_reassembly_context (rc);
1775       continue;
1776     }
1777     GNUNET_assert (NULL == n->reassembly_timeout_task);
1778     n->reassembly_timeout_task = GNUNET_SCHEDULER_add_at (rc->reassembly_timeout,
1779                                                           &reassembly_cleanup_task,
1780                                                           n);
1781     return;
1782   }
1783 }
1784
1785
1786 /**
1787  * function called to #free_reassembly_context().
1788  *
1789  * @param cls NULL
1790  * @param key unused
1791  * @param value a `struct ReassemblyContext` to free
1792  * @return #GNUNET_OK (continue iteration)
1793  */
1794 static int
1795 free_reassembly_cb (void *cls,
1796                     const struct GNUNET_ShortHashCode *key,
1797                     void *value)
1798 {
1799   struct ReassemblyContext *rc = value;
1800   (void) cls;
1801   (void) key;
1802
1803   free_reassembly_context (rc);
1804   return GNUNET_OK;
1805 }
1806
1807
1808 /**
1809  * Release memory used by @a neighbour.
1810  *
1811  * @param neighbour neighbour entry to free
1812  */
1813 static void
1814 free_neighbour (struct Neighbour *neighbour)
1815 {
1816   struct DistanceVectorHop *dvh;
1817
1818   GNUNET_assert (NULL == neighbour->queue_head);
1819   GNUNET_assert (GNUNET_YES ==
1820                  GNUNET_CONTAINER_multipeermap_remove (neighbours,
1821                                                        &neighbour->pid,
1822                                                        neighbour));
1823   if (NULL != neighbour->timeout_task)
1824     GNUNET_SCHEDULER_cancel (neighbour->timeout_task);
1825   if (NULL != neighbour->reassembly_map)
1826   {
1827     GNUNET_CONTAINER_multishortmap_iterate (neighbour->reassembly_map,
1828                                             &free_reassembly_cb,
1829                                             NULL);
1830     GNUNET_CONTAINER_multishortmap_destroy (neighbour->reassembly_map);
1831     neighbour->reassembly_map = NULL;
1832     GNUNET_CONTAINER_heap_destroy (neighbour->reassembly_heap);
1833     neighbour->reassembly_heap = NULL;
1834   }
1835   while (NULL != (dvh = neighbour->dv_head))
1836     free_distance_vector_hop (dvh);
1837   if (NULL != neighbour->reassembly_timeout_task)
1838     GNUNET_SCHEDULER_cancel (neighbour->reassembly_timeout_task);
1839   GNUNET_free (neighbour);
1840 }
1841
1842
1843 /**
1844  * Send message to CORE clients that we lost a connection.
1845  *
1846  * @param tc client to inform (must be CORE client)
1847  * @param pid peer the connection is for
1848  * @param quota_out current quota for the peer
1849  */
1850 static void
1851 core_send_connect_info (struct TransportClient *tc,
1852                         const struct GNUNET_PeerIdentity *pid,
1853                         struct GNUNET_BANDWIDTH_Value32NBO quota_out)
1854 {
1855   struct GNUNET_MQ_Envelope *env;
1856   struct ConnectInfoMessage *cim;
1857
1858   GNUNET_assert (CT_CORE == tc->type);
1859   env = GNUNET_MQ_msg (cim,
1860                        GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
1861   cim->quota_out = quota_out;
1862   cim->id = *pid;
1863   GNUNET_MQ_send (tc->mq,
1864                   env);
1865 }
1866
1867
1868 /**
1869  * Send message to CORE clients that we gained a connection
1870  *
1871  * @param pid peer the queue was for
1872  * @param quota_out current quota for the peer
1873  */
1874 static void
1875 cores_send_connect_info (const struct GNUNET_PeerIdentity *pid,
1876                          struct GNUNET_BANDWIDTH_Value32NBO quota_out)
1877 {
1878   for (struct TransportClient *tc = clients_head;
1879        NULL != tc;
1880        tc = tc->next)
1881   {
1882     if (CT_CORE != tc->type)
1883       continue;
1884     core_send_connect_info (tc,
1885                             pid,
1886                             quota_out);
1887   }
1888 }
1889
1890
1891 /**
1892  * Send message to CORE clients that we lost a connection.
1893  *
1894  * @param pid peer the connection was for
1895  */
1896 static void
1897 cores_send_disconnect_info (const struct GNUNET_PeerIdentity *pid)
1898 {
1899   for (struct TransportClient *tc = clients_head;
1900        NULL != tc;
1901        tc = tc->next)
1902   {
1903     struct GNUNET_MQ_Envelope *env;
1904     struct DisconnectInfoMessage *dim;
1905
1906     if (CT_CORE != tc->type)
1907       continue;
1908     env = GNUNET_MQ_msg (dim,
1909                          GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT);
1910     dim->peer = *pid;
1911     GNUNET_MQ_send (tc->mq,
1912                     env);
1913   }
1914 }
1915
1916
1917 /**
1918  * We believe we are ready to transmit a message on a queue. Double-checks
1919  * with the queue's "tracker_out" and then gives the message to the
1920  * communicator for transmission (updating the tracker, and re-scheduling
1921  * itself if applicable).
1922  *
1923  * @param cls the `struct Queue` to process transmissions for
1924  */
1925 static void
1926 transmit_on_queue (void *cls);
1927
1928
1929 /**
1930  * Schedule next run of #transmit_on_queue().  Does NOTHING if
1931  * we should run immediately or if the message queue is empty.
1932  * Test for no task being added AND queue not being empty to
1933  * transmit immediately afterwards!  This function must only
1934  * be called if the message queue is non-empty!
1935  *
1936  * @param queue the queue to do scheduling for
1937  */
1938 static void
1939 schedule_transmit_on_queue (struct Queue *queue)
1940 {
1941   struct Neighbour *n = queue->neighbour;
1942   struct PendingMessage *pm = n->pending_msg_head;
1943   struct GNUNET_TIME_Relative out_delay;
1944   unsigned int wsize;
1945
1946   GNUNET_assert (NULL != pm);
1947   if (queue->tc->details.communicator.total_queue_length >=
1948       COMMUNICATOR_TOTAL_QUEUE_LIMIT)
1949   {
1950     GNUNET_STATISTICS_update (GST_stats,
1951                               "# Transmission throttled due to communicator queue limit",
1952                               1,
1953                               GNUNET_NO);
1954     return;
1955   }
1956   if (queue->queue_length >= QUEUE_LENGTH_LIMIT)
1957   {
1958     GNUNET_STATISTICS_update (GST_stats,
1959                               "# Transmission throttled due to queue queue limit",
1960                               1,
1961                               GNUNET_NO);
1962     return;
1963   }
1964
1965   wsize = (0 == queue->mtu)
1966     ? pm->bytes_msg /* FIXME: add overheads? */
1967     : queue->mtu;
1968   out_delay = GNUNET_BANDWIDTH_tracker_get_delay (&queue->tracker_out,
1969                                                   wsize);
1970   out_delay = GNUNET_TIME_relative_max (GNUNET_TIME_absolute_get_remaining (pm->next_attempt),
1971                                         out_delay);
1972   if (0 == out_delay.rel_value_us)
1973     return; /* we should run immediately! */
1974   /* queue has changed since we were scheduled, reschedule again */
1975   queue->transmit_task
1976     = GNUNET_SCHEDULER_add_delayed (out_delay,
1977                                     &transmit_on_queue,
1978                                     queue);
1979   if (out_delay.rel_value_us > DELAY_WARN_THRESHOLD.rel_value_us)
1980     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1981                 "Next transmission on queue `%s' in %s (high delay)\n",
1982                 queue->address,
1983                 GNUNET_STRINGS_relative_time_to_string (out_delay,
1984                                                         GNUNET_YES));
1985   else
1986     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1987                 "Next transmission on queue `%s' in %s\n",
1988                 queue->address,
1989                 GNUNET_STRINGS_relative_time_to_string (out_delay,
1990                                                         GNUNET_YES));
1991 }
1992
1993
1994 /**
1995  * Free @a queue.
1996  *
1997  * @param queue the queue to free
1998  */
1999 static void
2000 free_queue (struct Queue *queue)
2001 {
2002   struct Neighbour *neighbour = queue->neighbour;
2003   struct TransportClient *tc = queue->tc;
2004   struct MonitorEvent me = {
2005     .cs = GNUNET_TRANSPORT_CS_DOWN,
2006     .rtt = GNUNET_TIME_UNIT_FOREVER_REL
2007   };
2008   struct QueueEntry *qe;
2009   int maxxed;
2010
2011   if (NULL != queue->transmit_task)
2012   {
2013     GNUNET_SCHEDULER_cancel (queue->transmit_task);
2014     queue->transmit_task = NULL;
2015   }
2016   GNUNET_CONTAINER_MDLL_remove (neighbour,
2017                                 neighbour->queue_head,
2018                                 neighbour->queue_tail,
2019                                 queue);
2020   GNUNET_CONTAINER_MDLL_remove (client,
2021                                 tc->details.communicator.queue_head,
2022                                 tc->details.communicator.queue_tail,
2023                                 queue);
2024   maxxed = (COMMUNICATOR_TOTAL_QUEUE_LIMIT >= tc->details.communicator.total_queue_length);
2025   while (NULL != (qe = queue->queue_head))
2026   {
2027     GNUNET_CONTAINER_DLL_remove (queue->queue_head,
2028                                  queue->queue_tail,
2029                                  qe);
2030     queue->queue_length--;
2031     tc->details.communicator.total_queue_length--;
2032     GNUNET_free (qe);
2033   }
2034   GNUNET_assert (0 == queue->queue_length);
2035   if ( (maxxed) &&
2036        (COMMUNICATOR_TOTAL_QUEUE_LIMIT < tc->details.communicator.total_queue_length) )
2037   {
2038     /* Communicator dropped below threshold, resume all queues */
2039     GNUNET_STATISTICS_update (GST_stats,
2040                               "# Transmission throttled due to communicator queue limit",
2041                               -1,
2042                               GNUNET_NO);
2043     for (struct Queue *s = tc->details.communicator.queue_head;
2044          NULL != s;
2045          s = s->next_client)
2046       schedule_transmit_on_queue (s);
2047   }
2048   notify_monitors (&neighbour->pid,
2049                    queue->address,
2050                    queue->nt,
2051                    &me);
2052   GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_in);
2053   GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_out);
2054   GNUNET_free (queue);
2055   if (NULL == neighbour->queue_head)
2056   {
2057     cores_send_disconnect_info (&neighbour->pid);
2058     free_neighbour (neighbour);
2059   }
2060 }
2061
2062
2063 /**
2064  * Free @a ale
2065  *
2066  * @param ale address list entry to free
2067  */
2068 static void
2069 free_address_list_entry (struct AddressListEntry *ale)
2070 {
2071   struct TransportClient *tc = ale->tc;
2072
2073   GNUNET_CONTAINER_DLL_remove (tc->details.communicator.addr_head,
2074                                tc->details.communicator.addr_tail,
2075                                ale);
2076   if (NULL != ale->sc)
2077   {
2078     GNUNET_PEERSTORE_store_cancel (ale->sc);
2079     ale->sc = NULL;
2080   }
2081   if (NULL != ale->st)
2082   {
2083     GNUNET_SCHEDULER_cancel (ale->st);
2084     ale->st = NULL;
2085   }
2086   GNUNET_free (ale);
2087 }
2088
2089
2090 /**
2091  * Stop the peer request in @a value.
2092  *
2093  * @param cls a `struct TransportClient` that no longer makes the request
2094  * @param pid the peer's identity
2095  * @param value a `struct PeerRequest`
2096  * @return #GNUNET_YES (always)
2097  */
2098 static int
2099 stop_peer_request (void *cls,
2100                    const struct GNUNET_PeerIdentity *pid,
2101                    void *value)
2102 {
2103   struct TransportClient *tc = cls;
2104   struct PeerRequest *pr = value;
2105
2106   GNUNET_PEERSTORE_watch_cancel (pr->wc);
2107   GNUNET_assert (GNUNET_YES ==
2108                  GNUNET_CONTAINER_multipeermap_remove (tc->details.application.requests,
2109                                                        pid,
2110                                                        pr));
2111   GNUNET_free (pr);
2112
2113   return GNUNET_OK;
2114 }
2115
2116
2117 /**
2118  * Called whenever a client is disconnected.  Frees our
2119  * resources associated with that client.
2120  *
2121  * @param cls closure, NULL
2122  * @param client identification of the client
2123  * @param app_ctx our `struct TransportClient`
2124  */
2125 static void
2126 client_disconnect_cb (void *cls,
2127                       struct GNUNET_SERVICE_Client *client,
2128                       void *app_ctx)
2129 {
2130   struct TransportClient *tc = app_ctx;
2131
2132   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2133               "Client %p disconnected, cleaning up.\n",
2134               tc);
2135   GNUNET_CONTAINER_DLL_remove (clients_head,
2136                                clients_tail,
2137                                tc);
2138   switch (tc->type)
2139   {
2140   case CT_NONE:
2141     break;
2142   case CT_CORE:
2143     {
2144       struct PendingMessage *pm;
2145
2146       while (NULL != (pm = tc->details.core.pending_msg_head))
2147       {
2148         GNUNET_CONTAINER_MDLL_remove (client,
2149                                       tc->details.core.pending_msg_head,
2150                                       tc->details.core.pending_msg_tail,
2151                                       pm);
2152         pm->client = NULL;
2153       }
2154     }
2155     break;
2156   case CT_MONITOR:
2157     break;
2158   case CT_COMMUNICATOR:
2159     {
2160       struct Queue *q;
2161       struct AddressListEntry *ale;
2162
2163       while (NULL != (q = tc->details.communicator.queue_head))
2164         free_queue (q);
2165       while (NULL != (ale = tc->details.communicator.addr_head))
2166         free_address_list_entry (ale);
2167       GNUNET_free (tc->details.communicator.address_prefix);
2168     }
2169     break;
2170   case CT_APPLICATION:
2171     GNUNET_CONTAINER_multipeermap_iterate (tc->details.application.requests,
2172                                            &stop_peer_request,
2173                                            tc);
2174     GNUNET_CONTAINER_multipeermap_destroy (tc->details.application.requests);
2175     break;
2176   }
2177   GNUNET_free (tc);
2178 }
2179
2180
2181 /**
2182  * Iterator telling new CORE client about all existing
2183  * connections to peers.
2184  *
2185  * @param cls the new `struct TransportClient`
2186  * @param pid a connected peer
2187  * @param value the `struct Neighbour` with more information
2188  * @return #GNUNET_OK (continue to iterate)
2189  */
2190 static int
2191 notify_client_connect_info (void *cls,
2192                             const struct GNUNET_PeerIdentity *pid,
2193                             void *value)
2194 {
2195   struct TransportClient *tc = cls;
2196   struct Neighbour *neighbour = value;
2197
2198   core_send_connect_info (tc,
2199                           pid,
2200                           neighbour->quota_out);
2201   return GNUNET_OK;
2202 }
2203
2204
2205 /**
2206  * Initialize a "CORE" client.  We got a start message from this
2207  * client, so add it to the list of clients for broadcasting of
2208  * inbound messages.
2209  *
2210  * @param cls the client
2211  * @param start the start message that was sent
2212  */
2213 static void
2214 handle_client_start (void *cls,
2215                      const struct StartMessage *start)
2216 {
2217   struct TransportClient *tc = cls;
2218   uint32_t options;
2219
2220   options = ntohl (start->options);
2221   if ( (0 != (1 & options)) &&
2222        (0 !=
2223         memcmp (&start->self,
2224                 &GST_my_identity,
2225                 sizeof (struct GNUNET_PeerIdentity)) ) )
2226   {
2227     /* client thinks this is a different peer, reject */
2228     GNUNET_break (0);
2229     GNUNET_SERVICE_client_drop (tc->client);
2230     return;
2231   }
2232   if (CT_NONE != tc->type)
2233   {
2234     GNUNET_break (0);
2235     GNUNET_SERVICE_client_drop (tc->client);
2236     return;
2237   }
2238   tc->type = CT_CORE;
2239   GNUNET_CONTAINER_multipeermap_iterate (neighbours,
2240                                          &notify_client_connect_info,
2241                                          tc);
2242   GNUNET_SERVICE_client_continue (tc->client);
2243 }
2244
2245
2246 /**
2247  * Client asked for transmission to a peer.  Process the request.
2248  *
2249  * @param cls the client
2250  * @param obm the send message that was sent
2251  */
2252 static int
2253 check_client_send (void *cls,
2254                    const struct OutboundMessage *obm)
2255 {
2256   struct TransportClient *tc = cls;
2257   uint16_t size;
2258   const struct GNUNET_MessageHeader *obmm;
2259
2260   if (CT_CORE != tc->type)
2261   {
2262     GNUNET_break (0);
2263     return GNUNET_SYSERR;
2264   }
2265   size = ntohs (obm->header.size) - sizeof (struct OutboundMessage);
2266   if (size < sizeof (struct GNUNET_MessageHeader))
2267   {
2268     GNUNET_break (0);
2269     return GNUNET_SYSERR;
2270   }
2271   obmm = (const struct GNUNET_MessageHeader *) &obm[1];
2272   if (size != ntohs (obmm->size))
2273   {
2274     GNUNET_break (0);
2275     return GNUNET_SYSERR;
2276   }
2277   return GNUNET_OK;
2278 }
2279
2280
2281 /**
2282  * Free fragment tree below @e root, excluding @e root itself.
2283  *
2284  * @param root root of the tree to free
2285  */
2286 static void
2287 free_fragment_tree (struct PendingMessage *root)
2288 {
2289   struct PendingMessage *frag;
2290
2291   while (NULL != (frag = root->head_frag))
2292   {
2293     free_fragment_tree (frag);
2294     GNUNET_CONTAINER_MDLL_remove (frag,
2295                                   root->head_frag,
2296                                   root->tail_frag,
2297                                   frag);
2298     GNUNET_free (frag);
2299   }
2300 }
2301
2302
2303 /**
2304  * Release memory associated with @a pm and remove @a pm from associated
2305  * data structures.  @a pm must be a top-level pending message and not
2306  * a fragment in the tree.  The entire tree is freed (if applicable).
2307  *
2308  * @param pm the pending message to free
2309  */
2310 static void
2311 free_pending_message (struct PendingMessage *pm)
2312 {
2313   struct TransportClient *tc = pm->client;
2314   struct Neighbour *target = pm->target;
2315
2316   if (NULL != tc)
2317   {
2318     GNUNET_CONTAINER_MDLL_remove (client,
2319                                   tc->details.core.pending_msg_head,
2320                                   tc->details.core.pending_msg_tail,
2321                                   pm);
2322   }
2323   GNUNET_CONTAINER_MDLL_remove (neighbour,
2324                                 target->pending_msg_head,
2325                                 target->pending_msg_tail,
2326                                 pm);
2327   free_fragment_tree (pm);
2328   GNUNET_free_non_null (pm->bpm);
2329   GNUNET_free (pm);
2330 }
2331
2332
2333 /**
2334  * Send a response to the @a pm that we have processed a
2335  * "send" request with status @a success. We
2336  * transmitted @a bytes_physical on the actual wire.
2337  * Sends a confirmation to the "core" client responsible
2338  * for the original request and free's @a pm.
2339  *
2340  * @param pm handle to the original pending message
2341  * @param success status code, #GNUNET_OK on success, #GNUNET_SYSERR
2342  *          for transmission failure
2343  * @param bytes_physical amount of bandwidth consumed
2344  */
2345 static void
2346 client_send_response (struct PendingMessage *pm,
2347                       int success,
2348                       uint32_t bytes_physical)
2349 {
2350   struct TransportClient *tc = pm->client;
2351   struct Neighbour *target = pm->target;
2352   struct GNUNET_MQ_Envelope *env;
2353   struct SendOkMessage *som;
2354
2355   if (NULL != tc)
2356   {
2357     env = GNUNET_MQ_msg (som,
2358                          GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
2359     som->success = htonl ((uint32_t) success);
2360     som->bytes_msg = htons (pm->bytes_msg);
2361     som->bytes_physical = htonl (bytes_physical);
2362     som->peer = target->pid;
2363     GNUNET_MQ_send (tc->mq,
2364                     env);
2365   }
2366   free_pending_message (pm);
2367 }
2368
2369
2370 /**
2371  * Checks the message queue for a neighbour for messages that have timed
2372  * out and purges them.
2373  *
2374  * @param cls a `struct Neighbour`
2375  */
2376 static void
2377 check_queue_timeouts (void *cls)
2378 {
2379   struct Neighbour *n = cls;
2380   struct PendingMessage *pm;
2381   struct GNUNET_TIME_Absolute now;
2382   struct GNUNET_TIME_Absolute earliest_timeout;
2383
2384   n->timeout_task = NULL;
2385   earliest_timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
2386   now = GNUNET_TIME_absolute_get ();
2387   for (struct PendingMessage *pos = n->pending_msg_head;
2388        NULL != pos;
2389        pos = pm)
2390   {
2391     pm = pos->next_neighbour;
2392     if (pos->timeout.abs_value_us <= now.abs_value_us)
2393     {
2394       GNUNET_STATISTICS_update (GST_stats,
2395                                 "# messages dropped (timeout before confirmation)",
2396                                 1,
2397                                 GNUNET_NO);
2398       client_send_response (pm,
2399                             GNUNET_NO,
2400                             0);
2401       continue;
2402     }
2403     earliest_timeout = GNUNET_TIME_absolute_min (earliest_timeout,
2404                                                  pos->timeout);
2405   }
2406   n->earliest_timeout = earliest_timeout;
2407   if (NULL != n->pending_msg_head)
2408     n->timeout_task = GNUNET_SCHEDULER_add_at (earliest_timeout,
2409                                                &check_queue_timeouts,
2410                                                n);
2411 }
2412
2413
2414 /**
2415  * Client asked for transmission to a peer.  Process the request.
2416  *
2417  * @param cls the client
2418  * @param obm the send message that was sent
2419  */
2420 static void
2421 handle_client_send (void *cls,
2422                     const struct OutboundMessage *obm)
2423 {
2424   struct TransportClient *tc = cls;
2425   struct PendingMessage *pm;
2426   const struct GNUNET_MessageHeader *obmm;
2427   struct Neighbour *target;
2428   uint32_t bytes_msg;
2429   int was_empty;
2430
2431   GNUNET_assert (CT_CORE == tc->type);
2432   obmm = (const struct GNUNET_MessageHeader *) &obm[1];
2433   bytes_msg = ntohs (obmm->size);
2434   target = lookup_neighbour (&obm->peer);
2435   if (NULL == target)
2436   {
2437     /* Failure: don't have this peer as a neighbour (anymore).
2438        Might have gone down asynchronously, so this is NOT
2439        a protocol violation by CORE. Still count the event,
2440        as this should be rare. */
2441     struct GNUNET_MQ_Envelope *env;
2442     struct SendOkMessage *som;
2443
2444     env = GNUNET_MQ_msg (som,
2445                          GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
2446     som->success = htonl (GNUNET_SYSERR);
2447     som->bytes_msg = htonl (bytes_msg);
2448     som->bytes_physical = htonl (0);
2449     som->peer = obm->peer;
2450     GNUNET_MQ_send (tc->mq,
2451                     env);
2452     GNUNET_SERVICE_client_continue (tc->client);
2453     GNUNET_STATISTICS_update (GST_stats,
2454                               "# messages dropped (neighbour unknown)",
2455                               1,
2456                               GNUNET_NO);
2457     return;
2458   }
2459   was_empty = (NULL == target->pending_msg_head);
2460   pm = GNUNET_malloc (sizeof (struct PendingMessage) + bytes_msg);
2461   pm->client = tc;
2462   pm->target = target;
2463   pm->bytes_msg = bytes_msg;
2464   pm->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (obm->timeout));
2465   memcpy (&pm[1],
2466           &obm[1],
2467           bytes_msg);
2468   GNUNET_CONTAINER_MDLL_insert (neighbour,
2469                                 target->pending_msg_head,
2470                                 target->pending_msg_tail,
2471                                 pm);
2472   GNUNET_CONTAINER_MDLL_insert (client,
2473                                 tc->details.core.pending_msg_head,
2474                                 tc->details.core.pending_msg_tail,
2475                                 pm);
2476   if (target->earliest_timeout.abs_value_us > pm->timeout.abs_value_us)
2477   {
2478     target->earliest_timeout.abs_value_us = pm->timeout.abs_value_us;
2479     if (NULL != target->timeout_task)
2480       GNUNET_SCHEDULER_cancel (target->timeout_task);
2481     target->timeout_task
2482       = GNUNET_SCHEDULER_add_at (target->earliest_timeout,
2483                                  &check_queue_timeouts,
2484                                  target);
2485   }
2486   if (! was_empty)
2487     return; /* all queues must already be busy */
2488   for (struct Queue *queue = target->queue_head;
2489        NULL != queue;
2490        queue = queue->next_neighbour)
2491   {
2492     /* try transmission on any queue that is idle */
2493     if (NULL == queue->transmit_task)
2494       queue->transmit_task = GNUNET_SCHEDULER_add_now (&transmit_on_queue,
2495                                                        queue);
2496   }
2497 }
2498
2499
2500 /**
2501  * Communicator started.  Test message is well-formed.
2502  *
2503  * @param cls the client
2504  * @param cam the send message that was sent
2505  */
2506 static int
2507 check_communicator_available (void *cls,
2508                               const struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam)
2509 {
2510   struct TransportClient *tc = cls;
2511   uint16_t size;
2512
2513   if (CT_NONE != tc->type)
2514   {
2515     GNUNET_break (0);
2516     return GNUNET_SYSERR;
2517   }
2518   tc->type = CT_COMMUNICATOR;
2519   size = ntohs (cam->header.size) - sizeof (*cam);
2520   if (0 == size)
2521     return GNUNET_OK; /* receive-only communicator */
2522   GNUNET_MQ_check_zero_termination (cam);
2523   return GNUNET_OK;
2524 }
2525
2526
2527 /**
2528  * Communicator started.  Process the request.
2529  *
2530  * @param cls the client
2531  * @param cam the send message that was sent
2532  */
2533 static void
2534 handle_communicator_available (void *cls,
2535                                const struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam)
2536 {
2537   struct TransportClient *tc = cls;
2538   uint16_t size;
2539
2540   size = ntohs (cam->header.size) - sizeof (*cam);
2541   if (0 == size)
2542     return; /* receive-only communicator */
2543   tc->details.communicator.address_prefix
2544     = GNUNET_strdup ((const char *) &cam[1]);
2545   tc->details.communicator.cc
2546     = (enum GNUNET_TRANSPORT_CommunicatorCharacteristics) ntohl (cam->cc);
2547   GNUNET_SERVICE_client_continue (tc->client);
2548 }
2549
2550
2551 /**
2552  * Communicator requests backchannel transmission.  Check the request.
2553  *
2554  * @param cls the client
2555  * @param cb the send message that was sent
2556  * @return #GNUNET_OK if message is well-formed
2557  */
2558 static int
2559 check_communicator_backchannel (void *cls,
2560                                 const struct GNUNET_TRANSPORT_CommunicatorBackchannel *cb)
2561 {
2562   const struct GNUNET_MessageHeader *inbox;
2563   const char *is;
2564   uint16_t msize;
2565   uint16_t isize;
2566
2567   msize = ntohs (cb->header.size) - sizeof (*cb);
2568   if (UINT16_MAX - msize >
2569       sizeof (struct TransportBackchannelEncapsulationMessage) +
2570       sizeof (struct TransportBackchannelRequestPayload) )
2571   {
2572     GNUNET_break (0);
2573     return GNUNET_SYSERR;
2574   }
2575   inbox = (const struct GNUNET_MessageHeader *) &cb[1];
2576   isize = ntohs (inbox->size);
2577   if (isize >= msize)
2578   {
2579     GNUNET_break (0);
2580     return GNUNET_SYSERR;
2581   }
2582   is = (const char *) inbox;
2583   is += isize;
2584   msize -= isize;
2585   GNUNET_assert (msize > 0);
2586   if ('\0' != is[msize-1])
2587   {
2588     GNUNET_break (0);
2589     return GNUNET_SYSERR;
2590   }
2591   return GNUNET_OK;
2592 }
2593
2594
2595 /**
2596  * Remove memory used by expired ephemeral keys.
2597  *
2598  * @param cls NULL
2599  */
2600 static void
2601 expire_ephemerals (void *cls)
2602 {
2603   struct EphemeralCacheEntry *ece;
2604
2605   (void) cls;
2606   ephemeral_task = NULL;
2607   while (NULL != (ece = GNUNET_CONTAINER_heap_peek (ephemeral_heap)))
2608   {
2609     if (0 == GNUNET_TIME_absolute_get_remaining (ece->ephemeral_validity).rel_value_us)
2610     {
2611       free_ephemeral (ece);
2612       continue;
2613     }
2614     ephemeral_task = GNUNET_SCHEDULER_add_at (ece->ephemeral_validity,
2615                                               &expire_ephemerals,
2616                                               NULL);
2617     return;
2618   }
2619 }
2620
2621
2622 /**
2623  * Lookup ephemeral key in our #ephemeral_map. If no valid one exists, generate
2624  * one, cache it and return it.
2625  *
2626  * @param pid peer to look up ephemeral for
2627  * @param private_key[out] set to the private key
2628  * @param ephemeral_key[out] set to the key
2629  * @param ephemeral_sender_sig[out] set to the signature
2630  * @param ephemeral_validity[out] set to the validity expiration time
2631  */
2632 static void
2633 lookup_ephemeral (const struct GNUNET_PeerIdentity *pid,
2634                   struct GNUNET_CRYPTO_EcdhePrivateKey *private_key,
2635                   struct GNUNET_CRYPTO_EcdhePublicKey *ephemeral_key,
2636                   struct GNUNET_CRYPTO_EddsaSignature *ephemeral_sender_sig,
2637                   struct GNUNET_TIME_Absolute *ephemeral_validity)
2638 {
2639   struct EphemeralCacheEntry *ece;
2640   struct EphemeralConfirmation ec;
2641
2642   ece = GNUNET_CONTAINER_multipeermap_get (ephemeral_map,
2643                                            pid);
2644   if ( (NULL != ece) &&
2645        (0 == GNUNET_TIME_absolute_get_remaining (ece->ephemeral_validity).rel_value_us) )
2646   {
2647     free_ephemeral (ece);
2648     ece = NULL;
2649   }
2650   if (NULL == ece)
2651   {
2652     ece = GNUNET_new (struct EphemeralCacheEntry);
2653     ece->target = *pid;
2654     ece->ephemeral_validity = GNUNET_TIME_absolute_add (GNUNET_TIME_absolute_get_monotonic (GST_cfg),
2655                                                         EPHEMERAL_VALIDITY);
2656     GNUNET_assert (GNUNET_OK ==
2657                    GNUNET_CRYPTO_ecdhe_key_create2 (&ece->private_key));
2658     GNUNET_CRYPTO_ecdhe_key_get_public (&ece->private_key,
2659                                         &ece->ephemeral_key);
2660     ec.purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL);
2661     ec.purpose.size = htonl (sizeof (ec));
2662     ec.target = *pid;
2663     ec.ephemeral_key = ece->ephemeral_key;
2664     GNUNET_assert (GNUNET_OK ==
2665                    GNUNET_CRYPTO_eddsa_sign (GST_my_private_key,
2666                                              &ec.purpose,
2667                                              &ece->sender_sig));
2668     ece->hn = GNUNET_CONTAINER_heap_insert (ephemeral_heap,
2669                                             ece,
2670                                             ece->ephemeral_validity.abs_value_us);
2671     GNUNET_assert (GNUNET_OK ==
2672                    GNUNET_CONTAINER_multipeermap_put (ephemeral_map,
2673                                                       &ece->target,
2674                                                       ece,
2675                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
2676     if (NULL == ephemeral_task)
2677       ephemeral_task = GNUNET_SCHEDULER_add_at (ece->ephemeral_validity,
2678                                                 &expire_ephemerals,
2679                                                 NULL);
2680   }
2681   *private_key = ece->private_key;
2682   *ephemeral_key = ece->ephemeral_key;
2683   *ephemeral_sender_sig = ece->sender_sig;
2684   *ephemeral_validity = ece->ephemeral_validity;
2685 }
2686
2687
2688 /**
2689  * We need to transmit @a hdr to @a target.  If necessary, this may
2690  * involve DV routing or even broadcasting and fragmentation.
2691  *
2692  * @param target peer to receive @a hdr
2693  * @param hdr header of the message to route
2694  */
2695 static void
2696 route_message (const struct GNUNET_PeerIdentity *target,
2697                struct GNUNET_MessageHeader *hdr)
2698 {
2699   // FIXME: send hdr to target, free hdr (possibly using DV, possibly broadcasting)
2700   GNUNET_free (hdr);
2701 }
2702
2703
2704 /**
2705  * Communicator requests backchannel transmission.  Process the request.
2706  *
2707  * @param cls the client
2708  * @param cb the send message that was sent
2709  */
2710 static void
2711 handle_communicator_backchannel (void *cls,
2712                                  const struct GNUNET_TRANSPORT_CommunicatorBackchannel *cb)
2713 {
2714   struct TransportClient *tc = cls;
2715   struct GNUNET_CRYPTO_EcdhePrivateKey private_key;
2716   struct GNUNET_TIME_Absolute ephemeral_validity;
2717   struct TransportBackchannelEncapsulationMessage *enc;
2718   struct TransportBackchannelRequestPayload ppay;
2719   char *mpos;
2720   uint16_t msize;
2721
2722   /* encapsulate and encrypt message */
2723   msize = ntohs (cb->header.size) - sizeof (*cb) + sizeof (struct TransportBackchannelRequestPayload);
2724   enc = GNUNET_malloc (sizeof (*enc) + msize);
2725   enc->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION);
2726   enc->header.size = htons (sizeof (*enc) + msize);
2727   enc->target = cb->pid;
2728   lookup_ephemeral (&cb->pid,
2729                     &private_key,
2730                     &enc->ephemeral_key,
2731                     &ppay.sender_sig,
2732                     &ephemeral_validity);
2733   // FIXME: setup 'iv'
2734 #if FIXME
2735   dh_key_derive (&private_key,
2736                  &cb->pid,
2737                  &enc->iv,
2738                  &key);
2739 #endif
2740   ppay.ephemeral_validity = GNUNET_TIME_absolute_hton (ephemeral_validity);
2741   ppay.monotonic_time = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get_monotonic (GST_cfg));
2742   mpos = (char *) &enc[1];
2743 #if FIXME
2744   encrypt (key,
2745            &ppay,
2746            &mpos,
2747            sizeof (ppay));
2748   encrypt (key,
2749            &cb[1],
2750            &mpos,
2751            ntohs (cb->header.size) - sizeof (*cb));
2752   hmac (key,
2753         &enc->hmac);
2754 #endif
2755   route_message (&cb->pid,
2756                  &enc->header);
2757   GNUNET_SERVICE_client_continue (tc->client);
2758 }
2759
2760
2761 /**
2762  * Address of our peer added.  Test message is well-formed.
2763  *
2764  * @param cls the client
2765  * @param aam the send message that was sent
2766  * @return #GNUNET_OK if message is well-formed
2767  */
2768 static int
2769 check_add_address (void *cls,
2770                    const struct GNUNET_TRANSPORT_AddAddressMessage *aam)
2771 {
2772   struct TransportClient *tc = cls;
2773
2774   if (CT_COMMUNICATOR != tc->type)
2775   {
2776     GNUNET_break (0);
2777     return GNUNET_SYSERR;
2778   }
2779   GNUNET_MQ_check_zero_termination (aam);
2780   return GNUNET_OK;
2781 }
2782
2783
2784 /**
2785  * Ask peerstore to store our address.
2786  *
2787  * @param cls an `struct AddressListEntry *`
2788  */
2789 static void
2790 store_pi (void *cls);
2791
2792
2793 /**
2794  * Function called when peerstore is done storing our address.
2795  */
2796 static void
2797 peerstore_store_cb (void *cls,
2798                     int success)
2799 {
2800   struct AddressListEntry *ale = cls;
2801
2802   ale->sc = NULL;
2803   if (GNUNET_YES != success)
2804     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2805                 "Failed to store our own address `%s' in peerstore!\n",
2806                 ale->address);
2807   /* refresh period is 1/4 of expiration time, that should be plenty
2808      without being excessive. */
2809   ale->st = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_divide (ale->expiration,
2810                                                                        4ULL),
2811                                           &store_pi,
2812                                           ale);
2813 }
2814
2815
2816 /**
2817  * Ask peerstore to store our address.
2818  *
2819  * @param cls an `struct AddressListEntry *`
2820  */
2821 static void
2822 store_pi (void *cls)
2823 {
2824   struct AddressListEntry *ale = cls;
2825   void *addr;
2826   size_t addr_len;
2827   struct GNUNET_TIME_Absolute expiration;
2828
2829   ale->st = NULL;
2830   expiration = GNUNET_TIME_relative_to_absolute (ale->expiration);
2831   GNUNET_HELLO_sign_address (ale->address,
2832                              ale->nt,
2833                              expiration,
2834                              GST_my_private_key,
2835                              &addr,
2836                              &addr_len);
2837   ale->sc = GNUNET_PEERSTORE_store (peerstore,
2838                                     "transport",
2839                                     &GST_my_identity,
2840                                     GNUNET_HELLO_PEERSTORE_KEY,
2841                                     addr,
2842                                     addr_len,
2843                                     expiration,
2844                                     GNUNET_PEERSTORE_STOREOPTION_MULTIPLE,
2845                                     &peerstore_store_cb,
2846                                     ale);
2847   GNUNET_free (addr);
2848   if (NULL == ale->sc)
2849   {
2850     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2851                 "Failed to store our address `%s' with peerstore\n",
2852                 ale->address);
2853     ale->st = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
2854                                             &store_pi,
2855                                             ale);
2856   }
2857 }
2858
2859
2860 /**
2861  * Address of our peer added.  Process the request.
2862  *
2863  * @param cls the client
2864  * @param aam the send message that was sent
2865  */
2866 static void
2867 handle_add_address (void *cls,
2868                     const struct GNUNET_TRANSPORT_AddAddressMessage *aam)
2869 {
2870   struct TransportClient *tc = cls;
2871   struct AddressListEntry *ale;
2872   size_t slen;
2873
2874   slen = ntohs (aam->header.size) - sizeof (*aam);
2875   ale = GNUNET_malloc (sizeof (struct AddressListEntry) + slen);
2876   ale->tc = tc;
2877   ale->address = (const char *) &ale[1];
2878   ale->expiration = GNUNET_TIME_relative_ntoh (aam->expiration);
2879   ale->aid = aam->aid;
2880   ale->nt = (enum GNUNET_NetworkType) ntohl (aam->nt);
2881   memcpy (&ale[1],
2882           &aam[1],
2883           slen);
2884   GNUNET_CONTAINER_DLL_insert (tc->details.communicator.addr_head,
2885                                tc->details.communicator.addr_tail,
2886                                ale);
2887   ale->st = GNUNET_SCHEDULER_add_now (&store_pi,
2888                                       ale);
2889   GNUNET_SERVICE_client_continue (tc->client);
2890 }
2891
2892
2893 /**
2894  * Address of our peer deleted.  Process the request.
2895  *
2896  * @param cls the client
2897  * @param dam the send message that was sent
2898  */
2899 static void
2900 handle_del_address (void *cls,
2901                     const struct GNUNET_TRANSPORT_DelAddressMessage *dam)
2902 {
2903   struct TransportClient *tc = cls;
2904
2905   if (CT_COMMUNICATOR != tc->type)
2906   {
2907     GNUNET_break (0);
2908     GNUNET_SERVICE_client_drop (tc->client);
2909     return;
2910   }
2911   for (struct AddressListEntry *ale = tc->details.communicator.addr_head;
2912        NULL != ale;
2913        ale = ale->next)
2914   {
2915     if (dam->aid != ale->aid)
2916       continue;
2917     GNUNET_assert (ale->tc == tc);
2918     free_address_list_entry (ale);
2919     GNUNET_SERVICE_client_continue (tc->client);
2920   }
2921   GNUNET_break (0);
2922   GNUNET_SERVICE_client_drop (tc->client);
2923 }
2924
2925
2926 /**
2927  * Context from #handle_incoming_msg().  Closure for many
2928  * message handlers below.
2929  */
2930 struct CommunicatorMessageContext
2931 {
2932   /**
2933    * Which communicator provided us with the message.
2934    */
2935   struct TransportClient *tc;
2936
2937   /**
2938    * Additional information for flow control and about the sender.
2939    */
2940   struct GNUNET_TRANSPORT_IncomingMessage im;
2941
2942   /**
2943    * Number of hops the message has travelled (if DV-routed).
2944    * FIXME: make use of this in ACK handling!
2945    */
2946   uint16_t total_hops;
2947 };
2948
2949
2950 /**
2951  * Given an inbound message @a msg from a communicator @a cmc,
2952  * demultiplex it based on the type calling the right handler.
2953  *
2954  * @param cmc context for demultiplexing
2955  * @param msg message to demultiplex
2956  */
2957 static void
2958 demultiplex_with_cmc (struct CommunicatorMessageContext *cmc,
2959                       const struct GNUNET_MessageHeader *msg);
2960
2961
2962 /**
2963  * Send ACK to communicator (if requested) and free @a cmc.
2964  *
2965  * @param cmc context for which we are done handling the message
2966  */
2967 static void
2968 finish_cmc_handling (struct CommunicatorMessageContext *cmc)
2969 {
2970   if (0 != ntohl (cmc->im.fc_on))
2971   {
2972     /* send ACK when done to communicator for flow control! */
2973     struct GNUNET_MQ_Envelope *env;
2974     struct GNUNET_TRANSPORT_IncomingMessageAck *ack;
2975
2976     env = GNUNET_MQ_msg (ack,
2977                          GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK);
2978     ack->reserved = htonl (0);
2979     ack->fc_id = cmc->im.fc_id;
2980     ack->sender = cmc->im.sender;
2981     GNUNET_MQ_send (cmc->tc->mq,
2982                     env);
2983   }
2984   GNUNET_SERVICE_client_continue (cmc->tc->client);
2985   GNUNET_free (cmc);
2986 }
2987
2988
2989 /**
2990  * Communicator gave us an unencapsulated message to pass as-is to
2991  * CORE.  Process the request.
2992  *
2993  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
2994  * @param mh the message that was received
2995  */
2996 static void
2997 handle_raw_message (void *cls,
2998                     const struct GNUNET_MessageHeader *mh)
2999 {
3000   struct CommunicatorMessageContext *cmc = cls;
3001   uint16_t size = ntohs (mh->size);
3002
3003   if ( (size > UINT16_MAX - sizeof (struct InboundMessage)) ||
3004        (size < sizeof (struct GNUNET_MessageHeader)) )
3005   {
3006     struct GNUNET_SERVICE_Client *client = cmc->tc->client;
3007
3008     GNUNET_break (0);
3009     finish_cmc_handling (cmc);
3010     GNUNET_SERVICE_client_drop (client);
3011     return;
3012   }
3013   /* Forward to all CORE clients */
3014   for (struct TransportClient *tc = clients_head;
3015        NULL != tc;
3016        tc = tc->next)
3017   {
3018     struct GNUNET_MQ_Envelope *env;
3019     struct InboundMessage *im;
3020
3021     if (CT_CORE != tc->type)
3022       continue;
3023     env = GNUNET_MQ_msg_extra (im,
3024                                size,
3025                                GNUNET_MESSAGE_TYPE_TRANSPORT_RECV);
3026     im->peer = cmc->im.sender;
3027     memcpy (&im[1],
3028             mh,
3029             size);
3030     GNUNET_MQ_send (tc->mq,
3031                     env);
3032   }
3033   /* FIXME: consider doing this _only_ once the message
3034      was drained from the CORE MQs to extend flow control to CORE!
3035      (basically, increment counter in cmc, decrement on MQ send continuation! */
3036   finish_cmc_handling (cmc);
3037 }
3038
3039
3040 /**
3041  * Communicator gave us a fragment box.  Check the message.
3042  *
3043  * @param cls a `struct CommunicatorMessageContext`
3044  * @param fb the send message that was sent
3045  * @return #GNUNET_YES if message is well-formed
3046  */
3047 static int
3048 check_fragment_box (void *cls,
3049                     const struct TransportFragmentBox *fb)
3050 {
3051   uint16_t size = ntohs (fb->header.size);
3052   uint16_t bsize = size - sizeof (*fb);
3053
3054   if (0 == bsize)
3055   {
3056     GNUNET_break_op (0);
3057     return GNUNET_SYSERR;
3058   }
3059   if (bsize + ntohs (fb->frag_off) > ntohs (fb->msg_size))
3060   {
3061     GNUNET_break_op (0);
3062     return GNUNET_SYSERR;
3063   }
3064   if (ntohs (fb->frag_off) >= ntohs (fb->msg_size))
3065   {
3066     GNUNET_break_op (0);
3067     return GNUNET_SYSERR;
3068   }
3069   return GNUNET_YES;
3070 }
3071
3072
3073 /**
3074  * Generate a fragment acknowledgement for an @a rc.
3075  *
3076  * @param rc context to generate ACK for, @a rc ACK state is reset
3077  */
3078 static void
3079 send_fragment_ack (struct ReassemblyContext *rc)
3080 {
3081   struct TransportFragmentAckMessage *ack;
3082
3083   ack = GNUNET_new (struct TransportFragmentAckMessage);
3084   ack->header.size = htons (sizeof (struct TransportFragmentAckMessage));
3085   ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK);
3086   ack->frag_uuid = htonl (rc->frag_uuid);
3087   ack->extra_acks = GNUNET_htonll (rc->extra_acks);
3088   ack->msg_uuid = rc->msg_uuid;
3089   ack->avg_ack_delay = GNUNET_TIME_relative_hton (rc->avg_ack_delay);
3090   if (0 == rc->msg_missing)
3091     ack->reassembly_timeout
3092       = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_FOREVER_REL); /* signal completion */
3093   else
3094     ack->reassembly_timeout
3095       = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (rc->reassembly_timeout));
3096   route_message (&rc->neighbour->pid,
3097                  &ack->header);
3098   rc->avg_ack_delay = GNUNET_TIME_UNIT_ZERO;
3099   rc->num_acks = 0;
3100   rc->extra_acks = 0LLU;
3101 }
3102
3103
3104 /**
3105  * Communicator gave us a fragment.  Process the request.
3106  *
3107  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
3108  * @param fb the message that was received
3109  */
3110 static void
3111 handle_fragment_box (void *cls,
3112                      const struct TransportFragmentBox *fb)
3113 {
3114   struct CommunicatorMessageContext *cmc = cls;
3115   struct Neighbour *n;
3116   struct ReassemblyContext *rc;
3117   const struct GNUNET_MessageHeader *msg;
3118   uint16_t msize;
3119   uint16_t fsize;
3120   uint16_t frag_off;
3121   uint32_t frag_uuid;
3122   char *target;
3123   struct GNUNET_TIME_Relative cdelay;
3124   int ack_now;
3125
3126   n = GNUNET_CONTAINER_multipeermap_get (neighbours,
3127                                          &cmc->im.sender);
3128   if (NULL == n)
3129   {
3130     struct GNUNET_SERVICE_Client *client = cmc->tc->client;
3131
3132     GNUNET_break (0);
3133     finish_cmc_handling (cmc);
3134     GNUNET_SERVICE_client_drop (client);
3135     return;
3136   }
3137   if (NULL == n->reassembly_map)
3138   {
3139     n->reassembly_map = GNUNET_CONTAINER_multishortmap_create (8,
3140                                                                GNUNET_YES);
3141     n->reassembly_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
3142     n->reassembly_timeout_task = GNUNET_SCHEDULER_add_delayed (REASSEMBLY_EXPIRATION,
3143                                                                &reassembly_cleanup_task,
3144                                                                n);
3145   }
3146   msize = ntohs (fb->msg_size);
3147   rc = GNUNET_CONTAINER_multishortmap_get (n->reassembly_map,
3148                                            &fb->msg_uuid);
3149   if (NULL == rc)
3150   {
3151     rc = GNUNET_malloc (sizeof (*rc) +
3152                         msize + /* reassembly payload buffer */
3153                         (msize + 7) / 8 * sizeof (uint8_t) /* bitfield */);
3154     rc->msg_uuid = fb->msg_uuid;
3155     rc->neighbour = n;
3156     rc->msg_size = msize;
3157     rc->reassembly_timeout = GNUNET_TIME_relative_to_absolute (REASSEMBLY_EXPIRATION);
3158     rc->last_frag = GNUNET_TIME_absolute_get ();
3159     rc->hn = GNUNET_CONTAINER_heap_insert (n->reassembly_heap,
3160                                            rc,
3161                                            rc->reassembly_timeout.abs_value_us);
3162     GNUNET_assert (GNUNET_OK ==
3163                    GNUNET_CONTAINER_multishortmap_put (n->reassembly_map,
3164                                                        &rc->msg_uuid,
3165                                                        rc,
3166                                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
3167     target = (char *) &rc[1];
3168     rc->bitfield = (uint8_t *) (target + rc->msg_size);
3169     rc->msg_missing = rc->msg_size;
3170   }
3171   else
3172   {
3173     target = (char *) &rc[1];
3174   }
3175   if (msize != rc->msg_size)
3176   {
3177     GNUNET_break (0);
3178     finish_cmc_handling (cmc);
3179     return;
3180   }
3181
3182   /* reassemble */
3183   fsize = ntohs (fb->header.size) - sizeof (*fb);
3184   frag_off = ntohs (fb->frag_off);
3185   memcpy (&target[frag_off],
3186           &fb[1],
3187           fsize);
3188   /* update bitfield and msg_missing */
3189   for (unsigned int i=frag_off;i<frag_off+fsize;i++)
3190   {
3191     if (0 == (rc->bitfield[i / 8] & (1 << (i % 8))))
3192     {
3193       rc->bitfield[i / 8] |= (1 << (i % 8));
3194       rc->msg_missing--;
3195     }
3196   }
3197
3198   /* Compute cummulative ACK */
3199   frag_uuid = ntohl (fb->frag_uuid);
3200   cdelay = GNUNET_TIME_absolute_get_duration (rc->last_frag);
3201   cdelay = GNUNET_TIME_relative_multiply (cdelay,
3202                                           rc->num_acks);
3203   rc->last_frag = GNUNET_TIME_absolute_get ();
3204   rc->avg_ack_delay = GNUNET_TIME_relative_add (rc->avg_ack_delay,
3205                                                 cdelay);
3206   ack_now = GNUNET_NO;
3207   if (0 == rc->num_acks)
3208   {
3209     /* case one: first ack */
3210     rc->frag_uuid = frag_uuid;
3211     rc->extra_acks = 0LLU;
3212     rc->num_acks = 1;
3213   }
3214   else if ( (frag_uuid >= rc->frag_uuid) &&
3215             (frag_uuid <= rc->frag_uuid + 64) )
3216   {
3217     /* case two: ack fits after existing min UUID */
3218     if ( (frag_uuid == rc->frag_uuid) ||
3219          (0 != (rc->extra_acks & (1LLU << (frag_uuid - rc->frag_uuid - 1)))) )
3220     {
3221       /* duplicate fragment, ack now! */
3222       ack_now = GNUNET_YES;
3223     }
3224     else
3225     {
3226       rc->extra_acks |= (1LLU << (frag_uuid - rc->frag_uuid - 1));
3227       rc->num_acks++;
3228     }
3229   }
3230   else if ( (rc->frag_uuid > frag_uuid) &&
3231             ( ( (rc->frag_uuid == frag_uuid + 64) &&
3232                 (0 == rc->extra_acks) ) ||
3233               ( (rc->frag_uuid < frag_uuid + 64) &&
3234                 (rc->extra_acks == (rc->extra_acks & ~ ((1LLU << (64 - (rc->frag_uuid - frag_uuid))) - 1LLU))) ) ) )
3235   {
3236     /* can fit ack by shifting extra acks and starting at
3237        frag_uid, test above esured that the bits we will
3238        shift 'extra_acks' by are all zero. */
3239     rc->extra_acks <<= (rc->frag_uuid - frag_uuid);
3240     rc->extra_acks |= (1LLU << (rc->frag_uuid - frag_uuid - 1));
3241     rc->frag_uuid = frag_uuid;
3242     rc->num_acks++;
3243   }
3244   if (65 == rc->num_acks) /* FIXME: maybe use smaller threshold? This is very aggressive. */
3245     ack_now = GNUNET_YES; /* maximum acks received */
3246   // FIXME: possibly also ACK based on RTT (but for that we'd need to
3247   // determine the queue used for the ACK first!)
3248
3249   /* is reassembly complete? */
3250   if (0 != rc->msg_missing)
3251   {
3252     if (ack_now)
3253       send_fragment_ack (rc);
3254     finish_cmc_handling (cmc);
3255     return;
3256   }
3257   /* reassembly is complete, verify result */
3258   msg = (const struct GNUNET_MessageHeader *) &rc[1];
3259   if (ntohs (msg->size) != rc->msg_size)
3260   {
3261     GNUNET_break (0);
3262     free_reassembly_context (rc);
3263     finish_cmc_handling (cmc);
3264     return;
3265   }
3266   /* successful reassembly */
3267   send_fragment_ack (rc);
3268   demultiplex_with_cmc (cmc,
3269                         msg);
3270   /* FIXME: really free here? Might be bad if fragments are still
3271      en-route and we forget that we finished this reassembly immediately!
3272      -> keep around until timeout?
3273      -> shorten timeout based on ACK? */
3274   free_reassembly_context (rc);
3275 }
3276
3277
3278 /**
3279  * Communicator gave us a fragment acknowledgement.  Process the request.
3280  *
3281  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
3282  * @param fa the message that was received
3283  */
3284 static void
3285 handle_fragment_ack (void *cls,
3286                      const struct TransportFragmentAckMessage *fa)
3287 {
3288   struct CommunicatorMessageContext *cmc = cls;
3289
3290   // FIXME: do work: identify original message; then identify fragments being acked;
3291   // remove those from the tree to prevent retransmission;
3292   // compute RTT
3293   // if entire message is ACKed, handle that as well.
3294   finish_cmc_handling (cmc);
3295 }
3296
3297
3298 /**
3299  * Communicator gave us a reliability box.  Check the message.
3300  *
3301  * @param cls a `struct CommunicatorMessageContext`
3302  * @param rb the send message that was sent
3303  * @return #GNUNET_YES if message is well-formed
3304  */
3305 static int
3306 check_reliability_box (void *cls,
3307                        const struct TransportReliabilityBox *rb)
3308 {
3309   GNUNET_MQ_check_boxed_message (rb);
3310   return GNUNET_YES;
3311 }
3312
3313
3314 /**
3315  * Communicator gave us a reliability box.  Process the request.
3316  *
3317  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
3318  * @param rb the message that was received
3319  */
3320 static void
3321 handle_reliability_box (void *cls,
3322                         const struct TransportReliabilityBox *rb)
3323 {
3324   struct CommunicatorMessageContext *cmc = cls;
3325   const struct GNUNET_MessageHeader *inbox = (const struct GNUNET_MessageHeader *) &rb[1];
3326
3327   if (0 == ntohl (rb->ack_countdown))
3328   {
3329     struct TransportReliabilityAckMessage *ack;
3330
3331     /* FIXME: implement cummulative ACKs and ack_countdown,
3332        then setting the avg_ack_delay field below: */
3333     ack = GNUNET_malloc (sizeof (*ack) +
3334                          sizeof (struct GNUNET_ShortHashCode));
3335     ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK);
3336     ack->header.size = htons (sizeof (*ack) +
3337                               sizeof (struct GNUNET_ShortHashCode));
3338     memcpy (&ack[1],
3339             &rb->msg_uuid,
3340             sizeof (struct GNUNET_ShortHashCode));
3341     route_message (&cmc->im.sender,
3342                    &ack->header);
3343   }
3344   /* continue with inner message */
3345   demultiplex_with_cmc (cmc,
3346                         inbox);
3347 }
3348
3349
3350 /**
3351  * Communicator gave us a reliability ack.  Process the request.
3352  *
3353  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
3354  * @param ra the message that was received
3355  */
3356 static void
3357 handle_reliability_ack (void *cls,
3358                         const struct TransportReliabilityAckMessage *ra)
3359 {
3360   struct CommunicatorMessageContext *cmc = cls;
3361
3362   // FIXME: do work: find message that was acknowledged, and
3363   // remove from transmission queue; update RTT.
3364   finish_cmc_handling (cmc);
3365 }
3366
3367
3368 /**
3369  * Communicator gave us a backchannel encapsulation.  Check the message.
3370  *
3371  * @param cls a `struct CommunicatorMessageContext`
3372  * @param be the send message that was sent
3373  * @return #GNUNET_YES if message is well-formed
3374  */
3375 static int
3376 check_backchannel_encapsulation (void *cls,
3377                                  const struct TransportBackchannelEncapsulationMessage *be)
3378 {
3379   uint16_t size = ntohs (be->header.size);
3380
3381   if (size - sizeof (*be) < sizeof (struct GNUNET_MessageHeader))
3382   {
3383     GNUNET_break_op (0);
3384     return GNUNET_SYSERR;
3385   }
3386   return GNUNET_YES;
3387 }
3388
3389
3390 /**
3391  * Communicator gave us a backchannel encapsulation.  Process the request.
3392  *
3393  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
3394  * @param be the message that was received
3395  */
3396 static void
3397 handle_backchannel_encapsulation (void *cls,
3398                                   const struct TransportBackchannelEncapsulationMessage *be)
3399 {
3400   struct CommunicatorMessageContext *cmc = cls;
3401
3402   if (0 != memcmp (&be->target,
3403                    &GST_my_identity,
3404                    sizeof (struct GNUNET_PeerIdentity)))
3405   {
3406     /* not for me, try to route to target */
3407     route_message (&be->target,
3408                    GNUNET_copy_message (&be->header));
3409     finish_cmc_handling (cmc);
3410     return;
3411   }
3412   // FIXME: compute shared secret
3413   // FIXME: check HMAC
3414   // FIXME: decrypt payload
3415   // FIXME: forward to specified communicator!
3416   // (using GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL_INCOMING)
3417   finish_cmc_handling (cmc);
3418 }
3419
3420
3421 /**
3422  * Communicator gave us a DV learn message.  Check the message.
3423  *
3424  * @param cls a `struct CommunicatorMessageContext`
3425  * @param dvl the send message that was sent
3426  * @return #GNUNET_YES if message is well-formed
3427  */
3428 static int
3429 check_dv_learn (void *cls,
3430                 const struct TransportDVLearn *dvl)
3431 {
3432   uint16_t size = ntohs (dvl->header.size);
3433   uint16_t num_hops = ntohs (dvl->num_hops);
3434   const struct GNUNET_PeerIdentity *hops = (const struct GNUNET_PeerIdentity *) &dvl[1];
3435
3436   if (size != sizeof (*dvl) + num_hops * sizeof (struct GNUNET_PeerIdentity))
3437   {
3438     GNUNET_break_op (0);
3439     return GNUNET_SYSERR;
3440   }
3441   for (unsigned int i=0;i<num_hops;i++)
3442   {
3443     if (0 == memcmp (&dvl->initiator,
3444                      &hops[i],
3445                      sizeof (struct GNUNET_PeerIdentity)))
3446     {
3447       GNUNET_break_op (0);
3448       return GNUNET_SYSERR;
3449     }
3450     if (0 == memcmp (&GST_my_identity,
3451                      &hops[i],
3452                      sizeof (struct GNUNET_PeerIdentity)))
3453     {
3454       GNUNET_break_op (0);
3455       return GNUNET_SYSERR;
3456     }
3457   }
3458   return GNUNET_YES;
3459 }
3460
3461
3462 /**
3463  * Communicator gave us a DV learn message.  Process the request.
3464  *
3465  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
3466  * @param dvl the message that was received
3467  */
3468 static void
3469 handle_dv_learn (void *cls,
3470                  const struct TransportDVLearn *dvl)
3471 {
3472   struct CommunicatorMessageContext *cmc = cls;
3473
3474   // FIXME: learn path from DV message (if bi-directional flags are set)
3475   // FIXME: expand DV message, forward on (unless path is getting too long)
3476   finish_cmc_handling (cmc);
3477 }
3478
3479
3480 /**
3481  * Communicator gave us a DV box.  Check the message.
3482  *
3483  * @param cls a `struct CommunicatorMessageContext`
3484  * @param dvb the send message that was sent
3485  * @return #GNUNET_YES if message is well-formed
3486  */
3487 static int
3488 check_dv_box (void *cls,
3489               const struct TransportDVBox *dvb)
3490 {
3491   uint16_t size = ntohs (dvb->header.size);
3492   uint16_t num_hops = ntohs (dvb->num_hops);
3493   const struct GNUNET_PeerIdentity *hops = (const struct GNUNET_PeerIdentity *) &dvb[1];
3494   const struct GNUNET_MessageHeader *inbox = (const struct GNUNET_MessageHeader *) &hops[num_hops];
3495   uint16_t isize;
3496   uint16_t itype;
3497
3498   if (size < sizeof (*dvb) + num_hops * sizeof (struct GNUNET_PeerIdentity) + sizeof (struct GNUNET_MessageHeader))
3499   {
3500     GNUNET_break_op (0);
3501     return GNUNET_SYSERR;
3502   }
3503   isize = ntohs (inbox->size);
3504   if (size != sizeof (*dvb) + num_hops * sizeof (struct GNUNET_PeerIdentity) + isize)
3505   {
3506     GNUNET_break_op (0);
3507     return GNUNET_SYSERR;
3508   }
3509   itype = ntohs (inbox->type);
3510   if ( (GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX == itype) ||
3511        (GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN == itype) )
3512   {
3513     GNUNET_break_op (0);
3514     return GNUNET_SYSERR;
3515   }
3516   return GNUNET_YES;
3517 }
3518
3519
3520 /**
3521  * Communicator gave us a DV box.  Process the request.
3522  *
3523  * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
3524  * @param dvb the message that was received
3525  */
3526 static void
3527 handle_dv_box (void *cls,
3528                const struct TransportDVBox *dvb)
3529 {
3530   struct CommunicatorMessageContext *cmc = cls;
3531   uint16_t size = ntohs (dvb->header.size) - sizeof (*dvb);
3532   uint16_t num_hops = ntohs (dvb->num_hops);
3533   const struct GNUNET_PeerIdentity *hops = (const struct GNUNET_PeerIdentity *) &dvb[1];
3534   const struct GNUNET_MessageHeader *inbox = (const struct GNUNET_MessageHeader *) &hops[num_hops];
3535
3536   if (num_hops > 0)
3537   {
3538     // FIXME: if we are not the target, shorten path and forward along.
3539     // Try from the _end_ of hops array if we know the given
3540     // neighbour (shortening the path!).
3541     // NOTE: increment total_hops!
3542     finish_cmc_handling (cmc);
3543     return;
3544   }
3545   /* We are the target. Unbox and handle message. */
3546   cmc->im.sender = dvb->origin;
3547   cmc->total_hops = ntohs (dvb->total_hops);
3548   demultiplex_with_cmc (cmc,
3549                         inbox);
3550 }
3551
3552
3553 /**
3554  * Client notified us about transmission from a peer.  Process the request.
3555  *
3556  * @param cls a `struct TransportClient` which sent us the message
3557  * @param obm the send message that was sent
3558  * @return #GNUNET_YES if message is well-formed
3559  */
3560 static int
3561 check_incoming_msg (void *cls,
3562                     const struct GNUNET_TRANSPORT_IncomingMessage *im)
3563 {
3564   struct TransportClient *tc = cls;
3565
3566   if (CT_COMMUNICATOR != tc->type)
3567   {
3568     GNUNET_break (0);
3569     return GNUNET_SYSERR;
3570   }
3571   GNUNET_MQ_check_boxed_message (im);
3572   return GNUNET_OK;
3573 }
3574
3575
3576 /**
3577  * Incoming meessage.  Process the request.
3578  *
3579  * @param im the send message that was received
3580  */
3581 static void
3582 handle_incoming_msg (void *cls,
3583                      const struct GNUNET_TRANSPORT_IncomingMessage *im)
3584 {
3585   struct TransportClient *tc = cls;
3586   struct CommunicatorMessageContext *cmc = GNUNET_new (struct CommunicatorMessageContext);
3587
3588   cmc->tc = tc;
3589   cmc->im = *im;
3590   demultiplex_with_cmc (cmc,
3591                         (const struct GNUNET_MessageHeader *) &im[1]);
3592 }
3593
3594
3595 /**
3596  * Given an inbound message @a msg from a communicator @a cmc,
3597  * demultiplex it based on the type calling the right handler.
3598  *
3599  * @param cmc context for demultiplexing
3600  * @param msg message to demultiplex
3601  */
3602 static void
3603 demultiplex_with_cmc (struct CommunicatorMessageContext *cmc,
3604                       const struct GNUNET_MessageHeader *msg)
3605 {
3606   struct GNUNET_MQ_MessageHandler handlers[] = {
3607     GNUNET_MQ_hd_var_size (fragment_box,
3608                            GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT,
3609                            struct TransportFragmentBox,
3610                            &cmc),
3611     GNUNET_MQ_hd_fixed_size (fragment_ack,
3612                              GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK,
3613                              struct TransportFragmentAckMessage,
3614                              &cmc),
3615     GNUNET_MQ_hd_var_size (reliability_box,
3616                            GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX,
3617                            struct TransportReliabilityBox,
3618                            &cmc),
3619     GNUNET_MQ_hd_fixed_size (reliability_ack,
3620                              GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK,
3621                              struct TransportReliabilityAckMessage,
3622                              &cmc),
3623     GNUNET_MQ_hd_var_size (backchannel_encapsulation,
3624                            GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION,
3625                            struct TransportBackchannelEncapsulationMessage,
3626                            &cmc),
3627     GNUNET_MQ_hd_var_size (dv_learn,
3628                            GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN,
3629                            struct TransportDVLearn,
3630                            &cmc),
3631     GNUNET_MQ_hd_var_size (dv_box,
3632                            GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX,
3633                            struct TransportDVBox,
3634                            &cmc),
3635     GNUNET_MQ_handler_end()
3636   };
3637   int ret;
3638
3639   ret = GNUNET_MQ_handle_message (handlers,
3640                                   msg);
3641   if (GNUNET_SYSERR == ret)
3642   {
3643     GNUNET_break (0);
3644     GNUNET_SERVICE_client_drop (cmc->tc->client);
3645     GNUNET_free (cmc);
3646     return;
3647   }
3648   if (GNUNET_NO == ret)
3649   {
3650     /* unencapsulated 'raw' message */
3651     handle_raw_message (&cmc,
3652                         msg);
3653   }
3654 }
3655
3656
3657 /**
3658  * New queue became available.  Check message.
3659  *
3660  * @param cls the client
3661  * @param aqm the send message that was sent
3662  */
3663 static int
3664 check_add_queue_message (void *cls,
3665                          const struct GNUNET_TRANSPORT_AddQueueMessage *aqm)
3666 {
3667   struct TransportClient *tc = cls;
3668
3669   if (CT_COMMUNICATOR != tc->type)
3670   {
3671     GNUNET_break (0);
3672     return GNUNET_SYSERR;
3673   }
3674   GNUNET_MQ_check_zero_termination (aqm);
3675   return GNUNET_OK;
3676 }
3677
3678
3679 /**
3680  * Bandwidth tracker informs us that the delay until we should receive
3681  * more has changed.
3682  *
3683  * @param cls a `struct Queue` for which the delay changed
3684  */
3685 static void
3686 tracker_update_in_cb (void *cls)
3687 {
3688   struct Queue *queue = cls;
3689   struct GNUNET_TIME_Relative in_delay;
3690   unsigned int rsize;
3691
3692   rsize = (0 == queue->mtu) ? IN_PACKET_SIZE_WITHOUT_MTU : queue->mtu;
3693   in_delay = GNUNET_BANDWIDTH_tracker_get_delay (&queue->tracker_in,
3694                                                  rsize);
3695   // FIXME: how exactly do we do inbound flow control?
3696 }
3697
3698
3699 /**
3700  * If necessary, generates the UUID for a @a pm
3701  *
3702  * @param pm pending message to generate UUID for.
3703  */
3704 static void
3705 set_pending_message_uuid (struct PendingMessage *pm)
3706 {
3707   if (pm->msg_uuid_set)
3708     return;
3709   GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
3710                               &pm->msg_uuid,
3711                               sizeof (pm->msg_uuid));
3712   pm->msg_uuid_set = GNUNET_YES;
3713 }
3714
3715
3716 /**
3717  * Fragment the given @a pm to the given @a mtu.  Adds
3718  * additional fragments to the neighbour as well. If the
3719  * @a mtu is too small, generates and error for the @a pm
3720  * and returns NULL.
3721  *
3722  * @param pm pending message to fragment for transmission
3723  * @param mtu MTU to apply
3724  * @return new message to transmit
3725  */
3726 static struct PendingMessage *
3727 fragment_message (struct PendingMessage *pm,
3728                   uint16_t mtu)
3729 {
3730   struct PendingMessage *ff;
3731
3732   set_pending_message_uuid (pm);
3733
3734   /* This invariant is established in #handle_add_queue_message() */
3735   GNUNET_assert (mtu > sizeof (struct TransportFragmentBox));
3736
3737   /* select fragment for transmission, descending the tree if it has
3738      been expanded until we are at a leaf or at a fragment that is small enough */
3739   ff = pm;
3740   while ( ( (ff->bytes_msg > mtu) ||
3741             (pm == ff) ) &&
3742           (ff->frag_off == ff->bytes_msg) &&
3743           (NULL != ff->head_frag) )
3744   {
3745     ff = ff->head_frag; /* descent into fragmented fragments */
3746   }
3747
3748   if ( ( (ff->bytes_msg > mtu) ||
3749          (pm == ff) ) &&
3750        (pm->frag_off < pm->bytes_msg) )
3751   {
3752     /* Did not yet calculate all fragments, calculate next fragment */
3753     struct PendingMessage *frag;
3754     struct TransportFragmentBox tfb;
3755     const char *orig;
3756     char *msg;
3757     uint16_t fragmax;
3758     uint16_t fragsize;
3759     uint16_t msize;
3760     uint16_t xoff = 0;
3761
3762     orig = (const char *) &ff[1];
3763     msize = ff->bytes_msg;
3764     if (pm != ff)
3765     {
3766       const struct TransportFragmentBox *tfbo;
3767
3768       tfbo = (const struct TransportFragmentBox *) orig;
3769       orig += sizeof (struct TransportFragmentBox);
3770       msize -= sizeof (struct TransportFragmentBox);
3771       xoff = ntohs (tfbo->frag_off);
3772     }
3773     fragmax = mtu - sizeof (struct TransportFragmentBox);
3774     fragsize = GNUNET_MIN (msize - ff->frag_off,
3775                            fragmax);
3776     frag = GNUNET_malloc (sizeof (struct PendingMessage) +
3777                           sizeof (struct TransportFragmentBox) +
3778                           fragsize);
3779     frag->target = pm->target;
3780     frag->frag_parent = ff;
3781     frag->timeout = pm->timeout;
3782     frag->bytes_msg = sizeof (struct TransportFragmentBox) + fragsize;
3783     frag->pmt = PMT_FRAGMENT_BOX;
3784     msg = (char *) &frag[1];
3785     tfb.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT);
3786     tfb.header.size = htons (sizeof (struct TransportFragmentBox) +
3787                              fragsize);
3788     tfb.frag_uuid = htonl (pm->frag_uuidgen++);
3789     tfb.msg_uuid = pm->msg_uuid;
3790     tfb.frag_off = htons (ff->frag_off + xoff);
3791     tfb.msg_size = htons (pm->bytes_msg);
3792     memcpy (msg,
3793             &tfb,
3794             sizeof (tfb));
3795     memcpy (&msg[sizeof (tfb)],
3796             &orig[ff->frag_off],
3797             fragsize);
3798     GNUNET_CONTAINER_MDLL_insert (frag,
3799                                   ff->head_frag,
3800                                   ff->tail_frag,
3801                                   frag);
3802     ff->frag_off += fragsize;
3803     ff = frag;
3804   }
3805
3806   /* Move head to the tail and return it */
3807   GNUNET_CONTAINER_MDLL_remove (frag,
3808                                 ff->frag_parent->head_frag,
3809                                 ff->frag_parent->tail_frag,
3810                                 ff);
3811   GNUNET_CONTAINER_MDLL_insert_tail (frag,
3812                                      ff->frag_parent->head_frag,
3813                                      ff->frag_parent->tail_frag,
3814                                      ff);
3815   return ff;
3816 }
3817
3818
3819 /**
3820  * Reliability-box the given @a pm. On error (can there be any), NULL
3821  * may be returned, otherwise the "replacement" for @a pm (which
3822  * should then be added to the respective neighbour's queue instead of
3823  * @a pm).  If the @a pm is already fragmented or reliability boxed,
3824  * or itself an ACK, this function simply returns @a pm.
3825  *
3826  * @param pm pending message to box for transmission over unreliabile queue
3827  * @return new message to transmit
3828  */
3829 static struct PendingMessage *
3830 reliability_box_message (struct PendingMessage *pm)
3831 {
3832   struct TransportReliabilityBox rbox;
3833   struct PendingMessage *bpm;
3834   char *msg;
3835
3836   if (PMT_CORE != pm->pmt)
3837     return pm;  /* already fragmented or reliability boxed, or control message: do nothing */
3838   if (NULL != pm->bpm)
3839     return pm->bpm; /* already computed earlier: do nothing */
3840   GNUNET_assert (NULL == pm->head_frag);
3841   if (pm->bytes_msg + sizeof (rbox) > UINT16_MAX)
3842   {
3843     /* failed hard */
3844     GNUNET_break (0);
3845     client_send_response (pm,
3846                           GNUNET_NO,
3847                           0);
3848     return NULL;
3849   }
3850   bpm = GNUNET_malloc (sizeof (struct PendingMessage) +
3851                        sizeof (rbox) +
3852                        pm->bytes_msg);
3853   bpm->target = pm->target;
3854   bpm->frag_parent = pm;
3855   GNUNET_CONTAINER_MDLL_insert (frag,
3856                                 pm->head_frag,
3857                                 pm->tail_frag,
3858                                 bpm);
3859   bpm->timeout = pm->timeout;
3860   bpm->pmt = PMT_RELIABILITY_BOX;
3861   bpm->bytes_msg = pm->bytes_msg + sizeof (rbox);
3862   set_pending_message_uuid (bpm);
3863   rbox.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX);
3864   rbox.header.size = htons (sizeof (rbox) + pm->bytes_msg);
3865   rbox.ack_countdown = htonl (0); // FIXME: implement ACK countdown support
3866   rbox.msg_uuid = pm->msg_uuid;
3867   msg = (char *) &bpm[1];
3868   memcpy (msg,
3869           &rbox,
3870           sizeof (rbox));
3871   memcpy (&msg[sizeof (rbox)],
3872           &pm[1],
3873           pm->bytes_msg);
3874   pm->bpm = bpm;
3875   return bpm;
3876 }
3877
3878
3879 /**
3880  * We believe we are ready to transmit a message on a queue. Double-checks
3881  * with the queue's "tracker_out" and then gives the message to the
3882  * communicator for transmission (updating the tracker, and re-scheduling
3883  * itself if applicable).
3884  *
3885  * @param cls the `struct Queue` to process transmissions for
3886  */
3887 static void
3888 transmit_on_queue (void *cls)
3889 {
3890   struct Queue *queue = cls;
3891   struct Neighbour *n = queue->neighbour;
3892   struct QueueEntry *qe;
3893   struct PendingMessage *pm;
3894   struct PendingMessage *s;
3895   uint32_t overhead;
3896   struct GNUNET_TRANSPORT_SendMessageTo *smt;
3897   struct GNUNET_MQ_Envelope *env;
3898
3899   queue->transmit_task = NULL;
3900   if (NULL == (pm = n->pending_msg_head))
3901   {
3902     /* no message pending, nothing to do here! */
3903     return;
3904   }
3905   schedule_transmit_on_queue (queue);
3906   if (NULL != queue->transmit_task)
3907     return; /* do it later */
3908   overhead = 0;
3909   if (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc)
3910     overhead += sizeof (struct TransportReliabilityBox);
3911   s = pm;
3912   if ( ( (0 != queue->mtu) &&
3913          (pm->bytes_msg + overhead > queue->mtu) ) ||
3914        (pm->bytes_msg > UINT16_MAX - sizeof (struct GNUNET_TRANSPORT_SendMessageTo)) ||
3915        (NULL != pm->head_frag /* fragments already exist, should
3916                                  respect that even if MTU is 0 for
3917                                  this queue */) )
3918     s = fragment_message (s,
3919                           (0 == queue->mtu)
3920                           ? UINT16_MAX - sizeof (struct GNUNET_TRANSPORT_SendMessageTo)
3921                           : queue->mtu);
3922   if (NULL == s)
3923   {
3924     /* Fragmentation failed, try next message... */
3925     schedule_transmit_on_queue (queue);
3926     return;
3927   }
3928   if (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc)
3929     s = reliability_box_message (s);
3930   if (NULL == s)
3931   {
3932     /* Reliability boxing failed, try next message... */
3933     schedule_transmit_on_queue (queue);
3934     return;
3935   }
3936
3937   /* Pass 's' for transission to the communicator */
3938   qe = GNUNET_new (struct QueueEntry);
3939   qe->mid = queue->mid_gen++;
3940   qe->queue = queue;
3941   // qe->pm = s; // FIXME: not so easy, reference management on 'free(s)'!
3942   GNUNET_CONTAINER_DLL_insert (queue->queue_head,
3943                                queue->queue_tail,
3944                                qe);
3945   env = GNUNET_MQ_msg_extra (smt,
3946                              s->bytes_msg,
3947                              GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG);
3948   smt->qid = queue->qid;
3949   smt->mid = qe->mid;
3950   smt->receiver = n->pid;
3951   memcpy (&smt[1],
3952           &s[1],
3953           s->bytes_msg);
3954   GNUNET_assert (CT_COMMUNICATOR == queue->tc->type);
3955   queue->queue_length++;
3956   queue->tc->details.communicator.total_queue_length++;
3957   GNUNET_MQ_send (queue->tc->mq,
3958                   env);
3959
3960   // FIXME: do something similar to the logic below
3961   // in defragmentation / reliability ACK handling!
3962
3963   /* Check if this transmission somehow conclusively finished handing 'pm'
3964      even without any explicit ACKs */
3965   if ( (PMT_CORE == s->pmt) &&
3966        (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc) )
3967   {
3968     /* Full message sent, and over reliabile channel */
3969     client_send_response (pm,
3970                           GNUNET_YES,
3971                           pm->bytes_msg);
3972   }
3973   else if ( (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc) &&
3974             (PMT_FRAGMENT_BOX == s->pmt) )
3975   {
3976     struct PendingMessage *pos;
3977
3978     /* Fragment sent over reliabile channel */
3979     free_fragment_tree (s);
3980     pos = s->frag_parent;
3981     GNUNET_CONTAINER_MDLL_remove (frag,
3982                                   pos->head_frag,
3983                                   pos->tail_frag,
3984                                   s);
3985     GNUNET_free (s);
3986     /* check if subtree is done */
3987     while ( (NULL == pos->head_frag) &&
3988             (pos->frag_off == pos->bytes_msg) &&
3989             (pos != pm) )
3990     {
3991       s = pos;
3992       pos = s->frag_parent;
3993       GNUNET_CONTAINER_MDLL_remove (frag,
3994                                     pos->head_frag,
3995                                     pos->tail_frag,
3996                                     s);
3997       GNUNET_free (s);
3998     }
3999
4000     /* Was this the last applicable fragmment? */
4001     if ( (NULL == pm->head_frag) &&
4002          (pm->frag_off == pm->bytes_msg) )
4003       client_send_response (pm,
4004                             GNUNET_YES,
4005                             pm->bytes_msg /* FIXME: calculate and add overheads! */);
4006   }
4007   else if (PMT_CORE != pm->pmt)
4008   {
4009     /* This was an acknowledgement of some type, always free */
4010     free_pending_message (pm);
4011   }
4012   else
4013   {
4014     /* message not finished, waiting for acknowledgement */
4015     struct Neighbour *neighbour = pm->target;
4016     /* Update time by which we might retransmit 's' based on queue
4017        characteristics (i.e. RTT); it takes one RTT for the message to
4018        arrive and the ACK to come back in the best case; but the other
4019        side is allowed to delay ACKs by 2 RTTs, so we use 4 RTT before
4020        retransmitting.  Note that in the future this heuristic should
4021        likely be improved further (measure RTT stability, consider
4022        message urgency and size when delaying ACKs, etc.) */
4023     s->next_attempt = GNUNET_TIME_relative_to_absolute
4024       (GNUNET_TIME_relative_multiply (queue->rtt,
4025                                       4));
4026     if (s == pm)
4027     {
4028       struct PendingMessage *pos;
4029
4030       /* re-insert sort in neighbour list */
4031       GNUNET_CONTAINER_MDLL_remove (neighbour,
4032                                     neighbour->pending_msg_head,
4033                                     neighbour->pending_msg_tail,
4034                                     pm);
4035       pos = neighbour->pending_msg_tail;
4036       while ( (NULL != pos) &&
4037               (pm->next_attempt.abs_value_us > pos->next_attempt.abs_value_us) )
4038         pos = pos->prev_neighbour;
4039       GNUNET_CONTAINER_MDLL_insert_after (neighbour,
4040                                           neighbour->pending_msg_head,
4041                                           neighbour->pending_msg_tail,
4042                                           pos,
4043                                           pm);
4044     }
4045     else
4046     {
4047       /* re-insert sort in fragment list */
4048       struct PendingMessage *fp = s->frag_parent;
4049       struct PendingMessage *pos;
4050
4051       GNUNET_CONTAINER_MDLL_remove (frag,
4052                                     fp->head_frag,
4053                                     fp->tail_frag,
4054                                     s);
4055       pos = fp->tail_frag;
4056       while ( (NULL != pos) &&
4057               (s->next_attempt.abs_value_us > pos->next_attempt.abs_value_us) )
4058         pos = pos->prev_frag;
4059       GNUNET_CONTAINER_MDLL_insert_after (frag,
4060                                           fp->head_frag,
4061                                           fp->tail_frag,
4062                                           pos,
4063                                           s);
4064     }
4065   }
4066
4067   /* finally, re-schedule queue transmission task itself */
4068   schedule_transmit_on_queue (queue);
4069 }
4070
4071
4072 /**
4073  * Bandwidth tracker informs us that the delay until we
4074  * can transmit again changed.
4075  *
4076  * @param cls a `struct Queue` for which the delay changed
4077  */
4078 static void
4079 tracker_update_out_cb (void *cls)
4080 {
4081   struct Queue *queue = cls;
4082   struct Neighbour *n = queue->neighbour;
4083
4084   if (NULL == n->pending_msg_head)
4085   {
4086     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4087                 "Bandwidth allocation updated for empty transmission queue `%s'\n",
4088                 queue->address);
4089     return; /* no message pending, nothing to do here! */
4090   }
4091   GNUNET_SCHEDULER_cancel (queue->transmit_task);
4092   queue->transmit_task = NULL;
4093   schedule_transmit_on_queue (queue);
4094 }
4095
4096
4097 /**
4098  * Bandwidth tracker informs us that excessive outbound bandwidth was
4099  * allocated which is not being used.
4100  *
4101  * @param cls a `struct Queue` for which the excess was noted
4102  */
4103 static void
4104 tracker_excess_out_cb (void *cls)
4105 {
4106   /* FIXME: trigger excess bandwidth report to core? Right now,
4107      this is done internally within transport_api2_core already,
4108      but we probably want to change the logic and trigger it
4109      from here via a message instead! */
4110   /* TODO: maybe inform someone at this point? */
4111   GNUNET_STATISTICS_update (GST_stats,
4112                             "# Excess outbound bandwidth reported",
4113                             1,
4114                             GNUNET_NO);
4115 }
4116
4117
4118
4119 /**
4120  * Bandwidth tracker informs us that excessive inbound bandwidth was allocated
4121  * which is not being used.
4122  *
4123  * @param cls a `struct Queue` for which the excess was noted
4124  */
4125 static void
4126 tracker_excess_in_cb (void *cls)
4127 {
4128   /* TODO: maybe inform somone at this point? */
4129   GNUNET_STATISTICS_update (GST_stats,
4130                             "# Excess inbound bandwidth reported",
4131                             1,
4132                             GNUNET_NO);
4133 }
4134
4135
4136 /**
4137  * New queue became available.  Process the request.
4138  *
4139  * @param cls the client
4140  * @param aqm the send message that was sent
4141  */
4142 static void
4143 handle_add_queue_message (void *cls,
4144                           const struct GNUNET_TRANSPORT_AddQueueMessage *aqm)
4145 {
4146   struct TransportClient *tc = cls;
4147   struct Queue *queue;
4148   struct Neighbour *neighbour;
4149   const char *addr;
4150   uint16_t addr_len;
4151
4152   if (ntohl (aqm->mtu) <= sizeof (struct TransportFragmentBox))
4153   {
4154     /* MTU so small as to be useless for transmissions,
4155        required for #fragment_message()! */
4156     GNUNET_break_op (0);
4157     GNUNET_SERVICE_client_drop (tc->client);
4158     return;
4159   }
4160   neighbour = lookup_neighbour (&aqm->receiver);
4161   if (NULL == neighbour)
4162   {
4163     neighbour = GNUNET_new (struct Neighbour);
4164     neighbour->earliest_timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
4165     neighbour->pid = aqm->receiver;
4166     GNUNET_assert (GNUNET_OK ==
4167                    GNUNET_CONTAINER_multipeermap_put (neighbours,
4168                                                       &neighbour->pid,
4169                                                       neighbour,
4170                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
4171     cores_send_connect_info (&neighbour->pid,
4172                              GNUNET_BANDWIDTH_ZERO);
4173   }
4174   addr_len = ntohs (aqm->header.size) - sizeof (*aqm);
4175   addr = (const char *) &aqm[1];
4176
4177   queue = GNUNET_malloc (sizeof (struct Queue) + addr_len);
4178   queue->tc = tc;
4179   queue->address = (const char *) &queue[1];
4180   queue->rtt = GNUNET_TIME_UNIT_FOREVER_REL;
4181   queue->qid = aqm->qid;
4182   queue->mtu = ntohl (aqm->mtu);
4183   queue->nt = (enum GNUNET_NetworkType) ntohl (aqm->nt);
4184   queue->cs = (enum GNUNET_TRANSPORT_ConnectionStatus) ntohl (aqm->cs);
4185   queue->neighbour = neighbour;
4186   GNUNET_BANDWIDTH_tracker_init2 (&queue->tracker_in,
4187                                   &tracker_update_in_cb,
4188                                   queue,
4189                                   GNUNET_BANDWIDTH_ZERO,
4190                                   GNUNET_CONSTANTS_MAX_BANDWIDTH_CARRY_S,
4191                                   &tracker_excess_in_cb,
4192                                   queue);
4193   GNUNET_BANDWIDTH_tracker_init2 (&queue->tracker_out,
4194                                   &tracker_update_out_cb,
4195                                   queue,
4196                                   GNUNET_BANDWIDTH_ZERO,
4197                                   GNUNET_CONSTANTS_MAX_BANDWIDTH_CARRY_S,
4198                                   &tracker_excess_out_cb,
4199                                   queue);
4200   memcpy (&queue[1],
4201           addr,
4202           addr_len);
4203   /* notify monitors about new queue */
4204   {
4205     struct MonitorEvent me = {
4206       .rtt = queue->rtt,
4207       .cs = queue->cs
4208     };
4209
4210     notify_monitors (&neighbour->pid,
4211                      queue->address,
4212                      queue->nt,
4213                      &me);
4214   }
4215   GNUNET_CONTAINER_MDLL_insert (neighbour,
4216                                 neighbour->queue_head,
4217                                 neighbour->queue_tail,
4218                                 queue);
4219   GNUNET_CONTAINER_MDLL_insert (client,
4220                                 tc->details.communicator.queue_head,
4221                                 tc->details.communicator.queue_tail,
4222                                 queue);
4223   GNUNET_SERVICE_client_continue (tc->client);
4224 }
4225
4226
4227 /**
4228  * Queue to a peer went down.  Process the request.
4229  *
4230  * @param cls the client
4231  * @param dqm the send message that was sent
4232  */
4233 static void
4234 handle_del_queue_message (void *cls,
4235                           const struct GNUNET_TRANSPORT_DelQueueMessage *dqm)
4236 {
4237   struct TransportClient *tc = cls;
4238
4239   if (CT_COMMUNICATOR != tc->type)
4240   {
4241     GNUNET_break (0);
4242     GNUNET_SERVICE_client_drop (tc->client);
4243     return;
4244   }
4245   for (struct Queue *queue = tc->details.communicator.queue_head;
4246        NULL != queue;
4247        queue = queue->next_client)
4248   {
4249     struct Neighbour *neighbour = queue->neighbour;
4250
4251     if ( (dqm->qid != queue->qid) ||
4252          (0 != memcmp (&dqm->receiver,
4253                        &neighbour->pid,
4254                        sizeof (struct GNUNET_PeerIdentity))) )
4255       continue;
4256     free_queue (queue);
4257     GNUNET_SERVICE_client_continue (tc->client);
4258     return;
4259   }
4260   GNUNET_break (0);
4261   GNUNET_SERVICE_client_drop (tc->client);
4262 }
4263
4264
4265 /**
4266  * Message was transmitted.  Process the request.
4267  *
4268  * @param cls the client
4269  * @param sma the send message that was sent
4270  */
4271 static void
4272 handle_send_message_ack (void *cls,
4273                          const struct GNUNET_TRANSPORT_SendMessageToAck *sma)
4274 {
4275   struct TransportClient *tc = cls;
4276   struct QueueEntry *qe;
4277
4278   if (CT_COMMUNICATOR != tc->type)
4279   {
4280     GNUNET_break (0);
4281     GNUNET_SERVICE_client_drop (tc->client);
4282     return;
4283   }
4284
4285   /* find our queue entry matching the ACK */
4286   qe = NULL;
4287   for (struct Queue *queue = tc->details.communicator.queue_head;
4288        NULL != queue;
4289        queue = queue->next_client)
4290   {
4291     if (0 != memcmp (&queue->neighbour->pid,
4292                      &sma->receiver,
4293                      sizeof (struct GNUNET_PeerIdentity)))
4294       continue;
4295     for (struct QueueEntry *qep = queue->queue_head;
4296          NULL != qep;
4297          qep = qep->next)
4298     {
4299       if (qep->mid != sma->mid)
4300         continue;
4301       qe = qep;
4302       break;
4303     }
4304     break;
4305   }
4306   if (NULL == qe)
4307   {
4308     /* this should never happen */
4309     GNUNET_break (0);
4310     GNUNET_SERVICE_client_drop (tc->client);
4311     return;
4312   }
4313   GNUNET_CONTAINER_DLL_remove (qe->queue->queue_head,
4314                                qe->queue->queue_tail,
4315                                qe);
4316   qe->queue->queue_length--;
4317   tc->details.communicator.total_queue_length--;
4318   GNUNET_SERVICE_client_continue (tc->client);
4319
4320   /* if applicable, resume transmissions that waited on ACK */
4321   if (COMMUNICATOR_TOTAL_QUEUE_LIMIT - 1 == tc->details.communicator.total_queue_length)
4322   {
4323     /* Communicator dropped below threshold, resume all queues */
4324     GNUNET_STATISTICS_update (GST_stats,
4325                               "# Transmission throttled due to communicator queue limit",
4326                               -1,
4327                               GNUNET_NO);
4328     for (struct Queue *queue = tc->details.communicator.queue_head;
4329          NULL != queue;
4330          queue = queue->next_client)
4331       schedule_transmit_on_queue (queue);
4332   }
4333   else if (QUEUE_LENGTH_LIMIT - 1 == qe->queue->queue_length)
4334   {
4335     /* queue dropped below threshold; only resume this one queue */
4336     GNUNET_STATISTICS_update (GST_stats,
4337                               "# Transmission throttled due to queue queue limit",
4338                               -1,
4339                               GNUNET_NO);
4340     schedule_transmit_on_queue (qe->queue);
4341   }
4342
4343   /* TODO: we also should react on the status! */
4344   // FIXME: this probably requires queue->pm = s assignment!
4345   // FIXME: react to communicator status about transmission request. We got:
4346   sma->status; // OK success, SYSERR failure
4347
4348   GNUNET_free (qe);
4349 }
4350
4351
4352 /**
4353  * Iterator telling new MONITOR client about all existing
4354  * queues to peers.
4355  *
4356  * @param cls the new `struct TransportClient`
4357  * @param pid a connected peer
4358  * @param value the `struct Neighbour` with more information
4359  * @return #GNUNET_OK (continue to iterate)
4360  */
4361 static int
4362 notify_client_queues (void *cls,
4363                       const struct GNUNET_PeerIdentity *pid,
4364                       void *value)
4365 {
4366   struct TransportClient *tc = cls;
4367   struct Neighbour *neighbour = value;
4368
4369   GNUNET_assert (CT_MONITOR == tc->type);
4370   for (struct Queue *q = neighbour->queue_head;
4371        NULL != q;
4372        q = q->next_neighbour)
4373   {
4374     struct MonitorEvent me = {
4375       .rtt = q->rtt,
4376       .cs = q->cs,
4377       .num_msg_pending = q->num_msg_pending,
4378       .num_bytes_pending = q->num_bytes_pending
4379     };
4380
4381     notify_monitor (tc,
4382                     pid,
4383                     q->address,
4384                     q->nt,
4385                     &me);
4386   }
4387   return GNUNET_OK;
4388 }
4389
4390
4391 /**
4392  * Initialize a monitor client.
4393  *
4394  * @param cls the client
4395  * @param start the start message that was sent
4396  */
4397 static void
4398 handle_monitor_start (void *cls,
4399                       const struct GNUNET_TRANSPORT_MonitorStart *start)
4400 {
4401   struct TransportClient *tc = cls;
4402
4403   if (CT_NONE != tc->type)
4404   {
4405     GNUNET_break (0);
4406     GNUNET_SERVICE_client_drop (tc->client);
4407     return;
4408   }
4409   tc->type = CT_MONITOR;
4410   tc->details.monitor.peer = start->peer;
4411   tc->details.monitor.one_shot = ntohl (start->one_shot);
4412   GNUNET_CONTAINER_multipeermap_iterate (neighbours,
4413                                          &notify_client_queues,
4414                                          tc);
4415   GNUNET_SERVICE_client_mark_monitor (tc->client);
4416   GNUNET_SERVICE_client_continue (tc->client);
4417 }
4418
4419
4420 /**
4421  * Find transport client providing communication service
4422  * for the protocol @a prefix.
4423  *
4424  * @param prefix communicator name
4425  * @return NULL if no such transport client is available
4426  */
4427 static struct TransportClient *
4428 lookup_communicator (const char *prefix)
4429 {
4430   for (struct TransportClient *tc = clients_head;
4431        NULL != tc;
4432        tc = tc->next)
4433   {
4434     if (CT_COMMUNICATOR != tc->type)
4435       continue;
4436     if (0 == strcmp (prefix,
4437                      tc->details.communicator.address_prefix))
4438       return tc;
4439   }
4440   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
4441               "Somone suggested use of communicator for `%s', but we do not have such a communicator!\n",
4442               prefix);
4443   return NULL;
4444 }
4445
4446
4447 /**
4448  * Signature of a function called with a communicator @a address of a peer
4449  * @a pid that an application wants us to connect to.
4450  *
4451  * @param pid target peer
4452  * @param address the address to try
4453  */
4454 static void
4455 suggest_to_connect (const struct GNUNET_PeerIdentity *pid,
4456                     const char *address)
4457 {
4458   static uint32_t idgen;
4459   struct TransportClient *tc;
4460   char *prefix;
4461   struct GNUNET_TRANSPORT_CreateQueue *cqm;
4462   struct GNUNET_MQ_Envelope *env;
4463   size_t alen;
4464
4465   prefix = GNUNET_HELLO_address_to_prefix (address);
4466   if (NULL == prefix)
4467   {
4468     GNUNET_break (0); /* We got an invalid address!? */
4469     return;
4470   }
4471   tc = lookup_communicator (prefix);
4472   if (NULL == tc)
4473   {
4474     GNUNET_STATISTICS_update (GST_stats,
4475                               "# Suggestions ignored due to missing communicator",
4476                               1,
4477                               GNUNET_NO);
4478     return;
4479   }
4480   /* forward suggestion for queue creation to communicator */
4481   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4482               "Request #%u for `%s' communicator to create queue to `%s'\n",
4483               (unsigned int) idgen,
4484               prefix,
4485               address);
4486   alen = strlen (address) + 1;
4487   env = GNUNET_MQ_msg_extra (cqm,
4488                              alen,
4489                              GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE);
4490   cqm->request_id = htonl (idgen++);
4491   cqm->receiver = *pid;
4492   memcpy (&cqm[1],
4493           address,
4494           alen);
4495   GNUNET_MQ_send (tc->mq,
4496                   env);
4497 }
4498
4499
4500 /**
4501  * Communicator tells us that our request to create a queue "worked", that
4502  * is setting up the queue is now in process.
4503  *
4504  * @param cls the `struct TransportClient`
4505  * @param cqr confirmation message
4506  */
4507 static void
4508 handle_queue_create_ok (void *cls,
4509                         const struct GNUNET_TRANSPORT_CreateQueueResponse *cqr)
4510 {
4511   struct TransportClient *tc = cls;
4512
4513   if (CT_COMMUNICATOR != tc->type)
4514   {
4515     GNUNET_break (0);
4516     GNUNET_SERVICE_client_drop (tc->client);
4517     return;
4518   }
4519   GNUNET_STATISTICS_update (GST_stats,
4520                             "# Suggestions succeeded at communicator",
4521                             1,
4522                             GNUNET_NO);
4523   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4524               "Request #%u for communicator to create queue succeeded\n",
4525               (unsigned int) ntohs (cqr->request_id));
4526   GNUNET_SERVICE_client_continue (tc->client);
4527 }
4528
4529
4530 /**
4531  * Communicator tells us that our request to create a queue failed. This usually
4532  * indicates that the provided address is simply invalid or that the communicator's
4533  * resources are exhausted.
4534  *
4535  * @param cls the `struct TransportClient`
4536  * @param cqr failure message
4537  */
4538 static void
4539 handle_queue_create_fail (void *cls,
4540                           const struct GNUNET_TRANSPORT_CreateQueueResponse *cqr)
4541 {
4542   struct TransportClient *tc = cls;
4543
4544   if (CT_COMMUNICATOR != tc->type)
4545   {
4546     GNUNET_break (0);
4547     GNUNET_SERVICE_client_drop (tc->client);
4548     return;
4549   }
4550   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4551               "Request #%u for communicator to create queue failed\n",
4552               (unsigned int) ntohs (cqr->request_id));
4553   GNUNET_STATISTICS_update (GST_stats,
4554                             "# Suggestions failed in queue creation at communicator",
4555                             1,
4556                             GNUNET_NO);
4557   GNUNET_SERVICE_client_continue (tc->client);
4558 }
4559
4560
4561 /**
4562  * Function called by PEERSTORE for each matching record.
4563  *
4564  * @param cls closure
4565  * @param record peerstore record information
4566  * @param emsg error message, or NULL if no errors
4567  */
4568 static void
4569 handle_hello (void *cls,
4570               const struct GNUNET_PEERSTORE_Record *record,
4571               const char *emsg)
4572 {
4573   struct PeerRequest *pr = cls;
4574   const char *val;
4575
4576   if (NULL != emsg)
4577   {
4578     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
4579                 "Got failure from PEERSTORE: %s\n",
4580                 emsg);
4581     return;
4582   }
4583   val = record->value;
4584   if ( (0 == record->value_size) ||
4585        ('\0' != val[record->value_size - 1]) )
4586   {
4587     GNUNET_break (0);
4588     return;
4589   }
4590   suggest_to_connect (&pr->pid,
4591                       (const char *) record->value);
4592 }
4593
4594
4595 /**
4596  * We have received a `struct ExpressPreferenceMessage` from an application client.
4597  *
4598  * @param cls handle to the client
4599  * @param msg the start message
4600  */
4601 static void
4602 handle_suggest (void *cls,
4603                 const struct ExpressPreferenceMessage *msg)
4604 {
4605   struct TransportClient *tc = cls;
4606   struct PeerRequest *pr;
4607
4608   if (CT_NONE == tc->type)
4609   {
4610     tc->type = CT_APPLICATION;
4611     tc->details.application.requests
4612       = GNUNET_CONTAINER_multipeermap_create (16,
4613                                               GNUNET_YES);
4614   }
4615   if (CT_APPLICATION != tc->type)
4616   {
4617     GNUNET_break (0);
4618     GNUNET_SERVICE_client_drop (tc->client);
4619     return;
4620   }
4621   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4622               "Client suggested we talk to %s with preference %d at rate %u\n",
4623               GNUNET_i2s (&msg->peer),
4624               (int) ntohl (msg->pk),
4625               (int) ntohl (msg->bw.value__));
4626   pr = GNUNET_new (struct PeerRequest);
4627   pr->tc = tc;
4628   pr->pid = msg->peer;
4629   pr->bw = msg->bw;
4630   pr->pk = (enum GNUNET_MQ_PreferenceKind) ntohl (msg->pk);
4631   if (GNUNET_YES !=
4632       GNUNET_CONTAINER_multipeermap_put (tc->details.application.requests,
4633                                          &pr->pid,
4634                                          pr,
4635                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY))
4636   {
4637     GNUNET_break (0);
4638     GNUNET_free (pr);
4639     GNUNET_SERVICE_client_drop (tc->client);
4640     return;
4641   }
4642   pr->wc = GNUNET_PEERSTORE_watch (peerstore,
4643                                    "transport",
4644                                    &pr->pid,
4645                                    "hello",
4646                                    &handle_hello,
4647                                    pr);
4648   GNUNET_SERVICE_client_continue (tc->client);
4649 }
4650
4651
4652 /**
4653  * We have received a `struct ExpressPreferenceMessage` from an application client.
4654  *
4655  * @param cls handle to the client
4656  * @param msg the start message
4657  */
4658 static void
4659 handle_suggest_cancel (void *cls,
4660                        const struct ExpressPreferenceMessage *msg)
4661 {
4662   struct TransportClient *tc = cls;
4663   struct PeerRequest *pr;
4664
4665   if (CT_APPLICATION != tc->type)
4666   {
4667     GNUNET_break (0);
4668     GNUNET_SERVICE_client_drop (tc->client);
4669     return;
4670   }
4671   pr = GNUNET_CONTAINER_multipeermap_get (tc->details.application.requests,
4672                                           &msg->peer);
4673   if (NULL == pr)
4674   {
4675     GNUNET_break (0);
4676     GNUNET_SERVICE_client_drop (tc->client);
4677     return;
4678   }
4679   (void) stop_peer_request (tc,
4680                             &pr->pid,
4681                             pr);
4682   GNUNET_SERVICE_client_continue (tc->client);
4683 }
4684
4685
4686 /**
4687  * Check #GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_CONSIDER_VERIFY
4688  * messages. We do nothing here, real verification is done later.
4689  *
4690  * @param cls a `struct TransportClient *`
4691  * @param msg message to verify
4692  * @return #GNUNET_OK
4693  */
4694 static int
4695 check_address_consider_verify (void *cls,
4696                                const struct GNUNET_TRANSPORT_AddressToVerify *hdr)
4697 {
4698   (void) cls;
4699   (void) hdr;
4700   return GNUNET_OK;
4701 }
4702
4703
4704 /**
4705  * Given another peers address, consider checking it for validity
4706  * and then adding it to the Peerstore.
4707  *
4708  * @param cls a `struct TransportClient`
4709  * @param hdr message containing the raw address data and
4710  *        signature in the body, see #GNUNET_HELLO_extract_address()
4711  */
4712 static void
4713 handle_address_consider_verify (void *cls,
4714                                 const struct GNUNET_TRANSPORT_AddressToVerify *hdr)
4715 {
4716   char *address;
4717   enum GNUNET_NetworkType nt;
4718   struct GNUNET_TIME_Absolute expiration;
4719
4720   (void) cls;
4721   // FIXME: pre-check: do we know this address already?
4722   // FIXME: pre-check: rate-limit signature verification / validation!
4723   address = GNUNET_HELLO_extract_address (&hdr[1],
4724                                           ntohs (hdr->header.size) - sizeof (*hdr),
4725                                           &hdr->peer,
4726                                           &nt,
4727                                           &expiration);
4728   if (NULL == address)
4729   {
4730     GNUNET_break_op (0);
4731     return;
4732   }
4733   if (0 == GNUNET_TIME_absolute_get_remaining (expiration).rel_value_us)
4734     return; /* expired */
4735   // FIXME: do begin actual verification here!
4736   GNUNET_free (address);
4737 }
4738
4739
4740 /**
4741  * Free neighbour entry.
4742  *
4743  * @param cls NULL
4744  * @param pid unused
4745  * @param value a `struct Neighbour`
4746  * @return #GNUNET_OK (always)
4747  */
4748 static int
4749 free_neighbour_cb (void *cls,
4750                    const struct GNUNET_PeerIdentity *pid,
4751                    void *value)
4752 {
4753   struct Neighbour *neighbour = value;
4754
4755   (void) cls;
4756   (void) pid;
4757   GNUNET_break (0); // should this ever happen?
4758   free_neighbour (neighbour);
4759
4760   return GNUNET_OK;
4761 }
4762
4763
4764 /**
4765  * Free DV route entry.
4766  *
4767  * @param cls NULL
4768  * @param pid unused
4769  * @param value a `struct DistanceVector`
4770  * @return #GNUNET_OK (always)
4771  */
4772 static int
4773 free_dv_routes_cb (void *cls,
4774                    const struct GNUNET_PeerIdentity *pid,
4775                    void *value)
4776 {
4777   struct DistanceVector *dv = value;
4778
4779   (void) cls;
4780   (void) pid;
4781   free_dv_route (dv);
4782
4783   return GNUNET_OK;
4784 }
4785
4786
4787 /**
4788  * Free ephemeral entry.
4789  *
4790  * @param cls NULL
4791  * @param pid unused
4792  * @param value a `struct Neighbour`
4793  * @return #GNUNET_OK (always)
4794  */
4795 static int
4796 free_ephemeral_cb (void *cls,
4797                    const struct GNUNET_PeerIdentity *pid,
4798                    void *value)
4799 {
4800   struct EphemeralCacheEntry *ece = value;
4801
4802   (void) cls;
4803   (void) pid;
4804   free_ephemeral (ece);
4805   return GNUNET_OK;
4806 }
4807
4808
4809 /**
4810  * Function called when the service shuts down.  Unloads our plugins
4811  * and cancels pending validations.
4812  *
4813  * @param cls closure, unused
4814  */
4815 static void
4816 do_shutdown (void *cls)
4817 {
4818   (void) cls;
4819
4820   if (NULL != ephemeral_task)
4821   {
4822     GNUNET_SCHEDULER_cancel (ephemeral_task);
4823     ephemeral_task = NULL;
4824   }
4825   GNUNET_CONTAINER_multipeermap_iterate (neighbours,
4826                                          &free_neighbour_cb,
4827                                          NULL);
4828   if (NULL != peerstore)
4829   {
4830     GNUNET_PEERSTORE_disconnect (peerstore,
4831                                  GNUNET_NO);
4832     peerstore = NULL;
4833   }
4834   if (NULL != GST_stats)
4835   {
4836     GNUNET_STATISTICS_destroy (GST_stats,
4837                                GNUNET_NO);
4838     GST_stats = NULL;
4839   }
4840   if (NULL != GST_my_private_key)
4841   {
4842     GNUNET_free (GST_my_private_key);
4843     GST_my_private_key = NULL;
4844   }
4845   GNUNET_CONTAINER_multipeermap_destroy (neighbours);
4846   neighbours = NULL;
4847   GNUNET_CONTAINER_multipeermap_iterate (dv_routes,
4848                                          &free_dv_routes_cb,
4849                                          NULL);
4850   GNUNET_CONTAINER_multipeermap_destroy (dv_routes);
4851   dv_routes = NULL;
4852   GNUNET_CONTAINER_multipeermap_iterate (ephemeral_map,
4853                                          &free_ephemeral_cb,
4854                                          NULL);
4855   GNUNET_CONTAINER_multipeermap_destroy (ephemeral_map);
4856   ephemeral_map = NULL;
4857   GNUNET_CONTAINER_heap_destroy (ephemeral_heap);
4858   ephemeral_heap = NULL;
4859 }
4860
4861
4862 /**
4863  * Initiate transport service.
4864  *
4865  * @param cls closure
4866  * @param c configuration to use
4867  * @param service the initialized service
4868  */
4869 static void
4870 run (void *cls,
4871      const struct GNUNET_CONFIGURATION_Handle *c,
4872      struct GNUNET_SERVICE_Handle *service)
4873 {
4874   (void) cls;
4875   /* setup globals */
4876   GST_cfg = c;
4877   neighbours = GNUNET_CONTAINER_multipeermap_create (1024,
4878                                                      GNUNET_YES);
4879   dv_routes = GNUNET_CONTAINER_multipeermap_create (1024,
4880                                                     GNUNET_YES);
4881   ephemeral_map = GNUNET_CONTAINER_multipeermap_create (32,
4882                                                         GNUNET_YES);
4883   ephemeral_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
4884   GST_my_private_key = GNUNET_CRYPTO_eddsa_key_create_from_configuration (GST_cfg);
4885   if (NULL == GST_my_private_key)
4886   {
4887     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
4888                 _("Transport service is lacking key configuration settings. Exiting.\n"));
4889     GNUNET_SCHEDULER_shutdown ();
4890     return;
4891   }
4892   GNUNET_CRYPTO_eddsa_key_get_public (GST_my_private_key,
4893                                       &GST_my_identity.public_key);
4894   GNUNET_log(GNUNET_ERROR_TYPE_INFO,
4895              "My identity is `%s'\n",
4896              GNUNET_i2s_full (&GST_my_identity));
4897   GST_stats = GNUNET_STATISTICS_create ("transport",
4898                                         GST_cfg);
4899   GNUNET_SCHEDULER_add_shutdown (&do_shutdown,
4900                                  NULL);
4901   peerstore = GNUNET_PEERSTORE_connect (GST_cfg);
4902   if (NULL == peerstore)
4903   {
4904     GNUNET_break (0);
4905     GNUNET_SCHEDULER_shutdown ();
4906     return;
4907   }
4908 }
4909
4910
4911 /**
4912  * Define "main" method using service macro.
4913  */
4914 GNUNET_SERVICE_MAIN
4915 ("transport",
4916  GNUNET_SERVICE_OPTION_SOFT_SHUTDOWN,
4917  &run,
4918  &client_connect_cb,
4919  &client_disconnect_cb,
4920  NULL,
4921  /* communication with applications */
4922  GNUNET_MQ_hd_fixed_size (suggest,
4923                           GNUNET_MESSAGE_TYPE_TRANSPORT_SUGGEST,
4924                           struct ExpressPreferenceMessage,
4925                           NULL),
4926  GNUNET_MQ_hd_fixed_size (suggest_cancel,
4927                           GNUNET_MESSAGE_TYPE_TRANSPORT_SUGGEST_CANCEL,
4928                           struct ExpressPreferenceMessage,
4929                           NULL),
4930  /* communication with core */
4931  GNUNET_MQ_hd_fixed_size (client_start,
4932                           GNUNET_MESSAGE_TYPE_TRANSPORT_START,
4933                           struct StartMessage,
4934                           NULL),
4935  GNUNET_MQ_hd_var_size (client_send,
4936                         GNUNET_MESSAGE_TYPE_TRANSPORT_SEND,
4937                         struct OutboundMessage,
4938                         NULL),
4939  /* communication with communicators */
4940  GNUNET_MQ_hd_var_size (communicator_available,
4941                         GNUNET_MESSAGE_TYPE_TRANSPORT_NEW_COMMUNICATOR,
4942                         struct GNUNET_TRANSPORT_CommunicatorAvailableMessage,
4943                         NULL),
4944  GNUNET_MQ_hd_var_size (communicator_backchannel,
4945                         GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL,
4946                         struct GNUNET_TRANSPORT_CommunicatorBackchannel,
4947                         NULL),
4948  GNUNET_MQ_hd_var_size (add_address,
4949                         GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS,
4950                         struct GNUNET_TRANSPORT_AddAddressMessage,
4951                         NULL),
4952  GNUNET_MQ_hd_fixed_size (del_address,
4953                           GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_ADDRESS,
4954                           struct GNUNET_TRANSPORT_DelAddressMessage,
4955                           NULL),
4956  GNUNET_MQ_hd_var_size (incoming_msg,
4957                         GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG,
4958                         struct GNUNET_TRANSPORT_IncomingMessage,
4959                         NULL),
4960  GNUNET_MQ_hd_fixed_size (queue_create_ok,
4961                           GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_OK,
4962                           struct GNUNET_TRANSPORT_CreateQueueResponse,
4963                           NULL),
4964  GNUNET_MQ_hd_fixed_size (queue_create_fail,
4965                           GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_FAIL,
4966                           struct GNUNET_TRANSPORT_CreateQueueResponse,
4967                           NULL),
4968  GNUNET_MQ_hd_var_size (add_queue_message,
4969                         GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP,
4970                         struct GNUNET_TRANSPORT_AddQueueMessage,
4971                         NULL),
4972  GNUNET_MQ_hd_var_size (address_consider_verify,
4973                         GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_CONSIDER_VERIFY,
4974                         struct GNUNET_TRANSPORT_AddressToVerify,
4975                         NULL),
4976  GNUNET_MQ_hd_fixed_size (del_queue_message,
4977                           GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN,
4978                           struct GNUNET_TRANSPORT_DelQueueMessage,
4979                           NULL),
4980  GNUNET_MQ_hd_fixed_size (send_message_ack,
4981                           GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK,
4982                           struct GNUNET_TRANSPORT_SendMessageToAck,
4983                           NULL),
4984  /* communication with monitors */
4985  GNUNET_MQ_hd_fixed_size (monitor_start,
4986                           GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_START,
4987                           struct GNUNET_TRANSPORT_MonitorStart,
4988                           NULL),
4989  GNUNET_MQ_handler_end ());
4990
4991
4992 /* end of file gnunet-service-transport.c */