Merge branch 'master' of ssh://gnunet.org/gnunet
[oweals/gnunet.git] / src / transport / gnunet-service-tng.c
1 /*
2  This file is part of GNUnet.
3  Copyright (C) 2010-2016, 2018 GNUnet e.V.
4
5  GNUnet is free software: you can redistribute it and/or modify it
6  under the terms of the GNU Affero General Public License as published
7  by the Free Software Foundation, either version 3 of the License,
8  or (at your option) any later version.
9
10  GNUnet is distributed in the hope that it will be useful, but
11  WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  Affero General Public License for more details.
14
15  You should have received a copy of the GNU Affero General Public License
16  along with this program.  If not, see <http://www.gnu.org/licenses/>.
17  */
18 /**
19  * @file transport/gnunet-service-transport.c
20  * @brief main for gnunet-service-transport
21  * @author Christian Grothoff
22  *
23  * TODO:
24  * - make *our* collected addresses available somehow somewhere
25  *   => Choices: in peerstore or revive/keep peerinfo?
26  * - MTU information is missing for queues!
27  * - start supporting monitor logic (add functions to signal monitors!)
28  * - manage fragmentation/defragmentation, retransmission, track RTT, loss, etc.
29  * - ask ATS about bandwidth allocation
30  * -
31  */
32 #include "platform.h"
33 #include "gnunet_util_lib.h"
34 #include "gnunet_statistics_service.h"
35 #include "gnunet_transport_service.h"
36 #include "gnunet_peerinfo_service.h"
37 #include "gnunet_ats_service.h"
38 #include "gnunet-service-transport.h"
39 #include "transport.h"
40
41
42 /**
43  * How many messages can we have pending for a given client process
44  * before we start to drop incoming messages?  We typically should
45  * have only one client and so this would be the primary buffer for
46  * messages, so the number should be chosen rather generously.
47  *
48  * The expectation here is that most of the time the queue is large
49  * enough so that a drop is virtually never required.  Note that
50  * this value must be about as large as 'TOTAL_MSGS' in the
51  * 'test_transport_api_reliability.c', otherwise that testcase may
52  * fail.
53  */
54 #define MAX_PENDING (128 * 1024)
55
56
57 /**
58  * What type of client is the `struct TransportClient` about?
59  */
60 enum ClientType
61 {
62   /**
63    * We do not know yet (client is fresh).
64    */
65   CT_NONE = 0,
66
67   /**
68    * Is the CORE service, we need to forward traffic to it.
69    */
70   CT_CORE = 1,
71
72   /**
73    * It is a monitor, forward monitor data.
74    */
75   CT_MONITOR = 2,
76
77   /**
78    * It is a communicator, use for communication.
79    */
80   CT_COMMUNICATOR = 3
81 };
82
83
84 /**
85  * Client connected to the transport service.
86  */
87 struct TransportClient;
88
89
90 /**
91  * A neighbour that at least one communicator is connected to.
92  */
93 struct Neighbour;
94
95
96 /**
97  * List of available queues for a particular neighbour.
98  */
99 struct Queue
100 {
101   /**
102    * Kept in a MDLL.
103    */
104   struct Queue *next_neighbour;
105
106   /**
107    * Kept in a MDLL.
108    */
109   struct Queue *prev_neighbour;
110
111   /**
112    * Kept in a MDLL.
113    */
114   struct Queue *prev_client;
115
116   /**
117    * Kept in a MDLL.
118    */
119   struct Queue *next_client;
120
121   /**
122    * Which neighbour is this queue for?
123    */
124   struct Neighbour *neighbour;
125
126   /**
127    * Which communicator offers this queue?
128    */
129   struct TransportClient *tc;
130
131   /**
132    * Address served by the queue.
133    */
134   const char *address;
135
136   /**
137    * Unique identifier of this queue with the communicator.
138    */
139   uint32_t qid;
140
141   /**
142    * Network type offered by this queue.
143    */
144   enum GNUNET_ATS_Network_Type nt;
145
146   // FIXME: add ATS-specific fields here!
147 };
148
149
150 /**
151  * A neighbour that at least one communicator is connected to.
152  */
153 struct Neighbour
154 {
155
156   /**
157    * Which peer is this about?
158    */
159   struct GNUNET_PeerIdentity pid;
160
161   /**
162    * Head of list of messages pending for this neighbour.
163    */
164   struct PendingMessage *pending_msg_head;
165
166   /**
167    * Tail of list of messages pending for this neighbour.
168    */
169   struct PendingMessage *pending_msg_tail;
170
171   /**
172    * Head of DLL of queues to this peer.
173    */
174   struct Queue *queue_head;
175
176   /**
177    * Tail of DLL of queues to this peer.
178    */
179   struct Queue *queue_tail;
180
181   /**
182    * Quota at which CORE is allowed to transmit to this peer
183    * according to ATS.
184    *
185    * FIXME: not yet used, tricky to get right given multiple queues!
186    *        (=> Idea: let ATS set a quota per queue and we add them up here?)
187    * FIXME: how do we set this value initially when we tell CORE?
188    *    Options: start at a minimum value or at literally zero (before ATS?)
189    *         (=> Current thought: clean would be zero!)
190    */
191   struct GNUNET_BANDWIDTH_Value32NBO quota_out;
192
193 };
194
195
196 /**
197  * Transmission request from CORE that is awaiting delivery.
198  */
199 struct PendingMessage
200 {
201   /**
202    * Kept in a MDLL of messages for this @a target.
203    */
204   struct PendingMessage *next_neighbour;
205
206   /**
207    * Kept in a MDLL of messages for this @a target.
208    */
209   struct PendingMessage *prev_neighbour;
210
211   /**
212    * Kept in a MDLL of messages from this @a client.
213    */
214   struct PendingMessage *next_client;
215
216   /**
217    * Kept in a MDLL of messages from this @a client.
218    */
219   struct PendingMessage *prev_client;
220
221   /**
222    * Target of the request.
223    */
224   struct Neighbour *target;
225
226   /**
227    * Client that issued the transmission request.
228    */
229   struct TransportClient *client;
230
231   /**
232    * Size of the original message.
233    */
234   uint32_t bytes_msg;
235
236 };
237
238
239 /**
240  * One of the addresses of this peer.
241  */
242 struct AddressListEntry
243 {
244
245   /**
246    * Kept in a DLL.
247    */
248   struct AddressListEntry *next;
249
250   /**
251    * Kept in a DLL.
252    */
253   struct AddressListEntry *prev;
254
255   /**
256    * Which communicator provides this address?
257    */
258   struct TransportClient *tc;
259
260   /**
261    * The actual address.
262    */
263   const char *address;
264
265   /**
266    * What is a typical lifetime the communicator expects this
267    * address to have? (Always from now.)
268    */
269   struct GNUNET_TIME_Relative expiration;
270
271   /**
272    * Address identifier used by the communicator.
273    */
274   uint32_t aid;
275
276   /**
277    * Network type offered by this address.
278    */
279   enum GNUNET_ATS_Network_Type nt;
280
281 };
282
283
284 /**
285  * Client connected to the transport service.
286  */
287 struct TransportClient
288 {
289
290   /**
291    * Kept in a DLL.
292    */
293   struct TransportClient *next;
294
295   /**
296    * Kept in a DLL.
297    */
298   struct TransportClient *prev;
299
300   /**
301    * Handle to the client.
302    */
303   struct GNUNET_SERVICE_Client *client;
304
305   /**
306    * Message queue to the client.
307    */
308   struct GNUNET_MQ_Handle *mq;
309
310   /**
311    * What type of client is this?
312    */
313   enum ClientType type;
314
315   union
316   {
317
318     /**
319      * Information for @e type #CT_CORE.
320      */
321     struct {
322
323       /**
324        * Head of list of messages pending for this client.
325        */
326       struct PendingMessage *pending_msg_head;
327
328       /**
329        * Tail of list of messages pending for this client.
330        */
331       struct PendingMessage *pending_msg_tail;
332
333     } core;
334
335     /**
336      * Information for @e type #CT_MONITOR.
337      */
338     struct {
339
340       /**
341        * Peer identity to monitor the addresses of.
342        * Zero to monitor all neighbours.  Valid if
343        * @e type is #CT_MONITOR.
344        */
345       struct GNUNET_PeerIdentity peer;
346
347       /**
348        * Is this a one-shot monitor?
349        */
350       int one_shot;
351
352     } monitor;
353
354
355     /**
356      * Information for @e type #CT_COMMUNICATOR.
357      */
358     struct {
359       /**
360        * If @e type is #CT_COMMUNICATOR, this communicator
361        * supports communicating using these addresses.
362        */
363       char *address_prefix;
364
365       /**
366        * Head of DLL of queues offered by this communicator.
367        */
368       struct Queue *queue_head;
369
370       /**
371        * Tail of DLL of queues offered by this communicator.
372        */
373       struct Queue *queue_tail;
374
375       /**
376        * Head of list of the addresses of this peer offered by this communicator.
377        */
378       struct AddressListEntry *addr_head;
379
380       /**
381        * Tail of list of the addresses of this peer offered by this communicator.
382        */
383       struct AddressListEntry *addr_tail;
384
385     } communicator;
386
387   } details;
388
389 };
390
391
392 /**
393  * Head of linked list of all clients to this service.
394  */
395 static struct TransportClient *clients_head;
396
397 /**
398  * Tail of linked list of all clients to this service.
399  */
400 static struct TransportClient *clients_tail;
401
402 /**
403  * Statistics handle.
404  */
405 struct GNUNET_STATISTICS_Handle *GST_stats;
406
407 /**
408  * Configuration handle.
409  */
410 const struct GNUNET_CONFIGURATION_Handle *GST_cfg;
411
412 /**
413  * Our public key.
414  */
415 struct GNUNET_PeerIdentity GST_my_identity;
416
417 /**
418  * Our private key.
419  */
420 struct GNUNET_CRYPTO_EddsaPrivateKey *GST_my_private_key;
421
422 /**
423  * Map from PIDs to `struct Neighbour` entries.  A peer is
424  * a neighbour if we have an MQ to it from some communicator.
425  */
426 static struct GNUNET_CONTAINER_MultiPeerMap *neighbours;
427
428
429 /**
430  * Lookup neighbour record for peer @a pid.
431  *
432  * @param pid neighbour to look for
433  * @return NULL if we do not have this peer as a neighbour
434  */
435 static struct Neighbour *
436 lookup_neighbour (const struct GNUNET_PeerIdentity *pid)
437 {
438   return GNUNET_CONTAINER_multipeermap_get (neighbours,
439                                             pid);
440 }
441
442
443 /**
444  * Called whenever a client connects.  Allocates our
445  * data structures associated with that client.
446  *
447  * @param cls closure, NULL
448  * @param client identification of the client
449  * @param mq message queue for the client
450  * @return our `struct TransportClient`
451  */
452 static void *
453 client_connect_cb (void *cls,
454                    struct GNUNET_SERVICE_Client *client,
455                    struct GNUNET_MQ_Handle *mq)
456 {
457   struct TransportClient *tc;
458
459   tc = GNUNET_new (struct TransportClient);
460   tc->client = client;
461   tc->mq = mq;
462   GNUNET_CONTAINER_DLL_insert (clients_head,
463                                clients_tail,
464                                tc);
465   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
466               "Client %p connected\n",
467               tc);
468   return tc;
469 }
470
471
472 /**
473  * Called whenever a client is disconnected.  Frees our
474  * resources associated with that client.
475  *
476  * @param cls closure, NULL
477  * @param client identification of the client
478  * @param app_ctx our `struct TransportClient`
479  */
480 static void
481 client_disconnect_cb (void *cls,
482                       struct GNUNET_SERVICE_Client *client,
483                       void *app_ctx)
484 {
485   struct TransportClient *tc = app_ctx;
486
487   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
488               "Client %p disconnected, cleaning up.\n",
489               tc);
490   GNUNET_CONTAINER_DLL_remove (clients_head,
491                                clients_tail,
492                                tc);
493   switch (tc->type)
494   {
495   case CT_NONE:
496     break;
497   case CT_CORE:
498     {
499       struct PendingMessage *pm;
500
501       while (NULL != (pm = tc->details.core.pending_msg_head))
502       {
503         GNUNET_CONTAINER_MDLL_remove (client,
504                                       tc->details.core.pending_msg_head,
505                                       tc->details.core.pending_msg_tail,
506                                       pm);
507         pm->client = NULL;
508       }
509     }
510     break;
511   case CT_MONITOR:
512     break;
513   case CT_COMMUNICATOR:
514     GNUNET_free (tc->details.communicator.address_prefix);
515     break;
516   }
517   GNUNET_free (tc);
518 }
519
520
521 /**
522  * Initialize a "CORE" client.  We got a start message from this
523  * client, so add it to the list of clients for broadcasting of
524  * inbound messages.
525  *
526  * @param cls the client
527  * @param start the start message that was sent
528  */
529 static void
530 handle_client_start (void *cls,
531                      const struct StartMessage *start)
532 {
533   struct TransportClient *tc = cls;
534   uint32_t options;
535
536   options = ntohl (start->options);
537   if ( (0 != (1 & options)) &&
538        (0 !=
539         memcmp (&start->self,
540                 &GST_my_identity,
541                 sizeof (struct GNUNET_PeerIdentity)) ) )
542   {
543     /* client thinks this is a different peer, reject */
544     GNUNET_break (0);
545     GNUNET_SERVICE_client_drop (tc->client);
546     return;
547   }
548   if (CT_NONE != tc->type)
549   {
550     GNUNET_break (0);
551     GNUNET_SERVICE_client_drop (tc->client);
552     return;
553   }
554   tc->type = CT_CORE;
555   GNUNET_SERVICE_client_continue (tc->client);
556 }
557
558
559 /**
560  * Client asked for transmission to a peer.  Process the request.
561  *
562  * @param cls the client
563  * @param obm the send message that was sent
564  */
565 static int
566 check_client_send (void *cls,
567                    const struct OutboundMessage *obm)
568 {
569   struct TransportClient *tc = cls;
570   uint16_t size;
571   const struct GNUNET_MessageHeader *obmm;
572
573   if (CT_CORE != tc->type)
574   {
575     GNUNET_break (0);
576     return GNUNET_SYSERR;
577   }
578   size = ntohs (obm->header.size) - sizeof (struct OutboundMessage);
579   if (size < sizeof (struct GNUNET_MessageHeader))
580   {
581     GNUNET_break (0);
582     return GNUNET_SYSERR;
583   }
584   obmm = (const struct GNUNET_MessageHeader *) &obm[1];
585   if (size != ntohs (obmm->size))
586   {
587     GNUNET_break (0);
588     return GNUNET_SYSERR;
589   }
590   return GNUNET_OK;
591 }
592
593
594 /**
595  * Send a response to the @a pm that we have processed a
596  * "send" request with status @a success. We
597  * transmitted @a bytes_physical on the actual wire.
598  * Sends a confirmation to the "core" client responsible
599  * for the original request and free's @a pm.
600  *
601  * @param pm handle to the original pending message
602  * @param success status code, #GNUNET_OK on success, #GNUNET_SYSERR
603  *          for transmission failure
604  * @param bytes_physical amount of bandwidth consumed
605  */
606 static void
607 client_send_response (struct PendingMessage *pm,
608                       int success,
609                       uint32_t bytes_physical)
610 {
611   struct TransportClient *tc = pm->client;
612   struct Neighbour *target = pm->target;
613   struct GNUNET_MQ_Envelope *env;
614   struct SendOkMessage *som;
615
616   if (NULL != tc)
617   {
618     env = GNUNET_MQ_msg (som,
619                          GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
620     som->success = htonl ((uint32_t) success);
621     som->bytes_msg = htonl (pm->bytes_msg);
622     som->bytes_physical = htonl (bytes_physical);
623     som->peer = target->pid;
624     GNUNET_MQ_send (tc->mq,
625                     env);
626     GNUNET_CONTAINER_MDLL_remove (client,
627                                   tc->details.core.pending_msg_head,
628                                   tc->details.core.pending_msg_tail,
629                                   pm);
630   }
631   GNUNET_CONTAINER_MDLL_remove (neighbour,
632                                 target->pending_msg_head,
633                                 target->pending_msg_tail,
634                                 pm);
635   GNUNET_free (pm);
636 }
637
638
639 /**
640  * Client asked for transmission to a peer.  Process the request.
641  *
642  * @param cls the client
643  * @param obm the send message that was sent
644  */
645 static void
646 handle_client_send (void *cls,
647                     const struct OutboundMessage *obm)
648 {
649   struct TransportClient *tc = cls;
650   struct PendingMessage *pm;
651   const struct GNUNET_MessageHeader *obmm;
652   struct Neighbour *target;
653   uint32_t bytes_msg;
654
655   GNUNET_assert (CT_CORE == tc->type);
656   obmm = (const struct GNUNET_MessageHeader *) &obm[1];
657   bytes_msg = ntohs (obmm->size);
658   target = lookup_neighbour (&obm->peer);
659   if (NULL == target)
660   {
661     /* Failure: don't have this peer as a neighbour (anymore).
662        Might have gone down asynchronously, so this is NOT
663        a protocol violation by CORE. Still count the event,
664        as this should be rare. */
665     struct GNUNET_MQ_Envelope *env;
666     struct SendOkMessage *som;
667
668     env = GNUNET_MQ_msg (som,
669                          GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
670     som->success = htonl (GNUNET_SYSERR);
671     som->bytes_msg = htonl (bytes_msg);
672     som->bytes_physical = htonl (0);
673     som->peer = obm->peer;
674     GNUNET_MQ_send (tc->mq,
675                     env);
676     GNUNET_SERVICE_client_continue (tc->client);
677     GNUNET_STATISTICS_update (GST_stats,
678                               "# messages dropped (neighbour unknown)",
679                               1,
680                               GNUNET_NO);
681     return;
682   }
683   pm = GNUNET_new (struct PendingMessage);
684   pm->client = tc;
685   pm->target = target;
686   pm->bytes_msg = bytes_msg;
687   GNUNET_CONTAINER_MDLL_insert (neighbour,
688                                 target->pending_msg_head,
689                                 target->pending_msg_tail,
690                                 pm);
691   GNUNET_CONTAINER_MDLL_insert (client,
692                                 tc->details.core.pending_msg_head,
693                                 tc->details.core.pending_msg_tail,
694                                 pm);
695   // FIXME: do the work, continuation with:
696   client_send_response (pm,
697                         GNUNET_NO,
698                         0);
699 }
700
701
702 /**
703  * Communicator started.  Test message is well-formed.
704  *
705  * @param cls the client
706  * @param cam the send message that was sent
707  */
708 static int
709 check_communicator_available (void *cls,
710                               const struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam)
711 {
712   struct TransportClient *tc = cls;
713   const char *addr;
714   uint16_t size;
715
716   if (CT_NONE != tc->type)
717   {
718     GNUNET_break (0);
719     return GNUNET_SYSERR;
720   }
721   tc->type = CT_COMMUNICATOR;
722   size = ntohs (cam->header.size) - sizeof (*cam);
723   if (0 == size)
724     return GNUNET_OK; /* receive-only communicator */
725   addr = (const char *) &cam[1];
726   if ('\0' != addr[size-1])
727   {
728     GNUNET_break (0);
729     return GNUNET_SYSERR;
730   }
731   return GNUNET_OK;
732 }
733
734
735 /**
736  * Communicator started.  Process the request.
737  *
738  * @param cls the client
739  * @param cam the send message that was sent
740  */
741 static void
742 handle_communicator_available (void *cls,
743                                const struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam)
744 {
745   struct TransportClient *tc = cls;
746   uint16_t size;
747
748   size = ntohs (cam->header.size) - sizeof (*cam);
749   if (0 == size)
750     return; /* receive-only communicator */
751   tc->details.communicator.address_prefix = GNUNET_strdup ((const char *) &cam[1]);
752   GNUNET_SERVICE_client_continue (tc->client);
753 }
754
755
756 /**
757  * Address of our peer added.  Test message is well-formed.
758  *
759  * @param cls the client
760  * @param aam the send message that was sent
761  */
762 static int
763 check_add_address (void *cls,
764                    const struct GNUNET_TRANSPORT_AddAddressMessage *aam)
765 {
766   struct TransportClient *tc = cls;
767   const char *addr;
768   uint16_t size;
769
770   if (CT_COMMUNICATOR != tc->type)
771   {
772     GNUNET_break (0);
773     return GNUNET_SYSERR;
774   }
775   size = ntohs (aam->header.size) - sizeof (*aam);
776   if (0 == size)
777   {
778     GNUNET_break (0);
779     return GNUNET_SYSERR;
780   }
781   addr = (const char *) &aam[1];
782   if ('\0' != addr[size-1])
783   {
784     GNUNET_break (0);
785     return GNUNET_SYSERR;
786   }
787   return GNUNET_OK;
788 }
789
790
791 /**
792  * Address of our peer added.  Process the request.
793  *
794  * @param cls the client
795  * @param aam the send message that was sent
796  */
797 static void
798 handle_add_address (void *cls,
799                     const struct GNUNET_TRANSPORT_AddAddressMessage *aam)
800 {
801   struct TransportClient *tc = cls;
802   struct AddressListEntry *ale;
803   size_t slen;
804
805   slen = ntohs (aam->header.size) - sizeof (*aam);
806   ale = GNUNET_malloc (sizeof (struct AddressListEntry) + slen);
807   ale->tc = tc;
808   ale->address = (const char *) &ale[1];
809   ale->expiration = GNUNET_TIME_relative_ntoh (aam->expiration);
810   ale->aid = aam->aid;
811   ale->nt = (enum GNUNET_ATS_Network_Type) ntohl (aam->nt);
812   memcpy (&ale[1],
813           &aam[1],
814           slen);
815   GNUNET_CONTAINER_DLL_insert (tc->details.communicator.addr_head,
816                                tc->details.communicator.addr_tail,
817                                ale);
818   // FIXME: notify somebody?!
819   GNUNET_SERVICE_client_continue (tc->client);
820 }
821
822
823 /**
824  * Address of our peer deleted.  Process the request.
825  *
826  * @param cls the client
827  * @param dam the send message that was sent
828  */
829 static void
830 handle_del_address (void *cls,
831                     const struct GNUNET_TRANSPORT_DelAddressMessage *dam)
832 {
833   struct TransportClient *tc = cls;
834
835   if (CT_COMMUNICATOR != tc->type)
836   {
837     GNUNET_break (0);
838     GNUNET_SERVICE_client_drop (tc->client);
839     return;
840   }
841   for (struct AddressListEntry *ale = tc->details.communicator.addr_head;
842        NULL != ale;
843        ale = ale->next)
844   {
845     if (dam->aid != ale->aid)
846       continue;
847     GNUNET_assert (ale->tc == tc);
848     GNUNET_CONTAINER_DLL_remove (tc->details.communicator.addr_head,
849                                  tc->details.communicator.addr_tail,
850                                  ale);
851     // FIXME: notify somebody?
852     GNUNET_free (ale);
853     GNUNET_SERVICE_client_continue (tc->client);
854   }
855   GNUNET_break (0);
856   GNUNET_SERVICE_client_drop (tc->client);
857 }
858
859
860 /**
861  * Client notified us about transmission from a peer.  Process the request.
862  *
863  * @param cls the client
864  * @param obm the send message that was sent
865  */
866 static int
867 check_incoming_msg (void *cls,
868                     const struct GNUNET_TRANSPORT_IncomingMessage *im)
869 {
870   struct TransportClient *tc = cls;
871   uint16_t size;
872   const struct GNUNET_MessageHeader *obmm;
873
874   if (CT_COMMUNICATOR != tc->type)
875   {
876     GNUNET_break (0);
877     return GNUNET_SYSERR;
878   }
879   size = ntohs (im->header.size) - sizeof (*im);
880   if (size < sizeof (struct GNUNET_MessageHeader))
881   {
882     GNUNET_break (0);
883     return GNUNET_SYSERR;
884   }
885   obmm = (const struct GNUNET_MessageHeader *) &im[1];
886   if (size != ntohs (obmm->size))
887   {
888     GNUNET_break (0);
889     return GNUNET_SYSERR;
890   }
891   return GNUNET_OK;
892 }
893
894
895 /**
896  * Incoming meessage.  Process the request.
897  *
898  * @param cls the client
899  * @param im the send message that was received
900  */
901 static void
902 handle_incoming_msg (void *cls,
903                      const struct GNUNET_TRANSPORT_IncomingMessage *im)
904 {
905   struct TransportClient *tc = cls;
906
907   GNUNET_SERVICE_client_continue (tc->client);
908 }
909
910
911 /**
912  * New queue became available.  Check message.
913  *
914  * @param cls the client
915  * @param aqm the send message that was sent
916  */
917 static int
918 check_add_queue_message (void *cls,
919                          const struct GNUNET_TRANSPORT_AddQueueMessage *aqm)
920 {
921   struct TransportClient *tc = cls;
922   const char *addr;
923   uint16_t size;
924
925   if (CT_COMMUNICATOR != tc->type)
926   {
927     GNUNET_break (0);
928     return GNUNET_SYSERR;
929   }
930   size = ntohs (aqm->header.size) - sizeof (*aqm);
931   if (0 == size)
932   {
933     GNUNET_break (0);
934     return GNUNET_SYSERR;
935   }
936   addr = (const char *) &aqm[1];
937   if ('\0' != addr[size-1])
938   {
939     GNUNET_break (0);
940     return GNUNET_SYSERR;
941   }
942   return GNUNET_OK;
943 }
944
945
946 /**
947  * New queue became available.  Process the request.
948  *
949  * @param cls the client
950  * @param aqm the send message that was sent
951  */
952 static void
953 handle_add_queue_message (void *cls,
954                           const struct GNUNET_TRANSPORT_AddQueueMessage *aqm)
955 {
956   struct TransportClient *tc = cls;
957   struct Queue *queue;
958   struct Neighbour *neighbour;
959   const char *addr;
960   uint16_t addr_len;
961
962   neighbour = lookup_neighbour (&aqm->receiver);
963   if (NULL == neighbour)
964   {
965     neighbour = GNUNET_new (struct Neighbour);
966     neighbour->pid = aqm->receiver;
967     GNUNET_assert (GNUNET_OK ==
968                    GNUNET_CONTAINER_multipeermap_put (neighbours,
969                                                       &neighbour->pid,
970                                                       neighbour,
971                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
972     // FIXME: notify ATS/COREs/monitors!
973   }
974   addr_len = ntohs (aqm->header.size) - sizeof (*aqm);
975   addr = (const char *) &aqm[1];
976
977   queue = GNUNET_malloc (sizeof (struct Queue) + addr_len);
978   queue->qid = aqm->qid;
979   queue->nt = (enum GNUNET_ATS_Network_Type) ntohl (aqm->nt);
980   queue->tc = tc;
981   queue->neighbour = neighbour;
982   queue->address = (const char *) &queue[1];
983   memcpy (&queue[1],
984           addr,
985           addr_len);
986   GNUNET_CONTAINER_MDLL_insert (neighbour,
987                                 neighbour->queue_head,
988                                 neighbour->queue_tail,
989                                 queue);
990   GNUNET_CONTAINER_MDLL_insert (client,
991                                 tc->details.communicator.queue_head,
992                                 tc->details.communicator.queue_tail,
993                                 queue);
994   // FIXME: possibly transmit queued messages?
995   GNUNET_SERVICE_client_continue (tc->client);
996 }
997
998
999 /**
1000  * Release memory used by @a neighbour.
1001  *
1002  * @param neighbour neighbour entry to free
1003  */
1004 static void
1005 free_neighbour (struct Neighbour *neighbour)
1006 {
1007   GNUNET_assert (NULL == neighbour->queue_head);
1008   GNUNET_assert (GNUNET_YES ==
1009                  GNUNET_CONTAINER_multipeermap_remove (neighbours,
1010                                                        &neighbour->pid,
1011                                                        neighbour));
1012   GNUNET_free (neighbour);
1013 }
1014
1015
1016 /**
1017  * Queue to a peer went down.  Process the request.
1018  *
1019  * @param cls the client
1020  * @param dqm the send message that was sent
1021  */
1022 static void
1023 handle_del_queue_message (void *cls,
1024                           const struct GNUNET_TRANSPORT_DelQueueMessage *dqm)
1025 {
1026   struct TransportClient *tc = cls;
1027
1028   if (CT_COMMUNICATOR != tc->type)
1029   {
1030     GNUNET_break (0);
1031     GNUNET_SERVICE_client_drop (tc->client);
1032     return;
1033   }
1034   for (struct Queue *queue = tc->details.communicator.queue_head;
1035        NULL != queue;
1036        queue = queue->next_client)
1037   {
1038     struct Neighbour *neighbour = queue->neighbour;
1039
1040     if ( (dqm->qid != queue->qid) ||
1041          (0 != memcmp (&dqm->receiver,
1042                        &neighbour->pid,
1043                        sizeof (struct GNUNET_PeerIdentity))) )
1044       continue;
1045     GNUNET_CONTAINER_MDLL_remove (neighbour,
1046                                   neighbour->queue_head,
1047                                   neighbour->queue_tail,
1048                                   queue);
1049     GNUNET_CONTAINER_MDLL_remove (client,
1050                                   tc->details.communicator.queue_head,
1051                                   tc->details.communicator.queue_tail,
1052                                   queue);
1053     GNUNET_free (queue);
1054     if (NULL == neighbour->queue_head)
1055     {
1056       // FIXME: notify cores/monitors!
1057       free_neighbour (neighbour);
1058     }
1059     GNUNET_SERVICE_client_continue (tc->client);
1060     return;
1061   }
1062   GNUNET_break (0);
1063   GNUNET_SERVICE_client_drop (tc->client);
1064 }
1065
1066
1067 /**
1068  * Message was transmitted.  Process the request.
1069  *
1070  * @param cls the client
1071  * @param sma the send message that was sent
1072  */
1073 static void
1074 handle_send_message_ack (void *cls,
1075                          const struct GNUNET_TRANSPORT_SendMessageToAck *sma)
1076 {
1077   struct TransportClient *tc = cls;
1078
1079   if (CT_COMMUNICATOR != tc->type)
1080   {
1081     GNUNET_break (0);
1082     GNUNET_SERVICE_client_drop (tc->client);
1083     return;
1084   }
1085   GNUNET_SERVICE_client_continue (tc->client);
1086 }
1087
1088
1089 /**
1090  * Initialize a monitor client.
1091  *
1092  * @param cls the client
1093  * @param start the start message that was sent
1094  */
1095 static void
1096 handle_monitor_start (void *cls,
1097                      const struct GNUNET_TRANSPORT_MonitorStart *start)
1098 {
1099   struct TransportClient *tc = cls;
1100
1101   if (CT_NONE != tc->type)
1102   {
1103     GNUNET_break (0);
1104     GNUNET_SERVICE_client_drop (tc->client);
1105     return;
1106   }
1107   tc->type = CT_MONITOR;
1108   tc->details.monitor.peer = start->peer;
1109   tc->details.monitor.one_shot = ntohl (start->one_shot);
1110   // FIXME: do work!
1111   GNUNET_SERVICE_client_continue (tc->client);
1112 }
1113
1114
1115 /**
1116  * Free neighbour entry.
1117  *
1118  * @param cls NULL
1119  * @param pid unused
1120  * @param value a `struct Neighbour`
1121  * @return #GNUNET_OK (always)
1122  */
1123 static int
1124 free_neighbour_cb (void *cls,
1125                    const struct GNUNET_PeerIdentity *pid,
1126                    void *value)
1127 {
1128   struct Neighbour *neighbour = value;
1129
1130   (void) cls;
1131   (void) pid;
1132   GNUNET_break (0); // should this ever happen?
1133   free_neighbour (neighbour);
1134
1135   return GNUNET_OK;
1136 }
1137
1138
1139 /**
1140  * Function called when the service shuts down.  Unloads our plugins
1141  * and cancels pending validations.
1142  *
1143  * @param cls closure, unused
1144  */
1145 static void
1146 do_shutdown (void *cls)
1147 {
1148   (void) cls;
1149
1150   if (NULL != GST_stats)
1151   {
1152     GNUNET_STATISTICS_destroy (GST_stats,
1153                                GNUNET_NO);
1154     GST_stats = NULL;
1155   }
1156   if (NULL != GST_my_private_key)
1157   {
1158     GNUNET_free (GST_my_private_key);
1159     GST_my_private_key = NULL;
1160   }
1161   GNUNET_CONTAINER_multipeermap_iterate (neighbours,
1162                                          &free_neighbour_cb,
1163                                          NULL);
1164   GNUNET_CONTAINER_multipeermap_destroy (neighbours);
1165 }
1166
1167
1168 /**
1169  * Initiate transport service.
1170  *
1171  * @param cls closure
1172  * @param c configuration to use
1173  * @param service the initialized service
1174  */
1175 static void
1176 run (void *cls,
1177      const struct GNUNET_CONFIGURATION_Handle *c,
1178      struct GNUNET_SERVICE_Handle *service)
1179 {
1180   (void) cls;
1181   /* setup globals */
1182   GST_cfg = c;
1183   neighbours = GNUNET_CONTAINER_multipeermap_create (1024,
1184                                                      GNUNET_YES);
1185   GST_my_private_key = GNUNET_CRYPTO_eddsa_key_create_from_configuration (GST_cfg);
1186   if (NULL == GST_my_private_key)
1187   {
1188     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1189                 _("Transport service is lacking key configuration settings. Exiting.\n"));
1190     GNUNET_SCHEDULER_shutdown ();
1191     return;
1192   }
1193   GNUNET_CRYPTO_eddsa_key_get_public (GST_my_private_key,
1194                                       &GST_my_identity.public_key);
1195   GNUNET_log(GNUNET_ERROR_TYPE_INFO,
1196              "My identity is `%s'\n",
1197              GNUNET_i2s_full (&GST_my_identity));
1198
1199   GST_stats = GNUNET_STATISTICS_create ("transport",
1200                                         GST_cfg);
1201   GNUNET_SCHEDULER_add_shutdown (&do_shutdown,
1202                                  NULL);
1203   /* start subsystems */
1204 }
1205
1206
1207 /**
1208  * Define "main" method using service macro.
1209  */
1210 GNUNET_SERVICE_MAIN
1211 ("transport",
1212  GNUNET_SERVICE_OPTION_NONE,
1213  &run,
1214  &client_connect_cb,
1215  &client_disconnect_cb,
1216  NULL,
1217  /* communication with core */
1218  GNUNET_MQ_hd_fixed_size (client_start,
1219                           GNUNET_MESSAGE_TYPE_TRANSPORT_START,
1220                           struct StartMessage,
1221                           NULL),
1222  GNUNET_MQ_hd_var_size (client_send,
1223                         GNUNET_MESSAGE_TYPE_TRANSPORT_SEND,
1224                         struct OutboundMessage,
1225                         NULL),
1226  /* communication with communicators */
1227  GNUNET_MQ_hd_var_size (communicator_available,
1228                         GNUNET_MESSAGE_TYPE_TRANSPORT_NEW_COMMUNICATOR,
1229                         struct GNUNET_TRANSPORT_CommunicatorAvailableMessage,
1230                         NULL),
1231  GNUNET_MQ_hd_var_size (add_address,
1232                         GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS,
1233                         struct GNUNET_TRANSPORT_AddAddressMessage,
1234                         NULL),
1235  GNUNET_MQ_hd_fixed_size (del_address,
1236                           GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_ADDRESS,
1237                           struct GNUNET_TRANSPORT_DelAddressMessage,
1238                           NULL),
1239  GNUNET_MQ_hd_var_size (incoming_msg,
1240                         GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG,
1241                         struct GNUNET_TRANSPORT_IncomingMessage,
1242                         NULL),
1243  GNUNET_MQ_hd_var_size (add_queue_message,
1244                         GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP,
1245                         struct GNUNET_TRANSPORT_AddQueueMessage,
1246                         NULL),
1247  GNUNET_MQ_hd_fixed_size (del_queue_message,
1248                           GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN,
1249                           struct GNUNET_TRANSPORT_DelQueueMessage,
1250                           NULL),
1251  GNUNET_MQ_hd_fixed_size (send_message_ack,
1252                           GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK,
1253                           struct GNUNET_TRANSPORT_SendMessageToAck,
1254                           NULL),
1255  /* communication with monitors */
1256  GNUNET_MQ_hd_fixed_size (monitor_start,
1257                           GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_START,
1258                           struct GNUNET_TRANSPORT_MonitorStart,
1259                           NULL),
1260  GNUNET_MQ_handler_end ());
1261
1262
1263 /* end of file gnunet-service-transport.c */