more work on tng
[oweals/gnunet.git] / src / transport / gnunet-service-tng.c
1 /*
2  This file is part of GNUnet.
3  Copyright (C) 2010-2016, 2018 GNUnet e.V.
4
5  GNUnet is free software: you can redistribute it and/or modify it
6  under the terms of the GNU Affero General Public License as published
7  by the Free Software Foundation, either version 3 of the License,
8  or (at your option) any later version.
9
10  GNUnet is distributed in the hope that it will be useful, but
11  WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  Affero General Public License for more details.
14
15  You should have received a copy of the GNU Affero General Public License
16  along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19  */
20 /**
21  * @file transport/gnunet-service-tng.c
22  * @brief main for gnunet-service-tng
23  * @author Christian Grothoff
24  *
25  * TODO:
26  * - figure out how to transmit (selective) ACKs in case of uni-directional
27  *   communicators (with/without core? DV-only?) When do we use ACKs?
28  *   => communicators use selective ACKs for flow control
29  *   => transport uses message-level ACKs for RTT, fragment confirmation
30  *   => integrate DV into transport, use neither core nor communicators
31  *      but rather give communicators transport-encapsulated messages
32  *      (which could be core-data, background-channel traffic, or
33  *       transport-to-transport traffic)
34  *
35  * Implement:
36  * - manage fragmentation/defragmentation, retransmission, track RTT, loss, etc.
37  *
38  * Easy:
39  * - use ATS bandwidth allocation callback and schedule transmissions!
40  *
41  * Plan:
42  * - inform ATS about RTT, goodput/loss, overheads, etc. (GNUNET_ATS_session_update())
43  *
44  * Later:
45  * - change transport-core API to provide proper flow control in both
46  *   directions, allow multiple messages per peer simultaneously (tag
47  *   confirmations with unique message ID), and replace quota-out with
48  *   proper flow control;
49  *
50  * Design realizations / discussion:
51  * - communicators do flow control by calling MQ "notify sent"
52  *   when 'ready'. They determine flow implicitly (i.e. TCP blocking)
53  *   or explicitly via background channel FC ACKs.  As long as the
54  *   channel is not full, they may 'notify sent' even if the other
55  *   peer has not yet confirmed receipt. The other peer confirming
56  *   is _only_ for FC, not for more reliable transmission; reliable
57  *   transmission (i.e. of fragments) is left to _transport_.
58  * - ACKs sent back in uni-directional communicators are done via
59  *   the background channel API; here transport _may_ initially
60  *   broadcast (with bounded # hops) if no path is known;
61  * - transport should _integrate_ DV-routing and build a view of
62  *   the network; then background channel traffic can be
63  *   routed via DV as well as explicit "DV" traffic.
64  * - background channel is also used for ACKs and NAT traversal support
65  * - transport service is responsible for AEAD'ing the background
66  *   channel, timestamps and monotonic time are used against replay
67  *   of old messages -> peerstore needs to be supplied with
68  *   "latest timestamps seen" data
69  * - if transport implements DV, we likely need a 3rd peermap
70  *   in addition to ephemerals and (direct) neighbours
71  *   => in this data structure, we should track ATS metrics (distance, RTT, etc.)
72  *   as well as latest timestamps seen, goodput, fragments for transmission, etc.
73  *   ==> check if stuff needs to be moved out of "Neighbour"
74  * - transport should encapsualte core-level messages and do its
75  *   own ACKing for RTT/goodput/loss measurements _and_ fragment
76  *   for retransmission
77  */
78 #include "platform.h"
79 #include "gnunet_util_lib.h"
80 #include "gnunet_statistics_service.h"
81 #include "gnunet_transport_monitor_service.h"
82 #include "gnunet_peerstore_service.h"
83 #include "gnunet_hello_lib.h"
84 #include "gnunet_ats_transport_service.h"
85 #include "transport.h"
86
87
88 /**
89  * How many messages can we have pending for a given client process
90  * before we start to drop incoming messages?  We typically should
91  * have only one client and so this would be the primary buffer for
92  * messages, so the number should be chosen rather generously.
93  *
94  * The expectation here is that most of the time the queue is large
95  * enough so that a drop is virtually never required.  Note that
96  * this value must be about as large as 'TOTAL_MSGS' in the
97  * 'test_transport_api_reliability.c', otherwise that testcase may
98  * fail.
99  */
100 #define MAX_PENDING (128 * 1024)
101
102
103 GNUNET_NETWORK_STRUCT_BEGIN
104
105 /**
106  * Outer layer of an encapsulated backchannel message.
107  */
108 struct TransportBackchannelEncapsulationMessage
109 {
110   /**
111    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION.
112    */
113   struct GNUNET_MessageHeader header;
114
115   /**
116    * Distance the backchannel message has traveled, to be updated at
117    * each hop.  Used to bound the number of hops in case a backchannel
118    * message is broadcast and thus travels without routing
119    * information (during initial backchannel discovery).
120    */
121   uint32_t distance;
122
123   /**
124    * Target's peer identity (as backchannels may be transmitted
125    * indirectly, or even be broadcast).
126    */
127   struct GNUNET_PeerIdentity target;
128
129   /**
130    * Ephemeral key setup by the sender for @e target, used
131    * to encrypt the payload.
132    */
133   struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;
134
135   /**
136    * HMAC over the ciphertext of the encrypted, variable-size
137    * body that follows.  Verified via DH of @e target and
138    * @e ephemeral_key
139    */
140   struct GNUNET_HashCode hmac;
141
142   /* Followed by encrypted, variable-size payload */
143 };
144
145
146 /**
147  * Message by which a peer confirms that it is using an
148  * ephemeral key.
149  */
150 struct EphemeralConfirmation
151 {
152
153   /**
154    * Purpose is #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL
155    */
156   struct GNUNET_CRYPTO_EccSignaturePurpose purpose;
157
158   /**
159    * How long is this signature over the ephemeral key
160    * valid?
161    */
162   struct GNUNET_TIME_AbsoluteNBO ephemeral_validity;
163
164   /**
165    * Ephemeral key setup by the sender for @e target, used
166    * to encrypt the payload.
167    */
168   struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;
169
170 };
171
172 /**
173  * Plaintext of the variable-size payload that is encrypted
174  * within a `struct TransportBackchannelEncapsulationMessage`
175  */
176 struct TransportBackchannelRequestPayload
177 {
178
179   /**
180    * Sender's peer identity.
181    */
182   struct GNUNET_PeerIdentity sender;
183
184   /**
185    * Signature of the sender over an
186    * #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL.
187    */
188   struct GNUNET_CRYPTO_EddsaSignature sender_sig;
189
190   /**
191    * How long is this signature over the ephemeral key
192    * valid?
193    */
194   struct GNUNET_TIME_AbsoluteNBO ephemeral_validity;
195
196   /**
197    * Current monotonic time of the sending transport service.  Used to
198    * detect replayed messages.  Note that the receiver should remember
199    * a list of the recently seen timestamps and only reject messages
200    * if the timestamp is in the list, or the list is "full" and the
201    * timestamp is smaller than the lowest in the list.  This list of
202    * timestamps per peer should be persisted to guard against replays
203    * after restarts.
204    */
205   struct GNUNET_TIME_AbsoluteNBO monotonic_time;
206
207   /* Followed by a `struct GNUNET_MessageHeader` with a message
208      for a communicator */
209
210   /* Followed by a 0-termianted string specifying the name of
211      the communicator which is to receive the message */
212
213 };
214
215 GNUNET_NETWORK_STRUCT_END
216
217
218
219 /**
220  * What type of client is the `struct TransportClient` about?
221  */
222 enum ClientType
223 {
224   /**
225    * We do not know yet (client is fresh).
226    */
227   CT_NONE = 0,
228
229   /**
230    * Is the CORE service, we need to forward traffic to it.
231    */
232   CT_CORE = 1,
233
234   /**
235    * It is a monitor, forward monitor data.
236    */
237   CT_MONITOR = 2,
238
239   /**
240    * It is a communicator, use for communication.
241    */
242   CT_COMMUNICATOR = 3
243 };
244
245
246 /**
247  * Entry in our cache of ephemeral keys we currently use.
248  */
249 struct EphemeralCacheEntry
250 {
251
252   /**
253    * Target's peer identity (we don't re-use ephemerals
254    * to limit linkability of messages).
255    */
256   struct GNUNET_PeerIdentity target;
257
258   /**
259    * Signature affirming @e ephemeral_key of type
260    * #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL
261    */
262   struct GNUNET_CRYPTO_EddsaSignature sender_sig;
263
264   /**
265    * How long is @e sender_sig valid
266    */
267   struct GNUNET_TIME_Absolute ephemeral_validity;
268
269   /**
270    * Our ephemeral key.
271    */
272   struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;
273
274   /**
275    * Node in the ephemeral cache for this entry.
276    * Used for expiration.
277    */
278   struct GNUNET_CONTAINER_HeapNode *hn;
279 };
280
281
282 /**
283  * Client connected to the transport service.
284  */
285 struct TransportClient;
286
287
288 /**
289  * A neighbour that at least one communicator is connected to.
290  */
291 struct Neighbour;
292
293
294 /**
295  * An ATS session is a message queue provided by a communicator
296  * via which we can reach a particular neighbour.
297  */
298 struct GNUNET_ATS_Session
299 {
300   /**
301    * Kept in a MDLL.
302    */
303   struct GNUNET_ATS_Session *next_neighbour;
304
305   /**
306    * Kept in a MDLL.
307    */
308   struct GNUNET_ATS_Session *prev_neighbour;
309
310   /**
311    * Kept in a MDLL.
312    */
313   struct GNUNET_ATS_Session *prev_client;
314
315   /**
316    * Kept in a MDLL.
317    */
318   struct GNUNET_ATS_Session *next_client;
319
320   /**
321    * Which neighbour is this ATS session for?
322    */
323   struct Neighbour *neighbour;
324
325   /**
326    * Which communicator offers this ATS session?
327    */
328   struct TransportClient *tc;
329
330   /**
331    * Address served by the ATS session.
332    */
333   const char *address;
334
335   /**
336    * Handle by which we inform ATS about this queue.
337    */
338   struct GNUNET_ATS_SessionRecord *sr;
339   
340   /**
341    * Our current RTT estimate for this ATS session.
342    */
343   struct GNUNET_TIME_Relative rtt;
344
345   /**
346    * Unique identifier of this ATS session with the communicator.
347    */
348   uint32_t qid;
349
350   /**
351    * Maximum transmission unit supported by this ATS session.
352    */
353   uint32_t mtu;
354
355   /**
356    * Distance to the target of this ATS session.
357    */
358   uint32_t distance;
359
360   /**
361    * Network type offered by this ATS session.
362    */
363   enum GNUNET_NetworkType nt;
364
365   /**
366    * Connection status for this ATS session.
367    */
368   enum GNUNET_TRANSPORT_ConnectionStatus cs;
369
370   /**
371    * Messages pending.
372    */
373   uint32_t num_msg_pending;
374
375   /**
376    * Bytes pending.
377    */
378   uint32_t num_bytes_pending;
379
380   /**
381    * How much outbound bandwidth do we have available for this session?
382    */
383   struct GNUNET_BANDWIDTH_Tracker tracker_out;
384
385   /**
386    * How much inbound bandwidth do we have available for this session?
387    */
388   struct GNUNET_BANDWIDTH_Tracker tracker_in;
389 };
390
391
392 /**
393  * A neighbour that at least one communicator is connected to.
394  */
395 struct Neighbour
396 {
397
398   /**
399    * Which peer is this about?
400    */
401   struct GNUNET_PeerIdentity pid;
402
403   /**
404    * Head of list of messages pending for this neighbour.
405    */
406   struct PendingMessage *pending_msg_head;
407
408   /**
409    * Tail of list of messages pending for this neighbour.
410    */
411   struct PendingMessage *pending_msg_tail;
412
413   /**
414    * Head of DLL of ATS sessions to this peer.
415    */
416   struct GNUNET_ATS_Session *session_head;
417
418   /**
419    * Tail of DLL of ATS sessions to this peer.
420    */
421   struct GNUNET_ATS_Session *session_tail;
422
423   /**
424    * Quota at which CORE is allowed to transmit to this peer
425    * according to ATS.
426    *
427    * FIXME: not yet used, tricky to get right given multiple queues!
428    *        (=> Idea: let ATS set a quota per queue and we add them up here?)
429    * FIXME: how do we set this value initially when we tell CORE?
430    *    Options: start at a minimum value or at literally zero (before ATS?)
431    *         (=> Current thought: clean would be zero!)
432    */
433   struct GNUNET_BANDWIDTH_Value32NBO quota_out;
434
435 };
436
437
438 /**
439  * Transmission request from CORE that is awaiting delivery.
440  */
441 struct PendingMessage
442 {
443   /**
444    * Kept in a MDLL of messages for this @a target.
445    */
446   struct PendingMessage *next_neighbour;
447
448   /**
449    * Kept in a MDLL of messages for this @a target.
450    */
451   struct PendingMessage *prev_neighbour;
452
453   /**
454    * Kept in a MDLL of messages from this @a client.
455    */
456   struct PendingMessage *next_client;
457
458   /**
459    * Kept in a MDLL of messages from this @a client.
460    */
461   struct PendingMessage *prev_client;
462
463   /**
464    * Target of the request.
465    */
466   struct Neighbour *target;
467
468   /**
469    * Client that issued the transmission request.
470    */
471   struct TransportClient *client;
472
473   /**
474    * Size of the original message.
475    */
476   uint32_t bytes_msg;
477
478 };
479
480
481 /**
482  * One of the addresses of this peer.
483  */
484 struct AddressListEntry
485 {
486
487   /**
488    * Kept in a DLL.
489    */
490   struct AddressListEntry *next;
491
492   /**
493    * Kept in a DLL.
494    */
495   struct AddressListEntry *prev;
496
497   /**
498    * Which communicator provides this address?
499    */
500   struct TransportClient *tc;
501
502   /**
503    * The actual address.
504    */
505   const char *address;
506
507   /**
508    * Current context for storing this address in the peerstore.
509    */
510   struct GNUNET_PEERSTORE_StoreContext *sc;
511
512   /**
513    * Task to periodically do @e st operation.
514    */
515   struct GNUNET_SCHEDULER_Task *st;
516
517   /**
518    * What is a typical lifetime the communicator expects this
519    * address to have? (Always from now.)
520    */
521   struct GNUNET_TIME_Relative expiration;
522
523   /**
524    * Address identifier used by the communicator.
525    */
526   uint32_t aid;
527
528   /**
529    * Network type offered by this address.
530    */
531   enum GNUNET_NetworkType nt;
532
533 };
534
535
536 /**
537  * Client connected to the transport service.
538  */
539 struct TransportClient
540 {
541
542   /**
543    * Kept in a DLL.
544    */
545   struct TransportClient *next;
546
547   /**
548    * Kept in a DLL.
549    */
550   struct TransportClient *prev;
551
552   /**
553    * Handle to the client.
554    */
555   struct GNUNET_SERVICE_Client *client;
556
557   /**
558    * Message queue to the client.
559    */
560   struct GNUNET_MQ_Handle *mq;
561
562   /**
563    * What type of client is this?
564    */
565   enum ClientType type;
566
567   union
568   {
569
570     /**
571      * Information for @e type #CT_CORE.
572      */
573     struct {
574
575       /**
576        * Head of list of messages pending for this client.
577        */
578       struct PendingMessage *pending_msg_head;
579
580       /**
581        * Tail of list of messages pending for this client.
582        */
583       struct PendingMessage *pending_msg_tail;
584
585     } core;
586
587     /**
588      * Information for @e type #CT_MONITOR.
589      */
590     struct {
591
592       /**
593        * Peer identity to monitor the addresses of.
594        * Zero to monitor all neighbours.  Valid if
595        * @e type is #CT_MONITOR.
596        */
597       struct GNUNET_PeerIdentity peer;
598
599       /**
600        * Is this a one-shot monitor?
601        */
602       int one_shot;
603
604     } monitor;
605
606
607     /**
608      * Information for @e type #CT_COMMUNICATOR.
609      */
610     struct {
611       /**
612        * If @e type is #CT_COMMUNICATOR, this communicator
613        * supports communicating using these addresses.
614        */
615       char *address_prefix;
616
617       /**
618        * Head of DLL of queues offered by this communicator.
619        */
620       struct GNUNET_ATS_Session *session_head;
621
622       /**
623        * Tail of DLL of queues offered by this communicator.
624        */
625       struct GNUNET_ATS_Session *session_tail;
626
627       /**
628        * Head of list of the addresses of this peer offered by this communicator.
629        */
630       struct AddressListEntry *addr_head;
631
632       /**
633        * Tail of list of the addresses of this peer offered by this communicator.
634        */
635       struct AddressListEntry *addr_tail;
636
637       /**
638        * Characteristics of this communicator.
639        */
640       enum GNUNET_TRANSPORT_CommunicatorCharacteristics cc;
641
642     } communicator;
643
644   } details;
645
646 };
647
648
649 /**
650  * Head of linked list of all clients to this service.
651  */
652 static struct TransportClient *clients_head;
653
654 /**
655  * Tail of linked list of all clients to this service.
656  */
657 static struct TransportClient *clients_tail;
658
659 /**
660  * Statistics handle.
661  */
662 static struct GNUNET_STATISTICS_Handle *GST_stats;
663
664 /**
665  * Configuration handle.
666  */
667 static const struct GNUNET_CONFIGURATION_Handle *GST_cfg;
668
669 /**
670  * Our public key.
671  */
672 static struct GNUNET_PeerIdentity GST_my_identity;
673
674 /**
675  * Our private key.
676  */
677 static struct GNUNET_CRYPTO_EddsaPrivateKey *GST_my_private_key;
678
679 /**
680  * Map from PIDs to `struct Neighbour` entries.  A peer is
681  * a neighbour if we have an MQ to it from some communicator.
682  */
683 static struct GNUNET_CONTAINER_MultiPeerMap *neighbours;
684
685 /**
686  * Database for peer's HELLOs.
687  */
688 static struct GNUNET_PEERSTORE_Handle *peerstore;
689
690 /**
691  * Heap sorting `struct EphemeralCacheEntry` by their
692  * key/signature validity.
693  */
694 static struct GNUNET_CONTAINER_Heap *ephemeral_heap;
695
696 /**
697  * Hash map for looking up `struct EphemeralCacheEntry`s
698  * by peer identity. (We may have ephemerals in our
699  * cache for which we do not have a neighbour entry,
700  * and similar many neighbours may not need ephemerals,
701  * so we use a second map.)
702  */
703 static struct GNUNET_CONTAINER_MultiPeerMap *ephemeral_map;
704
705 /**
706  * Our connection to ATS for allocation and bootstrapping.
707  */
708 static struct GNUNET_ATS_TransportHandle *ats;
709
710
711 /**
712  * Free cached ephemeral key.
713  *
714  * @param ece cached signature to free
715  */
716 static void
717 free_ephemeral (struct EphemeralCacheEntry *ece)
718 {
719   GNUNET_CONTAINER_multipeermap_remove (ephemeral_map,
720                                         &ece->target,
721                                         ece);
722   GNUNET_CONTAINER_heap_remove_node (ece->hn);
723   GNUNET_free (ece);
724 }
725
726
727 /**
728  * Lookup neighbour record for peer @a pid.
729  *
730  * @param pid neighbour to look for
731  * @return NULL if we do not have this peer as a neighbour
732  */
733 static struct Neighbour *
734 lookup_neighbour (const struct GNUNET_PeerIdentity *pid)
735 {
736   return GNUNET_CONTAINER_multipeermap_get (neighbours,
737                                             pid);
738 }
739
740
741 /**
742  * Details about what to notify monitors about.
743  */
744 struct MonitorEvent
745 {
746   /**
747    * @deprecated To be discussed if we keep these...
748    */
749   struct GNUNET_TIME_Absolute last_validation;
750   struct GNUNET_TIME_Absolute valid_until;
751   struct GNUNET_TIME_Absolute next_validation;
752
753   /**
754    * Current round-trip time estimate.
755    */
756   struct GNUNET_TIME_Relative rtt;
757
758   /**
759    * Connection status.
760    */
761   enum GNUNET_TRANSPORT_ConnectionStatus cs;
762
763   /**
764    * Messages pending.
765    */
766   uint32_t num_msg_pending;
767
768   /**
769    * Bytes pending.
770    */
771   uint32_t num_bytes_pending;
772
773
774 };
775
776
777 /**
778  * Notify monitor @a tc about an event.  That @a tc
779  * cares about the event has already been checked.
780  *
781  * Send @a tc information in @a me about a @a peer's status with
782  * respect to some @a address to all monitors that care.
783  *
784  * @param tc monitor to inform
785  * @param peer peer the information is about
786  * @param address address the information is about
787  * @param nt network type associated with @a address
788  * @param me detailed information to transmit
789  */
790 static void
791 notify_monitor (struct TransportClient *tc,
792                 const struct GNUNET_PeerIdentity *peer,
793                 const char *address,
794                 enum GNUNET_NetworkType nt,
795                 const struct MonitorEvent *me)
796 {
797   struct GNUNET_MQ_Envelope *env;
798   struct GNUNET_TRANSPORT_MonitorData *md;
799   size_t addr_len = strlen (address) + 1;
800
801   env = GNUNET_MQ_msg_extra (md,
802                              addr_len,
803                              GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_DATA);
804   md->nt = htonl ((uint32_t) nt);
805   md->peer = *peer;
806   md->last_validation = GNUNET_TIME_absolute_hton (me->last_validation);
807   md->valid_until = GNUNET_TIME_absolute_hton (me->valid_until);
808   md->next_validation = GNUNET_TIME_absolute_hton (me->next_validation);
809   md->rtt = GNUNET_TIME_relative_hton (me->rtt);
810   md->cs = htonl ((uint32_t) me->cs);
811   md->num_msg_pending = htonl (me->num_msg_pending);
812   md->num_bytes_pending = htonl (me->num_bytes_pending);
813   memcpy (&md[1],
814           address,
815           addr_len);
816   GNUNET_MQ_send (tc->mq,
817                   env);
818 }
819
820
821 /**
822  * Send information in @a me about a @a peer's status with respect
823  * to some @a address to all monitors that care.
824  *
825  * @param peer peer the information is about
826  * @param address address the information is about
827  * @param nt network type associated with @a address
828  * @param me detailed information to transmit
829  */
830 static void
831 notify_monitors (const struct GNUNET_PeerIdentity *peer,
832                  const char *address,
833                  enum GNUNET_NetworkType nt,
834                  const struct MonitorEvent *me)
835 {
836   static struct GNUNET_PeerIdentity zero;
837
838   for (struct TransportClient *tc = clients_head;
839        NULL != tc;
840        tc = tc->next)
841   {
842     if (CT_MONITOR != tc->type)
843       continue;
844     if (tc->details.monitor.one_shot)
845       continue;
846     if ( (0 != memcmp (&tc->details.monitor.peer,
847                        &zero,
848                        sizeof (zero))) &&
849          (0 != memcmp (&tc->details.monitor.peer,
850                        peer,
851                        sizeof (*peer))) )
852       continue;
853     notify_monitor (tc,
854                     peer,
855                     address,
856                     nt,
857                     me);
858   }
859 }
860
861
862 /**
863  * Called whenever a client connects.  Allocates our
864  * data structures associated with that client.
865  *
866  * @param cls closure, NULL
867  * @param client identification of the client
868  * @param mq message queue for the client
869  * @return our `struct TransportClient`
870  */
871 static void *
872 client_connect_cb (void *cls,
873                    struct GNUNET_SERVICE_Client *client,
874                    struct GNUNET_MQ_Handle *mq)
875 {
876   struct TransportClient *tc;
877
878   tc = GNUNET_new (struct TransportClient);
879   tc->client = client;
880   tc->mq = mq;
881   GNUNET_CONTAINER_DLL_insert (clients_head,
882                                clients_tail,
883                                tc);
884   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
885               "Client %p connected\n",
886               tc);
887   return tc;
888 }
889
890
891 /**
892  * Release memory used by @a neighbour.
893  *
894  * @param neighbour neighbour entry to free
895  */
896 static void
897 free_neighbour (struct Neighbour *neighbour)
898 {
899   GNUNET_assert (NULL == neighbour->session_head);
900   GNUNET_assert (GNUNET_YES ==
901                  GNUNET_CONTAINER_multipeermap_remove (neighbours,
902                                                        &neighbour->pid,
903                                                        neighbour));
904   GNUNET_free (neighbour);
905 }
906
907
908 /**
909  * Send message to CORE clients that we lost a connection.
910  *
911  * @param tc client to inform (must be CORE client)
912  * @param pid peer the connection is for
913  * @param quota_out current quota for the peer
914  */
915 static void
916 core_send_connect_info (struct TransportClient *tc,
917                         const struct GNUNET_PeerIdentity *pid,
918                         struct GNUNET_BANDWIDTH_Value32NBO quota_out)
919 {
920   struct GNUNET_MQ_Envelope *env;
921   struct ConnectInfoMessage *cim;
922
923   GNUNET_assert (CT_CORE == tc->type);
924   env = GNUNET_MQ_msg (cim,
925                        GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
926   cim->quota_out = quota_out;
927   cim->id = *pid;
928   GNUNET_MQ_send (tc->mq,
929                   env);
930 }
931
932
933 /**
934  * Send message to CORE clients that we gained a connection
935  *
936  * @param pid peer the queue was for
937  * @param quota_out current quota for the peer
938  */
939 static void
940 cores_send_connect_info (const struct GNUNET_PeerIdentity *pid,
941                          struct GNUNET_BANDWIDTH_Value32NBO quota_out)
942 {
943   for (struct TransportClient *tc = clients_head;
944        NULL != tc;
945        tc = tc->next)
946   {
947     if (CT_CORE != tc->type)
948       continue;
949     core_send_connect_info (tc,
950                             pid,
951                             quota_out);
952   }
953 }
954
955
956 /**
957  * Send message to CORE clients that we lost a connection.
958  *
959  * @param pid peer the connection was for
960  */
961 static void
962 cores_send_disconnect_info (const struct GNUNET_PeerIdentity *pid)
963 {
964   for (struct TransportClient *tc = clients_head;
965        NULL != tc;
966        tc = tc->next)
967   {
968     struct GNUNET_MQ_Envelope *env;
969     struct DisconnectInfoMessage *dim;
970
971     if (CT_CORE != tc->type)
972       continue;
973     env = GNUNET_MQ_msg (dim,
974                          GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT);
975     dim->peer = *pid;
976     GNUNET_MQ_send (tc->mq,
977                     env);
978   }
979 }
980
981
982 /**
983  * Free @a queue.
984  *
985  * @param queue the queue to free
986  */
987 static void
988 free_queue (struct GNUNET_ATS_Session *queue)
989 {
990   struct Neighbour *neighbour = queue->neighbour;
991   struct TransportClient *tc = queue->tc;
992   struct MonitorEvent me = {
993     .cs = GNUNET_TRANSPORT_CS_DOWN,
994     .rtt = GNUNET_TIME_UNIT_FOREVER_REL
995   };
996
997   GNUNET_CONTAINER_MDLL_remove (neighbour,
998                                 neighbour->session_head,
999                                 neighbour->session_tail,
1000                                 queue);
1001   GNUNET_CONTAINER_MDLL_remove (client,
1002                                 tc->details.communicator.session_head,
1003                                 tc->details.communicator.session_tail,
1004                                 queue);  
1005   notify_monitors (&neighbour->pid,
1006                    queue->address,
1007                    queue->nt,
1008                    &me);
1009   GNUNET_ATS_session_del (queue->sr);
1010   GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_in);
1011   GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_out);
1012   GNUNET_free (queue);
1013   if (NULL == neighbour->session_head)
1014   {
1015     cores_send_disconnect_info (&neighbour->pid);
1016     free_neighbour (neighbour);
1017   }
1018 }
1019
1020
1021 /**
1022  * Free @a ale
1023  *
1024  * @param ale address list entry to free
1025  */
1026 static void
1027 free_address_list_entry (struct AddressListEntry *ale)
1028 {
1029   struct TransportClient *tc = ale->tc;
1030
1031   GNUNET_CONTAINER_DLL_remove (tc->details.communicator.addr_head,
1032                                tc->details.communicator.addr_tail,
1033                                ale);
1034   if (NULL != ale->sc)
1035   {
1036     GNUNET_PEERSTORE_store_cancel (ale->sc);
1037     ale->sc = NULL;
1038   }
1039   if (NULL != ale->st)
1040   {
1041     GNUNET_SCHEDULER_cancel (ale->st);
1042     ale->st = NULL;
1043   }
1044   GNUNET_free (ale);
1045 }
1046
1047
1048 /**
1049  * Called whenever a client is disconnected.  Frees our
1050  * resources associated with that client.
1051  *
1052  * @param cls closure, NULL
1053  * @param client identification of the client
1054  * @param app_ctx our `struct TransportClient`
1055  */
1056 static void
1057 client_disconnect_cb (void *cls,
1058                       struct GNUNET_SERVICE_Client *client,
1059                       void *app_ctx)
1060 {
1061   struct TransportClient *tc = app_ctx;
1062
1063   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1064               "Client %p disconnected, cleaning up.\n",
1065               tc);
1066   GNUNET_CONTAINER_DLL_remove (clients_head,
1067                                clients_tail,
1068                                tc);
1069   switch (tc->type)
1070   {
1071   case CT_NONE:
1072     break;
1073   case CT_CORE:
1074     {
1075       struct PendingMessage *pm;
1076
1077       while (NULL != (pm = tc->details.core.pending_msg_head))
1078       {
1079         GNUNET_CONTAINER_MDLL_remove (client,
1080                                       tc->details.core.pending_msg_head,
1081                                       tc->details.core.pending_msg_tail,
1082                                       pm);
1083         pm->client = NULL;
1084       }
1085     }
1086     break;
1087   case CT_MONITOR:
1088     break;
1089   case CT_COMMUNICATOR:
1090     {
1091       struct GNUNET_ATS_Session *q;
1092       struct AddressListEntry *ale;
1093
1094       while (NULL != (q = tc->details.communicator.session_head))
1095         free_queue (q);
1096       while (NULL != (ale = tc->details.communicator.addr_head))
1097         free_address_list_entry (ale);
1098       GNUNET_free (tc->details.communicator.address_prefix);
1099     }
1100     break;
1101   }
1102   GNUNET_free (tc);
1103 }
1104
1105
1106 /**
1107  * Iterator telling new CORE client about all existing
1108  * connections to peers.
1109  *
1110  * @param cls the new `struct TransportClient`
1111  * @param pid a connected peer
1112  * @param value the `struct Neighbour` with more information
1113  * @return #GNUNET_OK (continue to iterate)
1114  */
1115 static int
1116 notify_client_connect_info (void *cls,
1117                             const struct GNUNET_PeerIdentity *pid,
1118                             void *value)
1119 {
1120   struct TransportClient *tc = cls;
1121   struct Neighbour *neighbour = value;
1122
1123   core_send_connect_info (tc,
1124                           pid,
1125                           neighbour->quota_out);
1126   return GNUNET_OK;
1127 }
1128
1129
1130 /**
1131  * Initialize a "CORE" client.  We got a start message from this
1132  * client, so add it to the list of clients for broadcasting of
1133  * inbound messages.
1134  *
1135  * @param cls the client
1136  * @param start the start message that was sent
1137  */
1138 static void
1139 handle_client_start (void *cls,
1140                      const struct StartMessage *start)
1141 {
1142   struct TransportClient *tc = cls;
1143   uint32_t options;
1144
1145   options = ntohl (start->options);
1146   if ( (0 != (1 & options)) &&
1147        (0 !=
1148         memcmp (&start->self,
1149                 &GST_my_identity,
1150                 sizeof (struct GNUNET_PeerIdentity)) ) )
1151   {
1152     /* client thinks this is a different peer, reject */
1153     GNUNET_break (0);
1154     GNUNET_SERVICE_client_drop (tc->client);
1155     return;
1156   }
1157   if (CT_NONE != tc->type)
1158   {
1159     GNUNET_break (0);
1160     GNUNET_SERVICE_client_drop (tc->client);
1161     return;
1162   }
1163   tc->type = CT_CORE;
1164   GNUNET_CONTAINER_multipeermap_iterate (neighbours,
1165                                          &notify_client_connect_info,
1166                                          tc);
1167   GNUNET_SERVICE_client_continue (tc->client);
1168 }
1169
1170
1171 /**
1172  * Client asked for transmission to a peer.  Process the request.
1173  *
1174  * @param cls the client
1175  * @param obm the send message that was sent
1176  */
1177 static int
1178 check_client_send (void *cls,
1179                    const struct OutboundMessage *obm)
1180 {
1181   struct TransportClient *tc = cls;
1182   uint16_t size;
1183   const struct GNUNET_MessageHeader *obmm;
1184
1185   if (CT_CORE != tc->type)
1186   {
1187     GNUNET_break (0);
1188     return GNUNET_SYSERR;
1189   }
1190   size = ntohs (obm->header.size) - sizeof (struct OutboundMessage);
1191   if (size < sizeof (struct GNUNET_MessageHeader))
1192   {
1193     GNUNET_break (0);
1194     return GNUNET_SYSERR;
1195   }
1196   obmm = (const struct GNUNET_MessageHeader *) &obm[1];
1197   if (size != ntohs (obmm->size))
1198   {
1199     GNUNET_break (0);
1200     return GNUNET_SYSERR;
1201   }
1202   return GNUNET_OK;
1203 }
1204
1205
1206 /**
1207  * Send a response to the @a pm that we have processed a
1208  * "send" request with status @a success. We
1209  * transmitted @a bytes_physical on the actual wire.
1210  * Sends a confirmation to the "core" client responsible
1211  * for the original request and free's @a pm.
1212  *
1213  * @param pm handle to the original pending message
1214  * @param success status code, #GNUNET_OK on success, #GNUNET_SYSERR
1215  *          for transmission failure
1216  * @param bytes_physical amount of bandwidth consumed
1217  */
1218 static void
1219 client_send_response (struct PendingMessage *pm,
1220                       int success,
1221                       uint32_t bytes_physical)
1222 {
1223   struct TransportClient *tc = pm->client;
1224   struct Neighbour *target = pm->target;
1225   struct GNUNET_MQ_Envelope *env;
1226   struct SendOkMessage *som;
1227
1228   if (NULL != tc)
1229   {
1230     env = GNUNET_MQ_msg (som,
1231                          GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
1232     som->success = htonl ((uint32_t) success);
1233     som->bytes_msg = htonl (pm->bytes_msg);
1234     som->bytes_physical = htonl (bytes_physical);
1235     som->peer = target->pid;
1236     GNUNET_MQ_send (tc->mq,
1237                     env);
1238     GNUNET_CONTAINER_MDLL_remove (client,
1239                                   tc->details.core.pending_msg_head,
1240                                   tc->details.core.pending_msg_tail,
1241                                   pm);
1242   }
1243   GNUNET_CONTAINER_MDLL_remove (neighbour,
1244                                 target->pending_msg_head,
1245                                 target->pending_msg_tail,
1246                                 pm);
1247   GNUNET_free (pm);
1248 }
1249
1250
1251 /**
1252  * Client asked for transmission to a peer.  Process the request.
1253  *
1254  * @param cls the client
1255  * @param obm the send message that was sent
1256  */
1257 static void
1258 handle_client_send (void *cls,
1259                     const struct OutboundMessage *obm)
1260 {
1261   struct TransportClient *tc = cls;
1262   struct PendingMessage *pm;
1263   const struct GNUNET_MessageHeader *obmm;
1264   struct Neighbour *target;
1265   uint32_t bytes_msg;
1266
1267   GNUNET_assert (CT_CORE == tc->type);
1268   obmm = (const struct GNUNET_MessageHeader *) &obm[1];
1269   bytes_msg = ntohs (obmm->size);
1270   target = lookup_neighbour (&obm->peer);
1271   if (NULL == target)
1272   {
1273     /* Failure: don't have this peer as a neighbour (anymore).
1274        Might have gone down asynchronously, so this is NOT
1275        a protocol violation by CORE. Still count the event,
1276        as this should be rare. */
1277     struct GNUNET_MQ_Envelope *env;
1278     struct SendOkMessage *som;
1279
1280     env = GNUNET_MQ_msg (som,
1281                          GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
1282     som->success = htonl (GNUNET_SYSERR);
1283     som->bytes_msg = htonl (bytes_msg);
1284     som->bytes_physical = htonl (0);
1285     som->peer = obm->peer;
1286     GNUNET_MQ_send (tc->mq,
1287                     env);
1288     GNUNET_SERVICE_client_continue (tc->client);
1289     GNUNET_STATISTICS_update (GST_stats,
1290                               "# messages dropped (neighbour unknown)",
1291                               1,
1292                               GNUNET_NO);
1293     return;
1294   }
1295   pm = GNUNET_new (struct PendingMessage);
1296   pm->client = tc;
1297   pm->target = target;
1298   pm->bytes_msg = bytes_msg;
1299   GNUNET_CONTAINER_MDLL_insert (neighbour,
1300                                 target->pending_msg_head,
1301                                 target->pending_msg_tail,
1302                                 pm);
1303   GNUNET_CONTAINER_MDLL_insert (client,
1304                                 tc->details.core.pending_msg_head,
1305                                 tc->details.core.pending_msg_tail,
1306                                 pm);
1307   // FIXME: do the work, final continuation with call to:
1308   client_send_response (pm,
1309                         GNUNET_NO,
1310                         0);
1311 }
1312
1313
1314 /**
1315  * Communicator started.  Test message is well-formed.
1316  *
1317  * @param cls the client
1318  * @param cam the send message that was sent
1319  */
1320 static int
1321 check_communicator_available (void *cls,
1322                               const struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam)
1323 {
1324   struct TransportClient *tc = cls;
1325   uint16_t size;
1326
1327   if (CT_NONE != tc->type)
1328   {
1329     GNUNET_break (0);
1330     return GNUNET_SYSERR;
1331   }
1332   tc->type = CT_COMMUNICATOR;
1333   size = ntohs (cam->header.size) - sizeof (*cam);
1334   if (0 == size)
1335     return GNUNET_OK; /* receive-only communicator */
1336   GNUNET_MQ_check_zero_termination (cam);
1337   return GNUNET_OK;
1338 }
1339
1340
1341 /**
1342  * Communicator started.  Process the request.
1343  *
1344  * @param cls the client
1345  * @param cam the send message that was sent
1346  */
1347 static void
1348 handle_communicator_available (void *cls,
1349                                const struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam)
1350 {
1351   struct TransportClient *tc = cls;
1352   uint16_t size;
1353
1354   size = ntohs (cam->header.size) - sizeof (*cam);
1355   if (0 == size)
1356     return; /* receive-only communicator */
1357   tc->details.communicator.address_prefix
1358     = GNUNET_strdup ((const char *) &cam[1]);
1359   tc->details.communicator.cc
1360     = (enum GNUNET_TRANSPORT_CommunicatorCharacteristics) ntohl (cam->cc);
1361   GNUNET_SERVICE_client_continue (tc->client);
1362 }
1363
1364
1365 /**
1366  * Address of our peer added.  Test message is well-formed.
1367  *
1368  * @param cls the client
1369  * @param aam the send message that was sent
1370  */
1371 static int
1372 check_add_address (void *cls,
1373                    const struct GNUNET_TRANSPORT_AddAddressMessage *aam)
1374 {
1375   struct TransportClient *tc = cls;
1376
1377   if (CT_COMMUNICATOR != tc->type)
1378   {
1379     GNUNET_break (0);
1380     return GNUNET_SYSERR;
1381   }
1382   GNUNET_MQ_check_zero_termination (aam);
1383   return GNUNET_OK;
1384 }
1385
1386
1387 /**
1388  * Ask peerstore to store our address.
1389  *
1390  * @param cls an `struct AddressListEntry *`
1391  */
1392 static void
1393 store_pi (void *cls);
1394
1395
1396 /**
1397  * Function called when peerstore is done storing our address.
1398  */
1399 static void
1400 peerstore_store_cb (void *cls,
1401                     int success)
1402 {
1403   struct AddressListEntry *ale = cls;
1404
1405   ale->sc = NULL;
1406   if (GNUNET_YES != success)
1407     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1408                 "Failed to store our own address `%s' in peerstore!\n",
1409                 ale->address);
1410   /* refresh period is 1/4 of expiration time, that should be plenty
1411      without being excessive. */
1412   ale->st = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_divide (ale->expiration,
1413                                                                        4ULL),
1414                                           &store_pi,
1415                                           ale);
1416 }
1417
1418
1419 /**
1420  * Ask peerstore to store our address.
1421  *
1422  * @param cls an `struct AddressListEntry *`
1423  */
1424 static void
1425 store_pi (void *cls)
1426 {
1427   struct AddressListEntry *ale = cls;
1428   void *addr;
1429   size_t addr_len;
1430   struct GNUNET_TIME_Absolute expiration;
1431
1432   ale->st = NULL;
1433   expiration = GNUNET_TIME_relative_to_absolute (ale->expiration);
1434   GNUNET_HELLO_sign_address (ale->address,
1435                              ale->nt,
1436                              expiration,
1437                              GST_my_private_key,
1438                              &addr,
1439                              &addr_len);
1440   ale->sc = GNUNET_PEERSTORE_store (peerstore,
1441                                     "transport",
1442                                     &GST_my_identity,
1443                                     GNUNET_HELLO_PEERSTORE_KEY,
1444                                     addr,
1445                                     addr_len,
1446                                     expiration,
1447                                     GNUNET_PEERSTORE_STOREOPTION_MULTIPLE,
1448                                     &peerstore_store_cb,
1449                                     ale);
1450   GNUNET_free (addr);
1451   if (NULL == ale->sc)
1452   {
1453     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1454                 "Failed to store our address `%s' with peerstore\n",
1455                 ale->address);
1456     ale->st = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
1457                                             &store_pi,
1458                                             ale);
1459   }
1460 }
1461
1462
1463 /**
1464  * Address of our peer added.  Process the request.
1465  *
1466  * @param cls the client
1467  * @param aam the send message that was sent
1468  */
1469 static void
1470 handle_add_address (void *cls,
1471                     const struct GNUNET_TRANSPORT_AddAddressMessage *aam)
1472 {
1473   struct TransportClient *tc = cls;
1474   struct AddressListEntry *ale;
1475   size_t slen;
1476
1477   slen = ntohs (aam->header.size) - sizeof (*aam);
1478   ale = GNUNET_malloc (sizeof (struct AddressListEntry) + slen);
1479   ale->tc = tc;
1480   ale->address = (const char *) &ale[1];
1481   ale->expiration = GNUNET_TIME_relative_ntoh (aam->expiration);
1482   ale->aid = aam->aid;
1483   ale->nt = (enum GNUNET_NetworkType) ntohl (aam->nt);
1484   memcpy (&ale[1],
1485           &aam[1],
1486           slen);
1487   GNUNET_CONTAINER_DLL_insert (tc->details.communicator.addr_head,
1488                                tc->details.communicator.addr_tail,
1489                                ale);
1490   ale->st = GNUNET_SCHEDULER_add_now (&store_pi,
1491                                       ale);
1492   GNUNET_SERVICE_client_continue (tc->client);
1493 }
1494
1495
1496 /**
1497  * Address of our peer deleted.  Process the request.
1498  *
1499  * @param cls the client
1500  * @param dam the send message that was sent
1501  */
1502 static void
1503 handle_del_address (void *cls,
1504                     const struct GNUNET_TRANSPORT_DelAddressMessage *dam)
1505 {
1506   struct TransportClient *tc = cls;
1507
1508   if (CT_COMMUNICATOR != tc->type)
1509   {
1510     GNUNET_break (0);
1511     GNUNET_SERVICE_client_drop (tc->client);
1512     return;
1513   }
1514   for (struct AddressListEntry *ale = tc->details.communicator.addr_head;
1515        NULL != ale;
1516        ale = ale->next)
1517   {
1518     if (dam->aid != ale->aid)
1519       continue;
1520     GNUNET_assert (ale->tc == tc);
1521     free_address_list_entry (ale);
1522     GNUNET_SERVICE_client_continue (tc->client);
1523   }
1524   GNUNET_break (0);
1525   GNUNET_SERVICE_client_drop (tc->client);
1526 }
1527
1528
1529 /**
1530  * Client notified us about transmission from a peer.  Process the request.
1531  *
1532  * @param cls the client
1533  * @param obm the send message that was sent
1534  */
1535 static int
1536 check_incoming_msg (void *cls,
1537                     const struct GNUNET_TRANSPORT_IncomingMessage *im)
1538 {
1539   struct TransportClient *tc = cls;
1540   uint16_t size;
1541   const struct GNUNET_MessageHeader *obmm;
1542
1543   if (CT_COMMUNICATOR != tc->type)
1544   {
1545     GNUNET_break (0);
1546     return GNUNET_SYSERR;
1547   }
1548   size = ntohs (im->header.size) - sizeof (*im);
1549   if (size < sizeof (struct GNUNET_MessageHeader))
1550   {
1551     GNUNET_break (0);
1552     return GNUNET_SYSERR;
1553   }
1554   obmm = (const struct GNUNET_MessageHeader *) &im[1];
1555   if (size != ntohs (obmm->size))
1556   {
1557     GNUNET_break (0);
1558     return GNUNET_SYSERR;
1559   }
1560   return GNUNET_OK;
1561 }
1562
1563
1564 /**
1565  * Incoming meessage.  Process the request.
1566  *
1567  * @param cls the client
1568  * @param im the send message that was received
1569  */
1570 static void
1571 handle_incoming_msg (void *cls,
1572                      const struct GNUNET_TRANSPORT_IncomingMessage *im)
1573 {
1574   struct TransportClient *tc = cls;
1575
1576   GNUNET_SERVICE_client_continue (tc->client);
1577 }
1578
1579
1580 /**
1581  * New queue became available.  Check message.
1582  *
1583  * @param cls the client
1584  * @param aqm the send message that was sent
1585  */
1586 static int
1587 check_add_queue_message (void *cls,
1588                          const struct GNUNET_TRANSPORT_AddQueueMessage *aqm)
1589 {
1590   struct TransportClient *tc = cls;
1591
1592   if (CT_COMMUNICATOR != tc->type)
1593   {
1594     GNUNET_break (0);
1595     return GNUNET_SYSERR;
1596   }
1597   GNUNET_MQ_check_zero_termination (aqm);
1598   return GNUNET_OK;
1599 }
1600
1601
1602 /**
1603  * Bandwidth tracker informs us that the delay until we
1604  * can transmit again changed.
1605  *
1606  * @param cls a `struct GNUNET_ATS_Session` for which the delay changed
1607  */
1608 static void
1609 tracker_update_cb (void *cls)
1610 {
1611   struct GNUNET_ATS_Session *queue = cls;
1612
1613   // FIXME: re-schedule transmission tasks if applicable!
1614 }
1615
1616
1617 /**
1618  * Bandwidth tracker informs us that excessive bandwidth was allocated
1619  * which is not being used.
1620  *
1621  * @param cls a `struct GNUNET_ATS_Session` for which the excess was noted
1622  */
1623 static void
1624 tracker_excess_cb (void *cls)
1625 {
1626   /* FIXME: what do we do? */
1627 }
1628
1629
1630 /**
1631  * New queue became available.  Process the request.
1632  *
1633  * @param cls the client
1634  * @param aqm the send message that was sent
1635  */
1636 static void
1637 handle_add_queue_message (void *cls,
1638                           const struct GNUNET_TRANSPORT_AddQueueMessage *aqm)
1639 {
1640   struct TransportClient *tc = cls;
1641   struct GNUNET_ATS_Session *queue;
1642   struct Neighbour *neighbour;
1643   const char *addr;
1644   uint16_t addr_len;
1645
1646   neighbour = lookup_neighbour (&aqm->receiver);
1647   if (NULL == neighbour)
1648   {
1649     neighbour = GNUNET_new (struct Neighbour);
1650     neighbour->pid = aqm->receiver;
1651     GNUNET_assert (GNUNET_OK ==
1652                    GNUNET_CONTAINER_multipeermap_put (neighbours,
1653                                                       &neighbour->pid,
1654                                                       neighbour,
1655                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1656     cores_send_connect_info (&neighbour->pid,
1657                              GNUNET_BANDWIDTH_ZERO);
1658   }
1659   addr_len = ntohs (aqm->header.size) - sizeof (*aqm);
1660   addr = (const char *) &aqm[1];
1661
1662   queue = GNUNET_malloc (sizeof (struct GNUNET_ATS_Session) + addr_len);
1663   queue->tc = tc;
1664   queue->address = (const char *) &queue[1];
1665   queue->rtt = GNUNET_TIME_UNIT_FOREVER_REL;
1666   queue->qid = aqm->qid;
1667   queue->mtu = ntohl (aqm->mtu);
1668   queue->nt = (enum GNUNET_NetworkType) ntohl (aqm->nt);
1669   queue->cs = (enum GNUNET_TRANSPORT_ConnectionStatus) ntohl (aqm->cs);
1670   queue->neighbour = neighbour;
1671   GNUNET_BANDWIDTH_tracker_init2 (&queue->tracker_in,
1672                                   &tracker_update_cb,
1673                                   queue,
1674                                   GNUNET_BANDWIDTH_ZERO,
1675                                   0 /* FIXME: max carry in seconds! */,
1676                                   &tracker_excess_cb,
1677                                   queue);
1678   GNUNET_BANDWIDTH_tracker_init2 (&queue->tracker_out,
1679                                   &tracker_update_cb,
1680                                   queue,
1681                                   GNUNET_BANDWIDTH_ZERO,
1682                                   0 /* FIXME: max carry in seconds! */,
1683                                   &tracker_excess_cb,
1684                                   queue);
1685   memcpy (&queue[1],
1686           addr,
1687           addr_len);
1688   /* notify ATS about new queue */
1689   {
1690     struct GNUNET_ATS_Properties prop = {
1691       .delay = GNUNET_TIME_UNIT_FOREVER_REL,
1692       .mtu = queue->mtu,
1693       .nt = queue->nt,
1694       .cc = tc->details.communicator.cc
1695     };
1696     
1697     queue->sr = GNUNET_ATS_session_add (ats,
1698                                         &neighbour->pid,
1699                                         queue->address,
1700                                         queue,
1701                                         &prop);
1702     if  (NULL == queue->sr)
1703     {
1704       /* This can only happen if the 'address' was way too long for ATS
1705          (approaching 64k in strlen()!). In this case, the communicator
1706          must be buggy and we drop it. */
1707       GNUNET_break (0);
1708       GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_in);
1709       GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_out);
1710       GNUNET_free (queue);
1711       if (NULL == neighbour->session_head)
1712       {
1713         cores_send_disconnect_info (&neighbour->pid);
1714         free_neighbour (neighbour);
1715       }
1716       GNUNET_SERVICE_client_drop (tc->client);
1717       return;
1718     }
1719   }
1720   /* notify monitors about new queue */
1721   {
1722     struct MonitorEvent me = {
1723       .rtt = queue->rtt,
1724       .cs = queue->cs
1725     };
1726
1727     notify_monitors (&neighbour->pid,
1728                      queue->address,
1729                      queue->nt,
1730                      &me);
1731   }
1732   GNUNET_CONTAINER_MDLL_insert (neighbour,
1733                                 neighbour->session_head,
1734                                 neighbour->session_tail,
1735                                 queue);
1736   GNUNET_CONTAINER_MDLL_insert (client,
1737                                 tc->details.communicator.session_head,
1738                                 tc->details.communicator.session_tail,
1739                                 queue);
1740   GNUNET_SERVICE_client_continue (tc->client);
1741 }
1742
1743
1744 /**
1745  * Queue to a peer went down.  Process the request.
1746  *
1747  * @param cls the client
1748  * @param dqm the send message that was sent
1749  */
1750 static void
1751 handle_del_queue_message (void *cls,
1752                           const struct GNUNET_TRANSPORT_DelQueueMessage *dqm)
1753 {
1754   struct TransportClient *tc = cls;
1755
1756   if (CT_COMMUNICATOR != tc->type)
1757   {
1758     GNUNET_break (0);
1759     GNUNET_SERVICE_client_drop (tc->client);
1760     return;
1761   }
1762   for (struct GNUNET_ATS_Session *queue = tc->details.communicator.session_head;
1763        NULL != queue;
1764        queue = queue->next_client)
1765   {
1766     struct Neighbour *neighbour = queue->neighbour;
1767
1768     if ( (dqm->qid != queue->qid) ||
1769          (0 != memcmp (&dqm->receiver,
1770                        &neighbour->pid,
1771                        sizeof (struct GNUNET_PeerIdentity))) )
1772       continue;
1773     free_queue (queue);
1774     GNUNET_SERVICE_client_continue (tc->client);
1775     return;
1776   }
1777   GNUNET_break (0);
1778   GNUNET_SERVICE_client_drop (tc->client);
1779 }
1780
1781
1782 /**
1783  * Message was transmitted.  Process the request.
1784  *
1785  * @param cls the client
1786  * @param sma the send message that was sent
1787  */
1788 static void
1789 handle_send_message_ack (void *cls,
1790                          const struct GNUNET_TRANSPORT_SendMessageToAck *sma)
1791 {
1792   struct TransportClient *tc = cls;
1793
1794   if (CT_COMMUNICATOR != tc->type)
1795   {
1796     GNUNET_break (0);
1797     GNUNET_SERVICE_client_drop (tc->client);
1798     return;
1799   }
1800   // FIXME: react to communicator status about transmission request. We got:
1801   sma->status; // OK success, SYSERR failure
1802   sma->mid; // message ID of original message
1803   sma->receiver; // receiver of original message
1804
1805   
1806   GNUNET_SERVICE_client_continue (tc->client);
1807 }
1808
1809
1810 /**
1811  * Iterator telling new MONITOR client about all existing
1812  * queues to peers.
1813  *
1814  * @param cls the new `struct TransportClient`
1815  * @param pid a connected peer
1816  * @param value the `struct Neighbour` with more information
1817  * @return #GNUNET_OK (continue to iterate)
1818  */
1819 static int
1820 notify_client_queues (void *cls,
1821                       const struct GNUNET_PeerIdentity *pid,
1822                       void *value)
1823 {
1824   struct TransportClient *tc = cls;
1825   struct Neighbour *neighbour = value;
1826
1827   GNUNET_assert (CT_MONITOR == tc->type);
1828   for (struct GNUNET_ATS_Session *q = neighbour->session_head;
1829        NULL != q;
1830        q = q->next_neighbour)
1831   {
1832     struct MonitorEvent me = {
1833       .rtt = q->rtt,
1834       .cs = q->cs,
1835       .num_msg_pending = q->num_msg_pending,
1836       .num_bytes_pending = q->num_bytes_pending
1837     };
1838
1839     notify_monitor (tc,
1840                     pid,
1841                     q->address,
1842                     q->nt,
1843                     &me);
1844   }
1845   return GNUNET_OK;
1846 }
1847
1848
1849 /**
1850  * Initialize a monitor client.
1851  *
1852  * @param cls the client
1853  * @param start the start message that was sent
1854  */
1855 static void
1856 handle_monitor_start (void *cls,
1857                       const struct GNUNET_TRANSPORT_MonitorStart *start)
1858 {
1859   struct TransportClient *tc = cls;
1860
1861   if (CT_NONE != tc->type)
1862   {
1863     GNUNET_break (0);
1864     GNUNET_SERVICE_client_drop (tc->client);
1865     return;
1866   }
1867   tc->type = CT_MONITOR;
1868   tc->details.monitor.peer = start->peer;
1869   tc->details.monitor.one_shot = ntohl (start->one_shot);
1870   GNUNET_CONTAINER_multipeermap_iterate (neighbours,
1871                                          &notify_client_queues,
1872                                          tc);
1873   GNUNET_SERVICE_client_mark_monitor (tc->client);
1874   GNUNET_SERVICE_client_continue (tc->client);
1875 }
1876
1877
1878 /**
1879  * Signature of a function called by ATS with the current bandwidth
1880  * allocation to be used as determined by ATS.
1881  *
1882  * @param cls closure, NULL
1883  * @param session session this is about
1884  * @param bandwidth_out assigned outbound bandwidth for the connection,
1885  *        0 to signal disconnect
1886  * @param bandwidth_in assigned inbound bandwidth for the connection,
1887  *        0 to signal disconnect
1888  */
1889 static void
1890 ats_allocation_cb (void *cls,
1891                    struct GNUNET_ATS_Session *session,
1892                    struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out,
1893                    struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in)
1894 {
1895   (void) cls;
1896   GNUNET_BANDWIDTH_tracker_update_quota (&session->tracker_out,
1897                                          bandwidth_out);
1898   GNUNET_BANDWIDTH_tracker_update_quota (&session->tracker_in,
1899                                          bandwidth_in);
1900 }
1901
1902
1903 /**
1904  * Find transport client providing communication service
1905  * for the protocol @a prefix.
1906  *
1907  * @param prefix communicator name
1908  * @return NULL if no such transport client is available
1909  */
1910 static struct TransportClient *
1911 lookup_communicator (const char *prefix)
1912 {
1913   for (struct TransportClient *tc = clients_head;
1914        NULL != tc;
1915        tc = tc->next)
1916   {
1917     if (CT_COMMUNICATOR != tc->type)
1918       continue;
1919     if (0 == strcmp (prefix,
1920                      tc->details.communicator.address_prefix))
1921       return tc;
1922   }
1923   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1924               "ATS suggested use of communicator for `%s', but we do not have such a communicator!\n",
1925               prefix);
1926   return NULL;
1927 }
1928
1929
1930 /**
1931  * Signature of a function called by ATS suggesting transport to
1932  * try connecting with a particular address.
1933  *
1934  * @param cls closure, NULL
1935  * @param pid target peer
1936  * @param address the address to try
1937  */
1938 static void
1939 ats_suggestion_cb (void *cls,
1940                    const struct GNUNET_PeerIdentity *pid,
1941                    const char *address)
1942 {
1943   struct TransportClient *tc;
1944   char *prefix;
1945
1946   (void) cls;
1947   prefix = GNUNET_HELLO_address_to_prefix (address);
1948   if (NULL == prefix)
1949   {
1950     GNUNET_break (0); /* ATS gave invalid address!? */
1951     return;
1952   }
1953   tc = lookup_communicator (prefix);
1954   if (NULL == tc)
1955   {
1956     GNUNET_STATISTICS_update (GST_stats,
1957                               "# ATS suggestions ignored due to missing communicator",
1958                               1,
1959                               GNUNET_NO);
1960     
1961     return;
1962   }
1963   // FIXME: forward suggestion to tc
1964 }
1965
1966
1967 /**
1968  * Free neighbour entry.
1969  *
1970  * @param cls NULL
1971  * @param pid unused
1972  * @param value a `struct Neighbour`
1973  * @return #GNUNET_OK (always)
1974  */
1975 static int
1976 free_neighbour_cb (void *cls,
1977                    const struct GNUNET_PeerIdentity *pid,
1978                    void *value)
1979 {
1980   struct Neighbour *neighbour = value;
1981
1982   (void) cls;
1983   (void) pid;
1984   GNUNET_break (0); // should this ever happen?
1985   free_neighbour (neighbour);
1986
1987   return GNUNET_OK;
1988 }
1989
1990
1991 /**
1992  * Free ephemeral entry.
1993  *
1994  * @param cls NULL
1995  * @param pid unused
1996  * @param value a `struct Neighbour`
1997  * @return #GNUNET_OK (always)
1998  */
1999 static int
2000 free_ephemeral_cb (void *cls,
2001                    const struct GNUNET_PeerIdentity *pid,
2002                    void *value)
2003 {
2004   struct EphemeralCacheEntry *ece = value;
2005
2006   (void) cls;
2007   (void) pid;
2008   free_ephemeral (ece);
2009   return GNUNET_OK;
2010 }
2011
2012
2013 /**
2014  * Function called when the service shuts down.  Unloads our plugins
2015  * and cancels pending validations.
2016  *
2017  * @param cls closure, unused
2018  */
2019 static void
2020 do_shutdown (void *cls)
2021 {
2022   (void) cls;
2023
2024   GNUNET_CONTAINER_multipeermap_iterate (neighbours,
2025                                          &free_neighbour_cb,
2026                                          NULL);
2027   if (NULL != ats)
2028   {
2029     GNUNET_ATS_transport_done (ats);
2030     ats = NULL;
2031   }
2032   if (NULL != peerstore)
2033   {
2034     GNUNET_PEERSTORE_disconnect (peerstore,
2035                                  GNUNET_NO);
2036     peerstore = NULL;
2037   }
2038   if (NULL != GST_stats)
2039   {
2040     GNUNET_STATISTICS_destroy (GST_stats,
2041                                GNUNET_NO);
2042     GST_stats = NULL;
2043   }
2044   if (NULL != GST_my_private_key)
2045   {
2046     GNUNET_free (GST_my_private_key);
2047     GST_my_private_key = NULL;
2048   }
2049   GNUNET_CONTAINER_multipeermap_destroy (neighbours);
2050   neighbours = NULL;
2051   GNUNET_CONTAINER_multipeermap_iterate (ephemeral_map,
2052                                          &free_ephemeral_cb,
2053                                          NULL);
2054   GNUNET_CONTAINER_multipeermap_destroy (ephemeral_map);
2055   ephemeral_map = NULL;
2056   GNUNET_CONTAINER_heap_destroy (ephemeral_heap);
2057   ephemeral_heap = NULL;
2058 }
2059
2060
2061 /**
2062  * Initiate transport service.
2063  *
2064  * @param cls closure
2065  * @param c configuration to use
2066  * @param service the initialized service
2067  */
2068 static void
2069 run (void *cls,
2070      const struct GNUNET_CONFIGURATION_Handle *c,
2071      struct GNUNET_SERVICE_Handle *service)
2072 {
2073   (void) cls;
2074   /* setup globals */
2075   GST_cfg = c;
2076   neighbours = GNUNET_CONTAINER_multipeermap_create (1024,
2077                                                      GNUNET_YES);
2078   ephemeral_map = GNUNET_CONTAINER_multipeermap_create (32,
2079                                                         GNUNET_YES);
2080   ephemeral_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
2081   GST_my_private_key = GNUNET_CRYPTO_eddsa_key_create_from_configuration (GST_cfg);
2082   if (NULL == GST_my_private_key)
2083   {
2084     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2085                 _("Transport service is lacking key configuration settings. Exiting.\n"));
2086     GNUNET_SCHEDULER_shutdown ();
2087     return;
2088   }
2089   GNUNET_CRYPTO_eddsa_key_get_public (GST_my_private_key,
2090                                       &GST_my_identity.public_key);
2091   GNUNET_log(GNUNET_ERROR_TYPE_INFO,
2092              "My identity is `%s'\n",
2093              GNUNET_i2s_full (&GST_my_identity));
2094   GST_stats = GNUNET_STATISTICS_create ("transport",
2095                                         GST_cfg);
2096   GNUNET_SCHEDULER_add_shutdown (&do_shutdown,
2097                                  NULL);
2098   peerstore = GNUNET_PEERSTORE_connect (GST_cfg);
2099   if (NULL == peerstore)
2100   {
2101     GNUNET_break (0);
2102     GNUNET_SCHEDULER_shutdown ();
2103     return;
2104   }
2105   ats = GNUNET_ATS_transport_init (GST_cfg,
2106                                    &ats_allocation_cb,
2107                                    NULL,
2108                                    &ats_suggestion_cb,
2109                                    NULL);
2110   if (NULL == ats)
2111   {
2112     GNUNET_break (0);
2113     GNUNET_SCHEDULER_shutdown ();
2114     return;
2115   }
2116 }
2117
2118
2119 /**
2120  * Define "main" method using service macro.
2121  */
2122 GNUNET_SERVICE_MAIN
2123 ("transport",
2124  GNUNET_SERVICE_OPTION_SOFT_SHUTDOWN,
2125  &run,
2126  &client_connect_cb,
2127  &client_disconnect_cb,
2128  NULL,
2129  /* communication with core */
2130  GNUNET_MQ_hd_fixed_size (client_start,
2131                           GNUNET_MESSAGE_TYPE_TRANSPORT_START,
2132                           struct StartMessage,
2133                           NULL),
2134  GNUNET_MQ_hd_var_size (client_send,
2135                         GNUNET_MESSAGE_TYPE_TRANSPORT_SEND,
2136                         struct OutboundMessage,
2137                         NULL),
2138  /* communication with communicators */
2139  GNUNET_MQ_hd_var_size (communicator_available,
2140                         GNUNET_MESSAGE_TYPE_TRANSPORT_NEW_COMMUNICATOR,
2141                         struct GNUNET_TRANSPORT_CommunicatorAvailableMessage,
2142                         NULL),
2143  GNUNET_MQ_hd_var_size (add_address,
2144                         GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS,
2145                         struct GNUNET_TRANSPORT_AddAddressMessage,
2146                         NULL),
2147  GNUNET_MQ_hd_fixed_size (del_address,
2148                           GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_ADDRESS,
2149                           struct GNUNET_TRANSPORT_DelAddressMessage,
2150                           NULL),
2151  GNUNET_MQ_hd_var_size (incoming_msg,
2152                         GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG,
2153                         struct GNUNET_TRANSPORT_IncomingMessage,
2154                         NULL),
2155  GNUNET_MQ_hd_var_size (add_queue_message,
2156                         GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP,
2157                         struct GNUNET_TRANSPORT_AddQueueMessage,
2158                         NULL),
2159  GNUNET_MQ_hd_fixed_size (del_queue_message,
2160                           GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN,
2161                           struct GNUNET_TRANSPORT_DelQueueMessage,
2162                           NULL),
2163  GNUNET_MQ_hd_fixed_size (send_message_ack,
2164                           GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK,
2165                           struct GNUNET_TRANSPORT_SendMessageToAck,
2166                           NULL),
2167  /* communication with monitors */
2168  GNUNET_MQ_hd_fixed_size (monitor_start,
2169                           GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_START,
2170                           struct GNUNET_TRANSPORT_MonitorStart,
2171                           NULL),
2172  GNUNET_MQ_handler_end ());
2173
2174
2175 /* end of file gnunet-service-transport.c */