more work on tng
[oweals/gnunet.git] / src / transport / gnunet-service-tng.c
1 /*
2  This file is part of GNUnet.
3  Copyright (C) 2010-2016, 2018 GNUnet e.V.
4
5  GNUnet is free software: you can redistribute it and/or modify it
6  under the terms of the GNU Affero General Public License as published
7  by the Free Software Foundation, either version 3 of the License,
8  or (at your option) any later version.
9
10  GNUnet is distributed in the hope that it will be useful, but
11  WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  Affero General Public License for more details.
14
15  You should have received a copy of the GNU Affero General Public License
16  along with this program.  If not, see <http://www.gnu.org/licenses/>.
17  */
18 /**
19  * @file transport/gnunet-service-tng.c
20  * @brief main for gnunet-service-tng
21  * @author Christian Grothoff
22  *
23  * TODO:
24  * - figure out how to transmit (selective) ACKs in case of uni-directional
25  *   communicators (with/without core? DV-only?) When do we use ACKs?
26  *   => communicators use selective ACKs for flow control
27  *   => transport uses message-level ACKs for RTT, fragment confirmation
28  *   => integrate DV into transport, use neither core nor communicators
29  *      but rather give communicators transport-encapsulated messages
30  *      (which could be core-data, background-channel traffic, or
31  *       transport-to-transport traffic)
32  *
33  * Implement:
34  * - manage fragmentation/defragmentation, retransmission, track RTT, loss, etc.
35  *
36  * Easy:
37  * - use ATS bandwidth allocation callback and schedule transmissions!
38  *
39  * Plan:
40  * - inform ATS about RTT, goodput/loss, overheads, etc.
41  *
42  * Later:
43  * - change transport-core API to provide proper flow control in both
44  *   directions, allow multiple messages per peer simultaneously (tag
45  *   confirmations with unique message ID), and replace quota-out with
46  *   proper flow control;
47  *
48  * Design realizations / discussion:
49  * - communicators do flow control by calling MQ "notify sent"
50  *   when 'ready'. They determine flow implicitly (i.e. TCP blocking)
51  *   or explicitly via background channel FC ACKs.  As long as the
52  *   channel is not full, they may 'notify sent' even if the other
53  *   peer has not yet confirmed receipt. The other peer confirming
54  *   is _only_ for FC, not for more reliable transmission; reliable
55  *   transmission (i.e. of fragments) is left to _transport_.
56  * - ACKs sent back in uni-directional communicators are done via
57  *   the background channel API; here transport _may_ initially
58  *   broadcast (with bounded # hops) if no path is known;
59  * - transport should _integrate_ DV-routing and build a view of
60  *   the network; then background channel traffic can be
61  *   routed via DV as well as explicit "DV" traffic.
62  * - background channel is also used for ACKs and NAT traversal support
63  * - transport service is responsible for AEAD'ing the background
64  *   channel, timestamps and monotonic time are used against replay
65  *   of old messages -> peerstore needs to be supplied with
66  *   "latest timestamps seen" data
67  * - if transport implements DV, we likely need a 3rd peermap
68  *   in addition to ephemerals and (direct) neighbours
69  *   => in this data structure, we should track ATS metrics (distance, RTT, etc.)
70  *   as well as latest timestamps seen, goodput, fragments for transmission, etc.
71  *   ==> check if stuff needs to be moved out of "Neighbour"
72  * - transport should encapsualte core-level messages and do its
73  *   own ACKing for RTT/goodput/loss measurements _and_ fragment
74  *   for retransmission
75  */
76 #include "platform.h"
77 #include "gnunet_util_lib.h"
78 #include "gnunet_statistics_service.h"
79 #include "gnunet_transport_monitor_service.h"
80 #include "gnunet_peerstore_service.h"
81 #include "gnunet_hello_lib.h"
82 #include "gnunet_ats_transport_service.h"
83 #include "transport.h"
84
85
86 /**
87  * How many messages can we have pending for a given client process
88  * before we start to drop incoming messages?  We typically should
89  * have only one client and so this would be the primary buffer for
90  * messages, so the number should be chosen rather generously.
91  *
92  * The expectation here is that most of the time the queue is large
93  * enough so that a drop is virtually never required.  Note that
94  * this value must be about as large as 'TOTAL_MSGS' in the
95  * 'test_transport_api_reliability.c', otherwise that testcase may
96  * fail.
97  */
98 #define MAX_PENDING (128 * 1024)
99
100
101 GNUNET_NETWORK_STRUCT_BEGIN
102
103 /**
104  * Outer layer of an encapsulated backchannel message.
105  */
106 struct TransportBackchannelEncapsulationMessage
107 {
108   /**
109    * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION.
110    */
111   struct GNUNET_MessageHeader header;
112
113   /**
114    * Distance the backchannel message has traveled, to be updated at
115    * each hop.  Used to bound the number of hops in case a backchannel
116    * message is broadcast and thus travels without routing
117    * information (during initial backchannel discovery).
118    */
119   uint32_t distance;
120
121   /**
122    * Target's peer identity (as backchannels may be transmitted
123    * indirectly, or even be broadcast).
124    */
125   struct GNUNET_PeerIdentity target;
126
127   /**
128    * Ephemeral key setup by the sender for @e target, used
129    * to encrypt the payload.
130    */
131   struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;
132
133   /**
134    * HMAC over the ciphertext of the encrypted, variable-size
135    * body that follows.  Verified via DH of @e target and
136    * @e ephemeral_key
137    */
138   struct GNUNET_HashCode hmac;
139
140   /* Followed by encrypted, variable-size payload */
141 };
142
143
144 /**
145  * Message by which a peer confirms that it is using an
146  * ephemeral key.
147  */
148 struct EphemeralConfirmation
149 {
150
151   /**
152    * Purpose is #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL
153    */
154   struct GNUNET_CRYPTO_EccSignaturePurpose purpose;
155
156   /**
157    * How long is this signature over the ephemeral key
158    * valid?
159    */
160   struct GNUNET_TIME_AbsoluteNBO ephemeral_validity;
161
162   /**
163    * Ephemeral key setup by the sender for @e target, used
164    * to encrypt the payload.
165    */
166   struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;
167
168 };
169
170 /**
171  * Plaintext of the variable-size payload that is encrypted
172  * within a `struct TransportBackchannelEncapsulationMessage`
173  */
174 struct TransportBackchannelRequestPayload
175 {
176
177   /**
178    * Sender's peer identity.
179    */
180   struct GNUNET_PeerIdentity sender;
181
182   /**
183    * Signature of the sender over an
184    * #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL.
185    */
186   struct GNUNET_CRYPTO_EddsaSignature sender_sig;
187
188   /**
189    * How long is this signature over the ephemeral key
190    * valid?
191    */
192   struct GNUNET_TIME_AbsoluteNBO ephemeral_validity;
193
194   /**
195    * Current monotonic time of the sending transport service.  Used to
196    * detect replayed messages.  Note that the receiver should remember
197    * a list of the recently seen timestamps and only reject messages
198    * if the timestamp is in the list, or the list is "full" and the
199    * timestamp is smaller than the lowest in the list.  This list of
200    * timestamps per peer should be persisted to guard against replays
201    * after restarts.
202    */
203   struct GNUNET_TIME_AbsoluteNBO monotonic_time;
204
205   /* Followed by a `struct GNUNET_MessageHeader` with a message
206      for a communicator */
207
208   /* Followed by a 0-termianted string specifying the name of
209      the communicator which is to receive the message */
210
211 };
212
213 GNUNET_NETWORK_STRUCT_END
214
215
216
217 /**
218  * What type of client is the `struct TransportClient` about?
219  */
220 enum ClientType
221 {
222   /**
223    * We do not know yet (client is fresh).
224    */
225   CT_NONE = 0,
226
227   /**
228    * Is the CORE service, we need to forward traffic to it.
229    */
230   CT_CORE = 1,
231
232   /**
233    * It is a monitor, forward monitor data.
234    */
235   CT_MONITOR = 2,
236
237   /**
238    * It is a communicator, use for communication.
239    */
240   CT_COMMUNICATOR = 3
241 };
242
243
244 /**
245  * Entry in our cache of ephemeral keys we currently use.
246  */
247 struct EphemeralCacheEntry
248 {
249
250   /**
251    * Target's peer identity (we don't re-use ephemerals
252    * to limit linkability of messages).
253    */
254   struct GNUNET_PeerIdentity target;
255
256   /**
257    * Signature affirming @e ephemeral_key of type
258    * #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL
259    */
260   struct GNUNET_CRYPTO_EddsaSignature sender_sig;
261
262   /**
263    * How long is @e sender_sig valid
264    */
265   struct GNUNET_TIME_Absolute ephemeral_validity;
266
267   /**
268    * Our ephemeral key.
269    */
270   struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;
271
272   /**
273    * Node in the ephemeral cache for this entry.
274    * Used for expiration.
275    */
276   struct GNUNET_CONTAINER_HeapNode *hn;
277 };
278
279
280 /**
281  * Client connected to the transport service.
282  */
283 struct TransportClient;
284
285
286 /**
287  * A neighbour that at least one communicator is connected to.
288  */
289 struct Neighbour;
290
291
292 /**
293  * An ATS session is a message queue provided by a communicator
294  * via which we can reach a particular neighbour.
295  */
296 struct GNUNET_ATS_Session
297 {
298   /**
299    * Kept in a MDLL.
300    */
301   struct GNUNET_ATS_Session *next_neighbour;
302
303   /**
304    * Kept in a MDLL.
305    */
306   struct GNUNET_ATS_Session *prev_neighbour;
307
308   /**
309    * Kept in a MDLL.
310    */
311   struct GNUNET_ATS_Session *prev_client;
312
313   /**
314    * Kept in a MDLL.
315    */
316   struct GNUNET_ATS_Session *next_client;
317
318   /**
319    * Which neighbour is this ATS session for?
320    */
321   struct Neighbour *neighbour;
322
323   /**
324    * Which communicator offers this ATS session?
325    */
326   struct TransportClient *tc;
327
328   /**
329    * Address served by the ATS session.
330    */
331   const char *address;
332
333   /**
334    * Our current RTT estimate for this ATS session.
335    */
336   struct GNUNET_TIME_Relative rtt;
337
338   /**
339    * Unique identifier of this ATS session with the communicator.
340    */
341   uint32_t qid;
342
343   /**
344    * Maximum transmission unit supported by this ATS session.
345    */
346   uint32_t mtu;
347
348   /**
349    * Distance to the target of this ATS session.
350    */
351   uint32_t distance;
352
353   /**
354    * Network type offered by this ATS session.
355    */
356   enum GNUNET_NetworkType nt;
357
358   /**
359    * Connection status for this ATS session.
360    */
361   enum GNUNET_TRANSPORT_ConnectionStatus cs;
362
363   /**
364    * Messages pending.
365    */
366   uint32_t num_msg_pending;
367
368   /**
369    * Bytes pending.
370    */
371   uint32_t num_bytes_pending;
372
373   /**
374    * How much outbound bandwidth do we have available for this session?
375    */
376   struct GNUNET_BANDWIDTH_Tracker tracker_out;
377
378   /**
379    * How much inbound bandwidth do we have available for this session?
380    */
381   struct GNUNET_BANDWIDTH_Tracker tracker_in;
382 };
383
384
385 /**
386  * A neighbour that at least one communicator is connected to.
387  */
388 struct Neighbour
389 {
390
391   /**
392    * Which peer is this about?
393    */
394   struct GNUNET_PeerIdentity pid;
395
396   /**
397    * Head of list of messages pending for this neighbour.
398    */
399   struct PendingMessage *pending_msg_head;
400
401   /**
402    * Tail of list of messages pending for this neighbour.
403    */
404   struct PendingMessage *pending_msg_tail;
405
406   /**
407    * Head of DLL of ATS sessions to this peer.
408    */
409   struct GNUNET_ATS_Session *session_head;
410
411   /**
412    * Tail of DLL of ATS sessions to this peer.
413    */
414   struct GNUNET_ATS_Session *session_tail;
415
416   /**
417    * Quota at which CORE is allowed to transmit to this peer
418    * according to ATS.
419    *
420    * FIXME: not yet used, tricky to get right given multiple queues!
421    *        (=> Idea: let ATS set a quota per queue and we add them up here?)
422    * FIXME: how do we set this value initially when we tell CORE?
423    *    Options: start at a minimum value or at literally zero (before ATS?)
424    *         (=> Current thought: clean would be zero!)
425    */
426   struct GNUNET_BANDWIDTH_Value32NBO quota_out;
427
428 };
429
430
431 /**
432  * Transmission request from CORE that is awaiting delivery.
433  */
434 struct PendingMessage
435 {
436   /**
437    * Kept in a MDLL of messages for this @a target.
438    */
439   struct PendingMessage *next_neighbour;
440
441   /**
442    * Kept in a MDLL of messages for this @a target.
443    */
444   struct PendingMessage *prev_neighbour;
445
446   /**
447    * Kept in a MDLL of messages from this @a client.
448    */
449   struct PendingMessage *next_client;
450
451   /**
452    * Kept in a MDLL of messages from this @a client.
453    */
454   struct PendingMessage *prev_client;
455
456   /**
457    * Target of the request.
458    */
459   struct Neighbour *target;
460
461   /**
462    * Client that issued the transmission request.
463    */
464   struct TransportClient *client;
465
466   /**
467    * Size of the original message.
468    */
469   uint32_t bytes_msg;
470
471 };
472
473
474 /**
475  * One of the addresses of this peer.
476  */
477 struct AddressListEntry
478 {
479
480   /**
481    * Kept in a DLL.
482    */
483   struct AddressListEntry *next;
484
485   /**
486    * Kept in a DLL.
487    */
488   struct AddressListEntry *prev;
489
490   /**
491    * Which communicator provides this address?
492    */
493   struct TransportClient *tc;
494
495   /**
496    * The actual address.
497    */
498   const char *address;
499
500   /**
501    * Current context for storing this address in the peerstore.
502    */
503   struct GNUNET_PEERSTORE_StoreContext *sc;
504
505   /**
506    * Task to periodically do @e st operation.
507    */
508   struct GNUNET_SCHEDULER_Task *st;
509
510   /**
511    * What is a typical lifetime the communicator expects this
512    * address to have? (Always from now.)
513    */
514   struct GNUNET_TIME_Relative expiration;
515
516   /**
517    * Address identifier used by the communicator.
518    */
519   uint32_t aid;
520
521   /**
522    * Network type offered by this address.
523    */
524   enum GNUNET_NetworkType nt;
525
526 };
527
528
529 /**
530  * Client connected to the transport service.
531  */
532 struct TransportClient
533 {
534
535   /**
536    * Kept in a DLL.
537    */
538   struct TransportClient *next;
539
540   /**
541    * Kept in a DLL.
542    */
543   struct TransportClient *prev;
544
545   /**
546    * Handle to the client.
547    */
548   struct GNUNET_SERVICE_Client *client;
549
550   /**
551    * Message queue to the client.
552    */
553   struct GNUNET_MQ_Handle *mq;
554
555   /**
556    * What type of client is this?
557    */
558   enum ClientType type;
559
560   union
561   {
562
563     /**
564      * Information for @e type #CT_CORE.
565      */
566     struct {
567
568       /**
569        * Head of list of messages pending for this client.
570        */
571       struct PendingMessage *pending_msg_head;
572
573       /**
574        * Tail of list of messages pending for this client.
575        */
576       struct PendingMessage *pending_msg_tail;
577
578     } core;
579
580     /**
581      * Information for @e type #CT_MONITOR.
582      */
583     struct {
584
585       /**
586        * Peer identity to monitor the addresses of.
587        * Zero to monitor all neighbours.  Valid if
588        * @e type is #CT_MONITOR.
589        */
590       struct GNUNET_PeerIdentity peer;
591
592       /**
593        * Is this a one-shot monitor?
594        */
595       int one_shot;
596
597     } monitor;
598
599
600     /**
601      * Information for @e type #CT_COMMUNICATOR.
602      */
603     struct {
604       /**
605        * If @e type is #CT_COMMUNICATOR, this communicator
606        * supports communicating using these addresses.
607        */
608       char *address_prefix;
609
610       /**
611        * Head of DLL of queues offered by this communicator.
612        */
613       struct GNUNET_ATS_Session *session_head;
614
615       /**
616        * Tail of DLL of queues offered by this communicator.
617        */
618       struct GNUNET_ATS_Session *session_tail;
619
620       /**
621        * Head of list of the addresses of this peer offered by this communicator.
622        */
623       struct AddressListEntry *addr_head;
624
625       /**
626        * Tail of list of the addresses of this peer offered by this communicator.
627        */
628       struct AddressListEntry *addr_tail;
629
630       /**
631        * Characteristics of this communicator.
632        */
633       enum GNUNET_TRANSPORT_CommunicatorCharacteristics cc;
634
635     } communicator;
636
637   } details;
638
639 };
640
641
642 /**
643  * Head of linked list of all clients to this service.
644  */
645 static struct TransportClient *clients_head;
646
647 /**
648  * Tail of linked list of all clients to this service.
649  */
650 static struct TransportClient *clients_tail;
651
652 /**
653  * Statistics handle.
654  */
655 static struct GNUNET_STATISTICS_Handle *GST_stats;
656
657 /**
658  * Configuration handle.
659  */
660 static const struct GNUNET_CONFIGURATION_Handle *GST_cfg;
661
662 /**
663  * Our public key.
664  */
665 static struct GNUNET_PeerIdentity GST_my_identity;
666
667 /**
668  * Our private key.
669  */
670 static struct GNUNET_CRYPTO_EddsaPrivateKey *GST_my_private_key;
671
672 /**
673  * Map from PIDs to `struct Neighbour` entries.  A peer is
674  * a neighbour if we have an MQ to it from some communicator.
675  */
676 static struct GNUNET_CONTAINER_MultiPeerMap *neighbours;
677
678 /**
679  * Database for peer's HELLOs.
680  */
681 static struct GNUNET_PEERSTORE_Handle *peerstore;
682
683 /**
684  * Heap sorting `struct EphemeralCacheEntry` by their
685  * key/signature validity.
686  */
687 static struct GNUNET_CONTAINER_Heap *ephemeral_heap;
688
689 /**
690  * Hash map for looking up `struct EphemeralCacheEntry`s
691  * by peer identity. (We may have ephemerals in our
692  * cache for which we do not have a neighbour entry,
693  * and similar many neighbours may not need ephemerals,
694  * so we use a second map.)
695  */
696 static struct GNUNET_CONTAINER_MultiPeerMap *ephemeral_map;
697
698 /**
699  * Our connection to ATS for allocation and bootstrapping.
700  */
701 static struct GNUNET_ATS_TransportHandle *ats;
702
703
704 /**
705  * Free cached ephemeral key.
706  *
707  * @param ece cached signature to free
708  */
709 static void
710 free_ephemeral (struct EphemeralCacheEntry *ece)
711 {
712   GNUNET_CONTAINER_multipeermap_remove (ephemeral_map,
713                                         &ece->target,
714                                         ece);
715   GNUNET_CONTAINER_heap_remove_node (ece->hn);
716   GNUNET_free (ece);
717 }
718
719
720 /**
721  * Lookup neighbour record for peer @a pid.
722  *
723  * @param pid neighbour to look for
724  * @return NULL if we do not have this peer as a neighbour
725  */
726 static struct Neighbour *
727 lookup_neighbour (const struct GNUNET_PeerIdentity *pid)
728 {
729   return GNUNET_CONTAINER_multipeermap_get (neighbours,
730                                             pid);
731 }
732
733
734 /**
735  * Details about what to notify monitors about.
736  */
737 struct MonitorEvent
738 {
739   /**
740    * @deprecated To be discussed if we keep these...
741    */
742   struct GNUNET_TIME_Absolute last_validation;
743   struct GNUNET_TIME_Absolute valid_until;
744   struct GNUNET_TIME_Absolute next_validation;
745
746   /**
747    * Current round-trip time estimate.
748    */
749   struct GNUNET_TIME_Relative rtt;
750
751   /**
752    * Connection status.
753    */
754   enum GNUNET_TRANSPORT_ConnectionStatus cs;
755
756   /**
757    * Messages pending.
758    */
759   uint32_t num_msg_pending;
760
761   /**
762    * Bytes pending.
763    */
764   uint32_t num_bytes_pending;
765
766
767 };
768
769
770 /**
771  * Notify monitor @a tc about an event.  That @a tc
772  * cares about the event has already been checked.
773  *
774  * Send @a tc information in @a me about a @a peer's status with
775  * respect to some @a address to all monitors that care.
776  *
777  * @param tc monitor to inform
778  * @param peer peer the information is about
779  * @param address address the information is about
780  * @param nt network type associated with @a address
781  * @param me detailed information to transmit
782  */
783 static void
784 notify_monitor (struct TransportClient *tc,
785                 const struct GNUNET_PeerIdentity *peer,
786                 const char *address,
787                 enum GNUNET_NetworkType nt,
788                 const struct MonitorEvent *me)
789 {
790   struct GNUNET_MQ_Envelope *env;
791   struct GNUNET_TRANSPORT_MonitorData *md;
792   size_t addr_len = strlen (address) + 1;
793
794   env = GNUNET_MQ_msg_extra (md,
795                              addr_len,
796                              GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_DATA);
797   md->nt = htonl ((uint32_t) nt);
798   md->peer = *peer;
799   md->last_validation = GNUNET_TIME_absolute_hton (me->last_validation);
800   md->valid_until = GNUNET_TIME_absolute_hton (me->valid_until);
801   md->next_validation = GNUNET_TIME_absolute_hton (me->next_validation);
802   md->rtt = GNUNET_TIME_relative_hton (me->rtt);
803   md->cs = htonl ((uint32_t) me->cs);
804   md->num_msg_pending = htonl (me->num_msg_pending);
805   md->num_bytes_pending = htonl (me->num_bytes_pending);
806   memcpy (&md[1],
807           address,
808           addr_len);
809   GNUNET_MQ_send (tc->mq,
810                   env);
811 }
812
813
814 /**
815  * Send information in @a me about a @a peer's status with respect
816  * to some @a address to all monitors that care.
817  *
818  * @param peer peer the information is about
819  * @param address address the information is about
820  * @param nt network type associated with @a address
821  * @param me detailed information to transmit
822  */
823 static void
824 notify_monitors (const struct GNUNET_PeerIdentity *peer,
825                  const char *address,
826                  enum GNUNET_NetworkType nt,
827                  const struct MonitorEvent *me)
828 {
829   static struct GNUNET_PeerIdentity zero;
830
831   for (struct TransportClient *tc = clients_head;
832        NULL != tc;
833        tc = tc->next)
834   {
835     if (CT_MONITOR != tc->type)
836       continue;
837     if (tc->details.monitor.one_shot)
838       continue;
839     if ( (0 != memcmp (&tc->details.monitor.peer,
840                        &zero,
841                        sizeof (zero))) &&
842          (0 != memcmp (&tc->details.monitor.peer,
843                        peer,
844                        sizeof (*peer))) )
845       continue;
846     notify_monitor (tc,
847                     peer,
848                     address,
849                     nt,
850                     me);
851   }
852 }
853
854
855 /**
856  * Called whenever a client connects.  Allocates our
857  * data structures associated with that client.
858  *
859  * @param cls closure, NULL
860  * @param client identification of the client
861  * @param mq message queue for the client
862  * @return our `struct TransportClient`
863  */
864 static void *
865 client_connect_cb (void *cls,
866                    struct GNUNET_SERVICE_Client *client,
867                    struct GNUNET_MQ_Handle *mq)
868 {
869   struct TransportClient *tc;
870
871   tc = GNUNET_new (struct TransportClient);
872   tc->client = client;
873   tc->mq = mq;
874   GNUNET_CONTAINER_DLL_insert (clients_head,
875                                clients_tail,
876                                tc);
877   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
878               "Client %p connected\n",
879               tc);
880   return tc;
881 }
882
883
884 /**
885  * Release memory used by @a neighbour.
886  *
887  * @param neighbour neighbour entry to free
888  */
889 static void
890 free_neighbour (struct Neighbour *neighbour)
891 {
892   GNUNET_assert (NULL == neighbour->session_head);
893   GNUNET_assert (GNUNET_YES ==
894                  GNUNET_CONTAINER_multipeermap_remove (neighbours,
895                                                        &neighbour->pid,
896                                                        neighbour));
897   GNUNET_free (neighbour);
898 }
899
900
901 /**
902  * Send message to CORE clients that we lost a connection.
903  *
904  * @param tc client to inform (must be CORE client)
905  * @param pid peer the connection is for
906  * @param quota_out current quota for the peer
907  */
908 static void
909 core_send_connect_info (struct TransportClient *tc,
910                         const struct GNUNET_PeerIdentity *pid,
911                         struct GNUNET_BANDWIDTH_Value32NBO quota_out)
912 {
913   struct GNUNET_MQ_Envelope *env;
914   struct ConnectInfoMessage *cim;
915
916   GNUNET_assert (CT_CORE == tc->type);
917   env = GNUNET_MQ_msg (cim,
918                        GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
919   cim->quota_out = quota_out;
920   cim->id = *pid;
921   GNUNET_MQ_send (tc->mq,
922                   env);
923 }
924
925
926 /**
927  * Send message to CORE clients that we gained a connection
928  *
929  * @param pid peer the queue was for
930  * @param quota_out current quota for the peer
931  */
932 static void
933 cores_send_connect_info (const struct GNUNET_PeerIdentity *pid,
934                          struct GNUNET_BANDWIDTH_Value32NBO quota_out)
935 {
936   for (struct TransportClient *tc = clients_head;
937        NULL != tc;
938        tc = tc->next)
939   {
940     if (CT_CORE != tc->type)
941       continue;
942     core_send_connect_info (tc,
943                             pid,
944                             quota_out);
945   }
946 }
947
948
949 /**
950  * Send message to CORE clients that we lost a connection.
951  *
952  * @param pid peer the connection was for
953  */
954 static void
955 cores_send_disconnect_info (const struct GNUNET_PeerIdentity *pid)
956 {
957   for (struct TransportClient *tc = clients_head;
958        NULL != tc;
959        tc = tc->next)
960   {
961     struct GNUNET_MQ_Envelope *env;
962     struct DisconnectInfoMessage *dim;
963
964     if (CT_CORE != tc->type)
965       continue;
966     env = GNUNET_MQ_msg (dim,
967                          GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT);
968     dim->peer = *pid;
969     GNUNET_MQ_send (tc->mq,
970                     env);
971   }
972 }
973
974
975 /**
976  * Free @a queue.
977  *
978  * @param queue the queue to free
979  */
980 static void
981 free_queue (struct GNUNET_ATS_Session *queue)
982 {
983   struct Neighbour *neighbour = queue->neighbour;
984   struct TransportClient *tc = queue->tc;
985   struct MonitorEvent me = {
986     .cs = GNUNET_TRANSPORT_CS_DOWN,
987     .rtt = GNUNET_TIME_UNIT_FOREVER_REL
988   };
989
990   GNUNET_CONTAINER_MDLL_remove (neighbour,
991                                 neighbour->session_head,
992                                 neighbour->session_tail,
993                                 queue);
994   GNUNET_CONTAINER_MDLL_remove (client,
995                                 tc->details.communicator.session_head,
996                                 tc->details.communicator.session_tail,
997                                 queue);
998
999   notify_monitors (&neighbour->pid,
1000                    queue->address,
1001                    queue->nt,
1002                    &me);
1003   GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_in);
1004   GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_out);
1005   GNUNET_free (queue);
1006   if (NULL == neighbour->session_head)
1007     {
1008       cores_send_disconnect_info (&neighbour->pid);
1009       free_neighbour (neighbour);
1010     }
1011 }
1012
1013
1014 /**
1015  * Free @a ale
1016  *
1017  * @param ale address list entry to free
1018  */
1019 static void
1020 free_address_list_entry (struct AddressListEntry *ale)
1021 {
1022   struct TransportClient *tc = ale->tc;
1023
1024   GNUNET_CONTAINER_DLL_remove (tc->details.communicator.addr_head,
1025                                tc->details.communicator.addr_tail,
1026                                ale);
1027   if (NULL != ale->sc)
1028   {
1029     GNUNET_PEERSTORE_store_cancel (ale->sc);
1030     ale->sc = NULL;
1031   }
1032   if (NULL != ale->st)
1033   {
1034     GNUNET_SCHEDULER_cancel (ale->st);
1035     ale->st = NULL;
1036   }
1037   GNUNET_free (ale);
1038 }
1039
1040
1041 /**
1042  * Called whenever a client is disconnected.  Frees our
1043  * resources associated with that client.
1044  *
1045  * @param cls closure, NULL
1046  * @param client identification of the client
1047  * @param app_ctx our `struct TransportClient`
1048  */
1049 static void
1050 client_disconnect_cb (void *cls,
1051                       struct GNUNET_SERVICE_Client *client,
1052                       void *app_ctx)
1053 {
1054   struct TransportClient *tc = app_ctx;
1055
1056   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1057               "Client %p disconnected, cleaning up.\n",
1058               tc);
1059   GNUNET_CONTAINER_DLL_remove (clients_head,
1060                                clients_tail,
1061                                tc);
1062   switch (tc->type)
1063   {
1064   case CT_NONE:
1065     break;
1066   case CT_CORE:
1067     {
1068       struct PendingMessage *pm;
1069
1070       while (NULL != (pm = tc->details.core.pending_msg_head))
1071       {
1072         GNUNET_CONTAINER_MDLL_remove (client,
1073                                       tc->details.core.pending_msg_head,
1074                                       tc->details.core.pending_msg_tail,
1075                                       pm);
1076         pm->client = NULL;
1077       }
1078     }
1079     break;
1080   case CT_MONITOR:
1081     break;
1082   case CT_COMMUNICATOR:
1083     {
1084       struct GNUNET_ATS_Session *q;
1085       struct AddressListEntry *ale;
1086
1087       while (NULL != (q = tc->details.communicator.session_head))
1088         free_queue (q);
1089       while (NULL != (ale = tc->details.communicator.addr_head))
1090         free_address_list_entry (ale);
1091       GNUNET_free (tc->details.communicator.address_prefix);
1092     }
1093     break;
1094   }
1095   GNUNET_free (tc);
1096 }
1097
1098
1099 /**
1100  * Iterator telling new CORE client about all existing
1101  * connections to peers.
1102  *
1103  * @param cls the new `struct TransportClient`
1104  * @param pid a connected peer
1105  * @param value the `struct Neighbour` with more information
1106  * @return #GNUNET_OK (continue to iterate)
1107  */
1108 static int
1109 notify_client_connect_info (void *cls,
1110                             const struct GNUNET_PeerIdentity *pid,
1111                             void *value)
1112 {
1113   struct TransportClient *tc = cls;
1114   struct Neighbour *neighbour = value;
1115
1116   core_send_connect_info (tc,
1117                           pid,
1118                           neighbour->quota_out);
1119   return GNUNET_OK;
1120 }
1121
1122
1123 /**
1124  * Initialize a "CORE" client.  We got a start message from this
1125  * client, so add it to the list of clients for broadcasting of
1126  * inbound messages.
1127  *
1128  * @param cls the client
1129  * @param start the start message that was sent
1130  */
1131 static void
1132 handle_client_start (void *cls,
1133                      const struct StartMessage *start)
1134 {
1135   struct TransportClient *tc = cls;
1136   uint32_t options;
1137
1138   options = ntohl (start->options);
1139   if ( (0 != (1 & options)) &&
1140        (0 !=
1141         memcmp (&start->self,
1142                 &GST_my_identity,
1143                 sizeof (struct GNUNET_PeerIdentity)) ) )
1144   {
1145     /* client thinks this is a different peer, reject */
1146     GNUNET_break (0);
1147     GNUNET_SERVICE_client_drop (tc->client);
1148     return;
1149   }
1150   if (CT_NONE != tc->type)
1151   {
1152     GNUNET_break (0);
1153     GNUNET_SERVICE_client_drop (tc->client);
1154     return;
1155   }
1156   tc->type = CT_CORE;
1157   GNUNET_CONTAINER_multipeermap_iterate (neighbours,
1158                                          &notify_client_connect_info,
1159                                          tc);
1160   GNUNET_SERVICE_client_continue (tc->client);
1161 }
1162
1163
1164 /**
1165  * Client asked for transmission to a peer.  Process the request.
1166  *
1167  * @param cls the client
1168  * @param obm the send message that was sent
1169  */
1170 static int
1171 check_client_send (void *cls,
1172                    const struct OutboundMessage *obm)
1173 {
1174   struct TransportClient *tc = cls;
1175   uint16_t size;
1176   const struct GNUNET_MessageHeader *obmm;
1177
1178   if (CT_CORE != tc->type)
1179   {
1180     GNUNET_break (0);
1181     return GNUNET_SYSERR;
1182   }
1183   size = ntohs (obm->header.size) - sizeof (struct OutboundMessage);
1184   if (size < sizeof (struct GNUNET_MessageHeader))
1185   {
1186     GNUNET_break (0);
1187     return GNUNET_SYSERR;
1188   }
1189   obmm = (const struct GNUNET_MessageHeader *) &obm[1];
1190   if (size != ntohs (obmm->size))
1191   {
1192     GNUNET_break (0);
1193     return GNUNET_SYSERR;
1194   }
1195   return GNUNET_OK;
1196 }
1197
1198
1199 /**
1200  * Send a response to the @a pm that we have processed a
1201  * "send" request with status @a success. We
1202  * transmitted @a bytes_physical on the actual wire.
1203  * Sends a confirmation to the "core" client responsible
1204  * for the original request and free's @a pm.
1205  *
1206  * @param pm handle to the original pending message
1207  * @param success status code, #GNUNET_OK on success, #GNUNET_SYSERR
1208  *          for transmission failure
1209  * @param bytes_physical amount of bandwidth consumed
1210  */
1211 static void
1212 client_send_response (struct PendingMessage *pm,
1213                       int success,
1214                       uint32_t bytes_physical)
1215 {
1216   struct TransportClient *tc = pm->client;
1217   struct Neighbour *target = pm->target;
1218   struct GNUNET_MQ_Envelope *env;
1219   struct SendOkMessage *som;
1220
1221   if (NULL != tc)
1222   {
1223     env = GNUNET_MQ_msg (som,
1224                          GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
1225     som->success = htonl ((uint32_t) success);
1226     som->bytes_msg = htonl (pm->bytes_msg);
1227     som->bytes_physical = htonl (bytes_physical);
1228     som->peer = target->pid;
1229     GNUNET_MQ_send (tc->mq,
1230                     env);
1231     GNUNET_CONTAINER_MDLL_remove (client,
1232                                   tc->details.core.pending_msg_head,
1233                                   tc->details.core.pending_msg_tail,
1234                                   pm);
1235   }
1236   GNUNET_CONTAINER_MDLL_remove (neighbour,
1237                                 target->pending_msg_head,
1238                                 target->pending_msg_tail,
1239                                 pm);
1240   GNUNET_free (pm);
1241 }
1242
1243
1244 /**
1245  * Client asked for transmission to a peer.  Process the request.
1246  *
1247  * @param cls the client
1248  * @param obm the send message that was sent
1249  */
1250 static void
1251 handle_client_send (void *cls,
1252                     const struct OutboundMessage *obm)
1253 {
1254   struct TransportClient *tc = cls;
1255   struct PendingMessage *pm;
1256   const struct GNUNET_MessageHeader *obmm;
1257   struct Neighbour *target;
1258   uint32_t bytes_msg;
1259
1260   GNUNET_assert (CT_CORE == tc->type);
1261   obmm = (const struct GNUNET_MessageHeader *) &obm[1];
1262   bytes_msg = ntohs (obmm->size);
1263   target = lookup_neighbour (&obm->peer);
1264   if (NULL == target)
1265   {
1266     /* Failure: don't have this peer as a neighbour (anymore).
1267        Might have gone down asynchronously, so this is NOT
1268        a protocol violation by CORE. Still count the event,
1269        as this should be rare. */
1270     struct GNUNET_MQ_Envelope *env;
1271     struct SendOkMessage *som;
1272
1273     env = GNUNET_MQ_msg (som,
1274                          GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
1275     som->success = htonl (GNUNET_SYSERR);
1276     som->bytes_msg = htonl (bytes_msg);
1277     som->bytes_physical = htonl (0);
1278     som->peer = obm->peer;
1279     GNUNET_MQ_send (tc->mq,
1280                     env);
1281     GNUNET_SERVICE_client_continue (tc->client);
1282     GNUNET_STATISTICS_update (GST_stats,
1283                               "# messages dropped (neighbour unknown)",
1284                               1,
1285                               GNUNET_NO);
1286     return;
1287   }
1288   pm = GNUNET_new (struct PendingMessage);
1289   pm->client = tc;
1290   pm->target = target;
1291   pm->bytes_msg = bytes_msg;
1292   GNUNET_CONTAINER_MDLL_insert (neighbour,
1293                                 target->pending_msg_head,
1294                                 target->pending_msg_tail,
1295                                 pm);
1296   GNUNET_CONTAINER_MDLL_insert (client,
1297                                 tc->details.core.pending_msg_head,
1298                                 tc->details.core.pending_msg_tail,
1299                                 pm);
1300   // FIXME: do the work, final continuation with call to:
1301   client_send_response (pm,
1302                         GNUNET_NO,
1303                         0);
1304 }
1305
1306
1307 /**
1308  * Communicator started.  Test message is well-formed.
1309  *
1310  * @param cls the client
1311  * @param cam the send message that was sent
1312  */
1313 static int
1314 check_communicator_available (void *cls,
1315                               const struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam)
1316 {
1317   struct TransportClient *tc = cls;
1318   uint16_t size;
1319
1320   if (CT_NONE != tc->type)
1321   {
1322     GNUNET_break (0);
1323     return GNUNET_SYSERR;
1324   }
1325   tc->type = CT_COMMUNICATOR;
1326   size = ntohs (cam->header.size) - sizeof (*cam);
1327   if (0 == size)
1328     return GNUNET_OK; /* receive-only communicator */
1329   GNUNET_MQ_check_zero_termination (cam);
1330   return GNUNET_OK;
1331 }
1332
1333
1334 /**
1335  * Communicator started.  Process the request.
1336  *
1337  * @param cls the client
1338  * @param cam the send message that was sent
1339  */
1340 static void
1341 handle_communicator_available (void *cls,
1342                                const struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam)
1343 {
1344   struct TransportClient *tc = cls;
1345   uint16_t size;
1346
1347   size = ntohs (cam->header.size) - sizeof (*cam);
1348   if (0 == size)
1349     return; /* receive-only communicator */
1350   tc->details.communicator.address_prefix
1351     = GNUNET_strdup ((const char *) &cam[1]);
1352   tc->details.communicator.cc
1353     = (enum GNUNET_TRANSPORT_CommunicatorCharacteristics) ntohl (cam->cc);
1354   GNUNET_SERVICE_client_continue (tc->client);
1355 }
1356
1357
1358 /**
1359  * Address of our peer added.  Test message is well-formed.
1360  *
1361  * @param cls the client
1362  * @param aam the send message that was sent
1363  */
1364 static int
1365 check_add_address (void *cls,
1366                    const struct GNUNET_TRANSPORT_AddAddressMessage *aam)
1367 {
1368   struct TransportClient *tc = cls;
1369
1370   if (CT_COMMUNICATOR != tc->type)
1371   {
1372     GNUNET_break (0);
1373     return GNUNET_SYSERR;
1374   }
1375   GNUNET_MQ_check_zero_termination (aam);
1376   return GNUNET_OK;
1377 }
1378
1379
1380 /**
1381  * Ask peerstore to store our address.
1382  *
1383  * @param cls an `struct AddressListEntry *`
1384  */
1385 static void
1386 store_pi (void *cls);
1387
1388
1389 /**
1390  * Function called when peerstore is done storing our address.
1391  */
1392 static void
1393 peerstore_store_cb (void *cls,
1394                     int success)
1395 {
1396   struct AddressListEntry *ale = cls;
1397
1398   ale->sc = NULL;
1399   if (GNUNET_YES != success)
1400     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1401                 "Failed to store our own address `%s' in peerstore!\n",
1402                 ale->address);
1403   /* refresh period is 1/4 of expiration time, that should be plenty
1404      without being excessive. */
1405   ale->st = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_divide (ale->expiration,
1406                                                                        4ULL),
1407                                           &store_pi,
1408                                           ale);
1409 }
1410
1411
1412 /**
1413  * Ask peerstore to store our address.
1414  *
1415  * @param cls an `struct AddressListEntry *`
1416  */
1417 static void
1418 store_pi (void *cls)
1419 {
1420   struct AddressListEntry *ale = cls;
1421   void *addr;
1422   size_t addr_len;
1423   struct GNUNET_TIME_Absolute expiration;
1424
1425   ale->st = NULL;
1426   expiration = GNUNET_TIME_relative_to_absolute (ale->expiration);
1427   GNUNET_HELLO_sign_address (ale->address,
1428                              ale->nt,
1429                              expiration,
1430                              GST_my_private_key,
1431                              &addr,
1432                              &addr_len);
1433   ale->sc = GNUNET_PEERSTORE_store (peerstore,
1434                                     "transport",
1435                                     &GST_my_identity,
1436                                     GNUNET_HELLO_PEERSTORE_KEY,
1437                                     addr,
1438                                     addr_len,
1439                                     expiration,
1440                                     GNUNET_PEERSTORE_STOREOPTION_MULTIPLE,
1441                                     &peerstore_store_cb,
1442                                     ale);
1443   GNUNET_free (addr);
1444   if (NULL == ale->sc)
1445   {
1446     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1447                 "Failed to store our address `%s' with peerstore\n",
1448                 ale->address);
1449     ale->st = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
1450                                             &store_pi,
1451                                             ale);
1452   }
1453 }
1454
1455
1456 /**
1457  * Address of our peer added.  Process the request.
1458  *
1459  * @param cls the client
1460  * @param aam the send message that was sent
1461  */
1462 static void
1463 handle_add_address (void *cls,
1464                     const struct GNUNET_TRANSPORT_AddAddressMessage *aam)
1465 {
1466   struct TransportClient *tc = cls;
1467   struct AddressListEntry *ale;
1468   size_t slen;
1469
1470   slen = ntohs (aam->header.size) - sizeof (*aam);
1471   ale = GNUNET_malloc (sizeof (struct AddressListEntry) + slen);
1472   ale->tc = tc;
1473   ale->address = (const char *) &ale[1];
1474   ale->expiration = GNUNET_TIME_relative_ntoh (aam->expiration);
1475   ale->aid = aam->aid;
1476   ale->nt = (enum GNUNET_NetworkType) ntohl (aam->nt);
1477   memcpy (&ale[1],
1478           &aam[1],
1479           slen);
1480   GNUNET_CONTAINER_DLL_insert (tc->details.communicator.addr_head,
1481                                tc->details.communicator.addr_tail,
1482                                ale);
1483   ale->st = GNUNET_SCHEDULER_add_now (&store_pi,
1484                                       ale);
1485   GNUNET_SERVICE_client_continue (tc->client);
1486 }
1487
1488
1489 /**
1490  * Address of our peer deleted.  Process the request.
1491  *
1492  * @param cls the client
1493  * @param dam the send message that was sent
1494  */
1495 static void
1496 handle_del_address (void *cls,
1497                     const struct GNUNET_TRANSPORT_DelAddressMessage *dam)
1498 {
1499   struct TransportClient *tc = cls;
1500
1501   if (CT_COMMUNICATOR != tc->type)
1502   {
1503     GNUNET_break (0);
1504     GNUNET_SERVICE_client_drop (tc->client);
1505     return;
1506   }
1507   for (struct AddressListEntry *ale = tc->details.communicator.addr_head;
1508        NULL != ale;
1509        ale = ale->next)
1510   {
1511     if (dam->aid != ale->aid)
1512       continue;
1513     GNUNET_assert (ale->tc == tc);
1514     free_address_list_entry (ale);
1515     GNUNET_SERVICE_client_continue (tc->client);
1516   }
1517   GNUNET_break (0);
1518   GNUNET_SERVICE_client_drop (tc->client);
1519 }
1520
1521
1522 /**
1523  * Client notified us about transmission from a peer.  Process the request.
1524  *
1525  * @param cls the client
1526  * @param obm the send message that was sent
1527  */
1528 static int
1529 check_incoming_msg (void *cls,
1530                     const struct GNUNET_TRANSPORT_IncomingMessage *im)
1531 {
1532   struct TransportClient *tc = cls;
1533   uint16_t size;
1534   const struct GNUNET_MessageHeader *obmm;
1535
1536   if (CT_COMMUNICATOR != tc->type)
1537   {
1538     GNUNET_break (0);
1539     return GNUNET_SYSERR;
1540   }
1541   size = ntohs (im->header.size) - sizeof (*im);
1542   if (size < sizeof (struct GNUNET_MessageHeader))
1543   {
1544     GNUNET_break (0);
1545     return GNUNET_SYSERR;
1546   }
1547   obmm = (const struct GNUNET_MessageHeader *) &im[1];
1548   if (size != ntohs (obmm->size))
1549   {
1550     GNUNET_break (0);
1551     return GNUNET_SYSERR;
1552   }
1553   return GNUNET_OK;
1554 }
1555
1556
1557 /**
1558  * Incoming meessage.  Process the request.
1559  *
1560  * @param cls the client
1561  * @param im the send message that was received
1562  */
1563 static void
1564 handle_incoming_msg (void *cls,
1565                      const struct GNUNET_TRANSPORT_IncomingMessage *im)
1566 {
1567   struct TransportClient *tc = cls;
1568
1569   GNUNET_SERVICE_client_continue (tc->client);
1570 }
1571
1572
1573 /**
1574  * New queue became available.  Check message.
1575  *
1576  * @param cls the client
1577  * @param aqm the send message that was sent
1578  */
1579 static int
1580 check_add_queue_message (void *cls,
1581                          const struct GNUNET_TRANSPORT_AddQueueMessage *aqm)
1582 {
1583   struct TransportClient *tc = cls;
1584
1585   if (CT_COMMUNICATOR != tc->type)
1586   {
1587     GNUNET_break (0);
1588     return GNUNET_SYSERR;
1589   }
1590   GNUNET_MQ_check_zero_termination (aqm);
1591   return GNUNET_OK;
1592 }
1593
1594
1595 /**
1596  * Bandwidth tracker informs us that the delay until we
1597  * can transmit again changed.
1598  *
1599  * @param cls a `struct GNUNET_ATS_Session` for which the delay changed
1600  */
1601 static void
1602 tracker_update_cb (void *cls)
1603 {
1604   struct GNUNET_ATS_Session *queue = cls;
1605
1606   // FIXME: re-schedule transmission tasks if applicable!
1607 }
1608
1609
1610 /**
1611  * Bandwidth tracker informs us that excessive bandwidth was allocated
1612  * which is not being used.
1613  *
1614  * @param cls a `struct GNUNET_ATS_Session` for which the excess was noted
1615  */
1616 static void
1617 tracker_excess_cb (void *cls)
1618 {
1619   /* FIXME: what do we do? */
1620 }
1621
1622
1623 /**
1624  * New queue became available.  Process the request.
1625  *
1626  * @param cls the client
1627  * @param aqm the send message that was sent
1628  */
1629 static void
1630 handle_add_queue_message (void *cls,
1631                           const struct GNUNET_TRANSPORT_AddQueueMessage *aqm)
1632 {
1633   struct TransportClient *tc = cls;
1634   struct GNUNET_ATS_Session *queue;
1635   struct Neighbour *neighbour;
1636   const char *addr;
1637   uint16_t addr_len;
1638
1639   neighbour = lookup_neighbour (&aqm->receiver);
1640   if (NULL == neighbour)
1641   {
1642     neighbour = GNUNET_new (struct Neighbour);
1643     neighbour->pid = aqm->receiver;
1644     GNUNET_assert (GNUNET_OK ==
1645                    GNUNET_CONTAINER_multipeermap_put (neighbours,
1646                                                       &neighbour->pid,
1647                                                       neighbour,
1648                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1649     cores_send_connect_info (&neighbour->pid,
1650                              GNUNET_BANDWIDTH_ZERO);
1651     // FIXME: notify ATS!
1652   }
1653   addr_len = ntohs (aqm->header.size) - sizeof (*aqm);
1654   addr = (const char *) &aqm[1];
1655
1656   queue = GNUNET_malloc (sizeof (struct GNUNET_ATS_Session) + addr_len);
1657   queue->tc = tc;
1658   queue->address = (const char *) &queue[1];
1659   queue->rtt = GNUNET_TIME_UNIT_FOREVER_REL;
1660   queue->qid = aqm->qid;
1661   queue->mtu = ntohl (aqm->mtu);
1662   queue->distance = ntohl (aqm->distance);
1663   queue->nt = (enum GNUNET_NetworkType) ntohl (aqm->nt);
1664   queue->cs = (enum GNUNET_TRANSPORT_ConnectionStatus) ntohl (aqm->cs);
1665   queue->neighbour = neighbour;
1666   GNUNET_BANDWIDTH_tracker_init2 (&queue->tracker_in,
1667                                   &tracker_update_cb,
1668                                   queue,
1669                                   GNUNET_BANDWIDTH_ZERO,
1670                                   0 /* FIXME: max carry in seconds! */,
1671                                   &tracker_excess_cb,
1672                                   queue);
1673   GNUNET_BANDWIDTH_tracker_init2 (&queue->tracker_out,
1674                                   &tracker_update_cb,
1675                                   queue,
1676                                   GNUNET_BANDWIDTH_ZERO,
1677                                   0 /* FIXME: max carry in seconds! */,
1678                                   &tracker_excess_cb,
1679                                   queue);
1680   memcpy (&queue[1],
1681           addr,
1682           addr_len);
1683   /* notify monitors about new queue */
1684   {
1685     struct MonitorEvent me = {
1686       .rtt = queue->rtt,
1687       .cs = queue->cs
1688     };
1689
1690     notify_monitors (&neighbour->pid,
1691                      queue->address,
1692                      queue->nt,
1693                      &me);
1694   }
1695   GNUNET_CONTAINER_MDLL_insert (neighbour,
1696                                 neighbour->session_head,
1697                                 neighbour->session_tail,
1698                                 queue);
1699   GNUNET_CONTAINER_MDLL_insert (client,
1700                                 tc->details.communicator.session_head,
1701                                 tc->details.communicator.session_tail,
1702                                 queue);
1703   // FIXME: possibly transmit queued messages?
1704   GNUNET_SERVICE_client_continue (tc->client);
1705 }
1706
1707
1708 /**
1709  * Queue to a peer went down.  Process the request.
1710  *
1711  * @param cls the client
1712  * @param dqm the send message that was sent
1713  */
1714 static void
1715 handle_del_queue_message (void *cls,
1716                           const struct GNUNET_TRANSPORT_DelQueueMessage *dqm)
1717 {
1718   struct TransportClient *tc = cls;
1719
1720   if (CT_COMMUNICATOR != tc->type)
1721   {
1722     GNUNET_break (0);
1723     GNUNET_SERVICE_client_drop (tc->client);
1724     return;
1725   }
1726   for (struct GNUNET_ATS_Session *queue = tc->details.communicator.session_head;
1727        NULL != queue;
1728        queue = queue->next_client)
1729   {
1730     struct Neighbour *neighbour = queue->neighbour;
1731
1732     if ( (dqm->qid != queue->qid) ||
1733          (0 != memcmp (&dqm->receiver,
1734                        &neighbour->pid,
1735                        sizeof (struct GNUNET_PeerIdentity))) )
1736       continue;
1737     free_queue (queue);
1738     GNUNET_SERVICE_client_continue (tc->client);
1739     return;
1740   }
1741   GNUNET_break (0);
1742   GNUNET_SERVICE_client_drop (tc->client);
1743 }
1744
1745
1746 /**
1747  * Message was transmitted.  Process the request.
1748  *
1749  * @param cls the client
1750  * @param sma the send message that was sent
1751  */
1752 static void
1753 handle_send_message_ack (void *cls,
1754                          const struct GNUNET_TRANSPORT_SendMessageToAck *sma)
1755 {
1756   struct TransportClient *tc = cls;
1757
1758   if (CT_COMMUNICATOR != tc->type)
1759   {
1760     GNUNET_break (0);
1761     GNUNET_SERVICE_client_drop (tc->client);
1762     return;
1763   }
1764   GNUNET_SERVICE_client_continue (tc->client);
1765 }
1766
1767
1768 /**
1769  * Iterator telling new MONITOR client about all existing
1770  * queues to peers.
1771  *
1772  * @param cls the new `struct TransportClient`
1773  * @param pid a connected peer
1774  * @param value the `struct Neighbour` with more information
1775  * @return #GNUNET_OK (continue to iterate)
1776  */
1777 static int
1778 notify_client_queues (void *cls,
1779                       const struct GNUNET_PeerIdentity *pid,
1780                       void *value)
1781 {
1782   struct TransportClient *tc = cls;
1783   struct Neighbour *neighbour = value;
1784
1785   GNUNET_assert (CT_MONITOR == tc->type);
1786   for (struct GNUNET_ATS_Session *q = neighbour->session_head;
1787        NULL != q;
1788        q = q->next_neighbour)
1789   {
1790     struct MonitorEvent me = {
1791       .rtt = q->rtt,
1792       .cs = q->cs,
1793       .num_msg_pending = q->num_msg_pending,
1794       .num_bytes_pending = q->num_bytes_pending
1795     };
1796
1797     notify_monitor (tc,
1798                     pid,
1799                     q->address,
1800                     q->nt,
1801                     &me);
1802   }
1803   return GNUNET_OK;
1804 }
1805
1806
1807 /**
1808  * Initialize a monitor client.
1809  *
1810  * @param cls the client
1811  * @param start the start message that was sent
1812  */
1813 static void
1814 handle_monitor_start (void *cls,
1815                       const struct GNUNET_TRANSPORT_MonitorStart *start)
1816 {
1817   struct TransportClient *tc = cls;
1818
1819   if (CT_NONE != tc->type)
1820   {
1821     GNUNET_break (0);
1822     GNUNET_SERVICE_client_drop (tc->client);
1823     return;
1824   }
1825   tc->type = CT_MONITOR;
1826   tc->details.monitor.peer = start->peer;
1827   tc->details.monitor.one_shot = ntohl (start->one_shot);
1828   GNUNET_CONTAINER_multipeermap_iterate (neighbours,
1829                                          &notify_client_queues,
1830                                          tc);
1831   GNUNET_SERVICE_client_mark_monitor (tc->client);
1832   GNUNET_SERVICE_client_continue (tc->client);
1833 }
1834
1835
1836 /**
1837  * Signature of a function called by ATS with the current bandwidth
1838  * allocation to be used as determined by ATS.
1839  *
1840  * @param cls closure, NULL
1841  * @param session session this is about
1842  * @param bandwidth_out assigned outbound bandwidth for the connection,
1843  *        0 to signal disconnect
1844  * @param bandwidth_in assigned inbound bandwidth for the connection,
1845  *        0 to signal disconnect
1846  */
1847 static void
1848 ats_allocation_cb (void *cls,
1849                    struct GNUNET_ATS_Session *session,
1850                    struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out,
1851                    struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in)
1852 {
1853   (void) cls;
1854   GNUNET_BANDWIDTH_tracker_update_quota (&session->tracker_out,
1855                                          bandwidth_out);
1856   GNUNET_BANDWIDTH_tracker_update_quota (&session->tracker_in,
1857                                          bandwidth_in);
1858 }
1859
1860
1861 /**
1862  * Find transport client providing communication service
1863  * for the protocol @a prefix.
1864  *
1865  * @param prefix communicator name
1866  * @return NULL if no such transport client is available
1867  */
1868 static struct TransportClient *
1869 lookup_communicator (const char *prefix)
1870 {
1871   GNUNET_break (0); // FIXME: implement
1872   return NULL;
1873 }
1874
1875
1876 /**
1877  * Signature of a function called by ATS suggesting transport to
1878  * try connecting with a particular address.
1879  *
1880  * @param cls closure, NULL
1881  * @param pid target peer
1882  * @param address the address to try
1883  */
1884 static void
1885 ats_suggestion_cb (void *cls,
1886                    const struct GNUNET_PeerIdentity *pid,
1887                    const char *address)
1888 {
1889   struct TransportClient *tc;
1890   char *prefix;
1891
1892   (void) cls;
1893   prefix = NULL; // FIXME
1894   tc = lookup_communicator (prefix);
1895   if (NULL == tc)
1896   {
1897     // STATS...
1898     return;
1899   }
1900   // FIXME: forward suggestion to tc
1901 }
1902
1903
1904 /**
1905  * Free neighbour entry.
1906  *
1907  * @param cls NULL
1908  * @param pid unused
1909  * @param value a `struct Neighbour`
1910  * @return #GNUNET_OK (always)
1911  */
1912 static int
1913 free_neighbour_cb (void *cls,
1914                    const struct GNUNET_PeerIdentity *pid,
1915                    void *value)
1916 {
1917   struct Neighbour *neighbour = value;
1918
1919   (void) cls;
1920   (void) pid;
1921   GNUNET_break (0); // should this ever happen?
1922   free_neighbour (neighbour);
1923
1924   return GNUNET_OK;
1925 }
1926
1927
1928 /**
1929  * Free ephemeral entry.
1930  *
1931  * @param cls NULL
1932  * @param pid unused
1933  * @param value a `struct Neighbour`
1934  * @return #GNUNET_OK (always)
1935  */
1936 static int
1937 free_ephemeral_cb (void *cls,
1938                    const struct GNUNET_PeerIdentity *pid,
1939                    void *value)
1940 {
1941   struct EphemeralCacheEntry *ece = value;
1942
1943   (void) cls;
1944   (void) pid;
1945   free_ephemeral (ece);
1946   return GNUNET_OK;
1947 }
1948
1949
1950 /**
1951  * Function called when the service shuts down.  Unloads our plugins
1952  * and cancels pending validations.
1953  *
1954  * @param cls closure, unused
1955  */
1956 static void
1957 do_shutdown (void *cls)
1958 {
1959   (void) cls;
1960
1961   GNUNET_CONTAINER_multipeermap_iterate (neighbours,
1962                                          &free_neighbour_cb,
1963                                          NULL);
1964   if (NULL != ats)
1965   {
1966     GNUNET_ATS_transport_done (ats);
1967     ats = NULL;
1968   }
1969   if (NULL != peerstore)
1970   {
1971     GNUNET_PEERSTORE_disconnect (peerstore,
1972                                  GNUNET_NO);
1973     peerstore = NULL;
1974   }
1975   if (NULL != GST_stats)
1976   {
1977     GNUNET_STATISTICS_destroy (GST_stats,
1978                                GNUNET_NO);
1979     GST_stats = NULL;
1980   }
1981   if (NULL != GST_my_private_key)
1982   {
1983     GNUNET_free (GST_my_private_key);
1984     GST_my_private_key = NULL;
1985   }
1986   GNUNET_CONTAINER_multipeermap_destroy (neighbours);
1987   neighbours = NULL;
1988   GNUNET_CONTAINER_multipeermap_iterate (ephemeral_map,
1989                                          &free_ephemeral_cb,
1990                                          NULL);
1991   GNUNET_CONTAINER_multipeermap_destroy (ephemeral_map);
1992   ephemeral_map = NULL;
1993   GNUNET_CONTAINER_heap_destroy (ephemeral_heap);
1994   ephemeral_heap = NULL;
1995 }
1996
1997
1998 /**
1999  * Initiate transport service.
2000  *
2001  * @param cls closure
2002  * @param c configuration to use
2003  * @param service the initialized service
2004  */
2005 static void
2006 run (void *cls,
2007      const struct GNUNET_CONFIGURATION_Handle *c,
2008      struct GNUNET_SERVICE_Handle *service)
2009 {
2010   (void) cls;
2011   /* setup globals */
2012   GST_cfg = c;
2013   neighbours = GNUNET_CONTAINER_multipeermap_create (1024,
2014                                                      GNUNET_YES);
2015   ephemeral_map = GNUNET_CONTAINER_multipeermap_create (32,
2016                                                         GNUNET_YES);
2017   ephemeral_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
2018   GST_my_private_key = GNUNET_CRYPTO_eddsa_key_create_from_configuration (GST_cfg);
2019   if (NULL == GST_my_private_key)
2020   {
2021     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2022                 _("Transport service is lacking key configuration settings. Exiting.\n"));
2023     GNUNET_SCHEDULER_shutdown ();
2024     return;
2025   }
2026   GNUNET_CRYPTO_eddsa_key_get_public (GST_my_private_key,
2027                                       &GST_my_identity.public_key);
2028   GNUNET_log(GNUNET_ERROR_TYPE_INFO,
2029              "My identity is `%s'\n",
2030              GNUNET_i2s_full (&GST_my_identity));
2031   GST_stats = GNUNET_STATISTICS_create ("transport",
2032                                         GST_cfg);
2033   GNUNET_SCHEDULER_add_shutdown (&do_shutdown,
2034                                  NULL);
2035   peerstore = GNUNET_PEERSTORE_connect (GST_cfg);
2036   if (NULL == peerstore)
2037   {
2038     GNUNET_break (0);
2039     GNUNET_SCHEDULER_shutdown ();
2040     return;
2041   }
2042   ats = GNUNET_ATS_transport_init (GST_cfg,
2043                                    &ats_allocation_cb,
2044                                    NULL,
2045                                    &ats_suggestion_cb,
2046                                    NULL);
2047   if (NULL == ats)
2048   {
2049     GNUNET_break (0);
2050     GNUNET_SCHEDULER_shutdown ();
2051     return;
2052   }
2053 }
2054
2055
2056 /**
2057  * Define "main" method using service macro.
2058  */
2059 GNUNET_SERVICE_MAIN
2060 ("transport",
2061  GNUNET_SERVICE_OPTION_SOFT_SHUTDOWN,
2062  &run,
2063  &client_connect_cb,
2064  &client_disconnect_cb,
2065  NULL,
2066  /* communication with core */
2067  GNUNET_MQ_hd_fixed_size (client_start,
2068                           GNUNET_MESSAGE_TYPE_TRANSPORT_START,
2069                           struct StartMessage,
2070                           NULL),
2071  GNUNET_MQ_hd_var_size (client_send,
2072                         GNUNET_MESSAGE_TYPE_TRANSPORT_SEND,
2073                         struct OutboundMessage,
2074                         NULL),
2075  /* communication with communicators */
2076  GNUNET_MQ_hd_var_size (communicator_available,
2077                         GNUNET_MESSAGE_TYPE_TRANSPORT_NEW_COMMUNICATOR,
2078                         struct GNUNET_TRANSPORT_CommunicatorAvailableMessage,
2079                         NULL),
2080  GNUNET_MQ_hd_var_size (add_address,
2081                         GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS,
2082                         struct GNUNET_TRANSPORT_AddAddressMessage,
2083                         NULL),
2084  GNUNET_MQ_hd_fixed_size (del_address,
2085                           GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_ADDRESS,
2086                           struct GNUNET_TRANSPORT_DelAddressMessage,
2087                           NULL),
2088  GNUNET_MQ_hd_var_size (incoming_msg,
2089                         GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG,
2090                         struct GNUNET_TRANSPORT_IncomingMessage,
2091                         NULL),
2092  GNUNET_MQ_hd_var_size (add_queue_message,
2093                         GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP,
2094                         struct GNUNET_TRANSPORT_AddQueueMessage,
2095                         NULL),
2096  GNUNET_MQ_hd_fixed_size (del_queue_message,
2097                           GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN,
2098                           struct GNUNET_TRANSPORT_DelQueueMessage,
2099                           NULL),
2100  GNUNET_MQ_hd_fixed_size (send_message_ack,
2101                           GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK,
2102                           struct GNUNET_TRANSPORT_SendMessageToAck,
2103                           NULL),
2104  /* communication with monitors */
2105  GNUNET_MQ_hd_fixed_size (monitor_start,
2106                           GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_START,
2107                           struct GNUNET_TRANSPORT_MonitorStart,
2108                           NULL),
2109  GNUNET_MQ_handler_end ());
2110
2111
2112 /* end of file gnunet-service-transport.c */