more work on tng
[oweals/gnunet.git] / src / transport / transport_api2_communication.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2018 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21 /**
22  * @file transport/transport_api2_communication.c
23  * @brief implementation of the gnunet_transport_communication_service.h API
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_protocols.h"
29 #include "gnunet_transport_communication_service.h"
30 #include "gnunet_ats_transport_service.h"
31 #include "transport.h"
32
33
34 /**
35  * How many messages do we keep at most in the queue to the
36  * transport service before we start to drop (default,
37  * can be changed via the configuration file).
38  */
39 #define DEFAULT_MAX_QUEUE_LENGTH 16
40
41
42 /**
43  * Information we track per packet to enable flow control.
44  */
45 struct FlowControl
46 {
47   /**
48    * Kept in a DLL.
49    */
50   struct FlowControl *next;
51
52   /**
53    * Kept in a DLL.
54    */
55   struct FlowControl *prev;
56
57   /**
58    * Function to call once the message was processed.
59    */
60   GNUNET_TRANSPORT_MessageCompletedCallback cb;
61
62   /**
63    * Closure for @e cb
64    */
65   void *cb_cls;
66
67   /**
68    * Which peer is this about?
69    */
70   struct GNUNET_PeerIdentity sender;
71
72   /**
73    * More-or-less unique ID for the message.
74    */
75   uint64_t id;
76 };
77
78
79 /**
80  * Information we track per message to tell the transport about
81  * success or failures.
82  */
83 struct AckPending
84 {
85   /**
86    * Kept in a DLL.
87    */
88   struct AckPending *next;
89
90   /**
91    * Kept in a DLL.
92    */
93   struct AckPending *prev;
94
95   /**
96    * Communicator this entry belongs to.
97    */
98   struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
99
100   /**
101    * Which peer is this about?
102    */
103   struct GNUNET_PeerIdentity receiver;
104
105   /**
106    * More-or-less unique ID for the message.
107    */
108   uint64_t mid;
109 };
110
111
112 /**
113  * Opaque handle to the transport service for communicators.
114  */
115 struct GNUNET_TRANSPORT_CommunicatorHandle
116 {
117   /**
118    * Head of DLL of addresses this communicator offers to the transport service.
119    */
120   struct GNUNET_TRANSPORT_AddressIdentifier *ai_head;
121
122   /**
123    * Tail of DLL of addresses this communicator offers to the transport service.
124    */
125   struct GNUNET_TRANSPORT_AddressIdentifier *ai_tail;
126
127   /**
128    * DLL of messages awaiting flow control confirmation (ack).
129    */
130   struct FlowControl *fc_head;
131
132   /**
133    * DLL of messages awaiting flow control confirmation (ack).
134    */
135   struct FlowControl *fc_tail;
136
137   /**
138    * DLL of messages awaiting transmission confirmation (ack).
139    */
140   struct AckPending *ap_head;
141
142   /**
143    * DLL of messages awaiting transmission confirmation (ack).
144    */
145   struct AckPending *ap_tail;
146
147   /**
148    * DLL of queues we offer.
149    */
150   struct GNUNET_TRANSPORT_QueueHandle *queue_head;
151
152   /**
153    * DLL of queues we offer.
154    */
155   struct GNUNET_TRANSPORT_QueueHandle *queue_tail;
156
157   /**
158    * Our configuration.
159    */
160   const struct GNUNET_CONFIGURATION_Handle *cfg;
161
162   /**
163    * Config section to use.
164    */
165   const char *config_section;
166
167   /**
168    * Address prefix to use.
169    */
170   const char *addr_prefix;
171
172   /**
173    * Function to call when the transport service wants us to initiate
174    * a communication channel with another peer.
175    */
176   GNUNET_TRANSPORT_CommunicatorMqInit mq_init;
177
178   /**
179    * Closure for @e mq_init.
180    */
181   void *mq_init_cls;
182
183   /**
184    * Function to call when the transport service receives messages
185    * for a communicator (i.e. for NAT traversal or for non-bidirectional
186    * communicators).
187    */
188   GNUNET_TRANSPORT_CommunicatorNotify notify_cb;
189
190   /**
191    * Closure for @e notify_Cb.
192    */
193   void *notify_cb_cls;
194
195   /**
196    * Queue to talk to the transport service.
197    */
198   struct GNUNET_MQ_Handle *mq;
199
200   /**
201    * Maximum permissable queue length.
202    */
203   unsigned long long max_queue_length;
204
205   /**
206    * Flow-control identifier generator.
207    */
208   uint64_t fc_gen;
209
210   /**
211    * Internal UUID for the address used in communication with the
212    * transport service.
213    */
214   uint32_t aid_gen;
215
216   /**
217    * Queue identifier generator.
218    */
219   uint32_t queue_gen;
220
221   /**
222    * Characteristics of the communicator.
223    */
224   enum GNUNET_TRANSPORT_CommunicatorCharacteristics cc;
225
226 };
227
228
229 /**
230  * Handle returned to identify the internal data structure the transport
231  * API has created to manage a message queue to a particular peer.
232  */
233 struct GNUNET_TRANSPORT_QueueHandle
234 {
235
236   /**
237    * Kept in a DLL.
238    */
239   struct GNUNET_TRANSPORT_QueueHandle *next;
240
241   /**
242    * Kept in a DLL.
243    */
244   struct GNUNET_TRANSPORT_QueueHandle *prev;
245
246   /**
247    * Handle this queue belongs to.
248    */
249   struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
250
251   /**
252    * Address used by the communication queue.
253    */
254   char *address;
255
256   /**
257    * The queue itself.
258    */
259   struct GNUNET_MQ_Handle *mq;
260
261   /**
262    * Which peer we can communciate with.
263    */
264   struct GNUNET_PeerIdentity peer;
265
266   /**
267    * Network type of the communciation queue.
268    */
269   enum GNUNET_NetworkType nt;
270
271   /**
272    * Communication status of the queue.
273    */
274   enum GNUNET_TRANSPORT_ConnectionStatus cs;
275
276   /**
277    * ID for this queue when talking to the transport service.
278    */
279   uint32_t queue_id;
280
281   /**
282    * Maximum transmission unit for the queue.
283    */
284   uint32_t mtu;
285
286 };
287
288
289 /**
290  * Internal representation of an address a communicator is
291  * currently providing for the transport service.
292  */
293 struct GNUNET_TRANSPORT_AddressIdentifier
294 {
295
296   /**
297    * Kept in a DLL.
298    */
299   struct GNUNET_TRANSPORT_AddressIdentifier *next;
300
301   /**
302    * Kept in a DLL.
303    */
304   struct GNUNET_TRANSPORT_AddressIdentifier *prev;
305
306   /**
307    * Transport handle where the address was added.
308    */
309   struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
310
311   /**
312    * The actual address.
313    */
314   char *address;
315
316   /**
317    * When does the address expire? (Expected lifetime of the
318    * address.)
319    */
320   struct GNUNET_TIME_Relative expiration;
321
322   /**
323    * Internal UUID for the address used in communication with the
324    * transport service.
325    */
326   uint32_t aid;
327
328   /**
329    * Network type for the address.
330    */
331   enum GNUNET_NetworkType nt;
332
333 };
334
335
336 /**
337  * (re)connect our communicator to the transport service
338  *
339  * @param ch handle to reconnect
340  */
341 static void
342 reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch);
343
344
345 /**
346  * Send message to the transport service about address @a ai
347  * being now available.
348  *
349  * @param ai address to add
350  */
351 static void
352 send_add_address (struct GNUNET_TRANSPORT_AddressIdentifier *ai)
353 {
354   struct GNUNET_MQ_Envelope *env;
355   struct GNUNET_TRANSPORT_AddAddressMessage *aam;
356
357   if (NULL == ai->ch->mq)
358     return;
359   env = GNUNET_MQ_msg_extra (aam,
360                              strlen (ai->address) + 1,
361                              GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS);
362   aam->expiration = GNUNET_TIME_relative_hton (ai->expiration);
363   aam->nt = htonl ((uint32_t) ai->nt);
364   memcpy (&aam[1],
365           ai->address,
366           strlen (ai->address) + 1);
367   GNUNET_MQ_send (ai->ch->mq,
368                   env);
369 }
370
371
372 /**
373  * Send message to the transport service about address @a ai
374  * being no longer available.
375  *
376  * @param ai address to delete
377  */
378 static void
379 send_del_address (struct GNUNET_TRANSPORT_AddressIdentifier *ai)
380 {
381   struct GNUNET_MQ_Envelope *env;
382   struct GNUNET_TRANSPORT_DelAddressMessage *dam;
383
384   if (NULL == ai->ch->mq)
385     return;
386   env = GNUNET_MQ_msg (dam,
387                        GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_ADDRESS);
388   dam->aid = htonl (ai->aid);
389   GNUNET_MQ_send (ai->ch->mq,
390                   env);
391 }
392
393
394 /**
395  * Send message to the transport service about queue @a qh
396  * being now available.
397  *
398  * @param qh queue to add
399  */
400 static void
401 send_add_queue (struct GNUNET_TRANSPORT_QueueHandle *qh)
402 {
403   struct GNUNET_MQ_Envelope *env;
404   struct GNUNET_TRANSPORT_AddQueueMessage *aqm;
405
406   if (NULL == qh->ch->mq)
407     return;
408   env = GNUNET_MQ_msg_extra (aqm,
409                              strlen (qh->address) + 1,
410                              GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP);
411   aqm->qid = htonl (qh->queue_id);
412   aqm->receiver = qh->peer;
413   aqm->nt = htonl ((uint32_t) qh->nt);
414   aqm->mtu = htonl (qh->mtu);
415   aqm->cs = htonl ((uint32_t) qh->cs);
416   memcpy (&aqm[1],
417           qh->address,
418           strlen (qh->address) + 1);
419   GNUNET_MQ_send (qh->ch->mq,
420                   env);
421 }
422
423
424 /**
425  * Send message to the transport service about queue @a qh
426  * being no longer available.
427  *
428  * @param qh queue to delete
429  */
430 static void
431 send_del_queue (struct GNUNET_TRANSPORT_QueueHandle *qh)
432 {
433   struct GNUNET_MQ_Envelope *env;
434   struct GNUNET_TRANSPORT_DelQueueMessage *dqm;
435
436   if (NULL == qh->ch->mq)
437     return;
438   env = GNUNET_MQ_msg (dqm,
439                        GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN);
440   dqm->qid = htonl (qh->queue_id);
441   dqm->receiver = qh->peer;
442   GNUNET_MQ_send (qh->ch->mq,
443                   env);
444 }
445
446
447 /**
448  * Disconnect from the transport service.  Purges
449  * all flow control entries as we will no longer receive
450  * the ACKs.  Purges the ack pending entries as the
451  * transport will no longer expect the confirmations.
452  *
453  * @param ch service to disconnect from
454  */
455 static void
456 disconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
457 {
458   struct FlowControl *fcn;
459   struct AckPending *apn;
460
461   for (struct FlowControl *fc = ch->fc_head;
462        NULL != fc;
463        fc = fcn)
464   {
465     fcn = fc->next;
466     GNUNET_CONTAINER_DLL_remove (ch->fc_head,
467                                  ch->fc_tail,
468                                  fc);
469     fc->cb (fc->cb_cls,
470             GNUNET_SYSERR);
471     GNUNET_free (fc);
472   }
473   for (struct AckPending *ap = ch->ap_head;
474        NULL != ap;
475        ap = apn)
476   {
477     apn = ap->next;
478     GNUNET_CONTAINER_DLL_remove (ch->ap_head,
479                                  ch->ap_tail,
480                                  ap);
481     GNUNET_free (ap);
482   }
483   if (NULL == ch->mq)
484     return;
485   GNUNET_MQ_destroy (ch->mq);
486   ch->mq = NULL;
487 }
488
489
490 /**
491  * Function called on MQ errors.
492  */
493 static void
494 error_handler (void *cls,
495                enum GNUNET_MQ_Error error)
496 {
497   struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
498
499   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
500               "MQ failure %d, reconnecting to transport service.\n",
501               error);
502   disconnect (ch);
503   /* TODO: maybe do this with exponential backoff/delay */
504   reconnect (ch);
505 }
506
507
508 /**
509  * Transport service acknowledged a message we gave it
510  * (with flow control enabled). Tell the communicator.
511  *
512  * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
513  * @param incoming_ack the ack
514  */
515 static void
516 handle_incoming_ack (void *cls,
517                      const struct GNUNET_TRANSPORT_IncomingMessageAck *incoming_ack)
518 {
519   struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
520
521   for (struct FlowControl *fc = ch->fc_head;
522        NULL != fc;
523        fc = fc->next)
524   {
525     if ( (fc->id == incoming_ack->fc_id) &&
526          (0 == memcmp (&fc->sender,
527                        &incoming_ack->sender,
528                        sizeof (struct GNUNET_PeerIdentity))) )
529     {
530       GNUNET_CONTAINER_DLL_remove (ch->fc_head,
531                                    ch->fc_tail,
532                                    fc);
533       fc->cb (fc->cb_cls,
534               GNUNET_OK);
535       GNUNET_free (fc);
536       return;
537     }
538   }
539   GNUNET_break (0);
540   disconnect (ch);
541   /* TODO: maybe do this with exponential backoff/delay */
542   reconnect (ch);
543 }
544
545
546 /**
547  * Transport service wants us to create a queue. Check if @a cq
548  * is well-formed.
549  *
550  * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
551  * @param cq the queue creation request
552  * @return #GNUNET_OK if @a smt is well-formed
553  */
554 static int
555 check_create_queue (void *cls,
556                     const struct GNUNET_TRANSPORT_CreateQueue *cq)
557 {
558   uint16_t len = ntohs (cq->header.size) - sizeof (*cq);
559   const char *addr = (const char *) &cq[1];
560
561   (void) cls;
562   if ( (0 == len) ||
563        ('\0' != addr[len-1]) )
564   {
565     GNUNET_break (0);
566     return GNUNET_SYSERR;
567   }
568   return GNUNET_OK;
569 }
570
571
572 /**
573  * Transport service wants us to create a queue. Tell the communicator.
574  *
575  * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
576  * @param cq the queue creation request
577  */
578 static void
579 handle_create_queue (void *cls,
580                      const struct GNUNET_TRANSPORT_CreateQueue *cq)
581 {
582   struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
583   const char *addr = (const char *) &cq[1];
584   struct GNUNET_TRANSPORT_CreateQueueResponse *cqr;
585   struct GNUNET_MQ_Envelope *env;
586
587   if (GNUNET_OK !=
588       ch->mq_init (ch->mq_init_cls,
589                    &cq->receiver,
590                    addr))
591   {
592     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
593                 "Address `%s' invalid for this communicator\n",
594                 addr);
595     env = GNUNET_MQ_msg (cqr,
596                          GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_FAIL);
597   }
598   else
599   {
600     env = GNUNET_MQ_msg (cqr,
601                          GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_OK);
602   }
603   cqr->request_id = cq->request_id;
604   GNUNET_MQ_send (ch->mq,
605                   env);
606 }
607
608
609 /**
610  * Transport service wants us to send a message. Check if @a smt
611  * is well-formed.
612  *
613  * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
614  * @param smt the transmission request
615  * @return #GNUNET_OK if @a smt is well-formed
616  */
617 static int
618 check_send_msg (void *cls,
619                 const struct GNUNET_TRANSPORT_SendMessageTo *smt)
620 {
621   uint16_t len = ntohs (smt->header.size) - sizeof (*smt);
622   const struct GNUNET_MessageHeader *mh = (const struct GNUNET_MessageHeader *) &smt[1];
623
624   (void) cls;
625   if (ntohs (mh->size) != len)
626   {
627     GNUNET_break (0);
628     return GNUNET_SYSERR;
629   }
630   return GNUNET_OK;
631 }
632
633
634 /**
635  * Notify transport service about @a status of a message with
636  * @a mid sent to @a receiver.
637  *
638  * @param ch handle
639  * @param status #GNUNET_OK on success, #GNUNET_SYSERR on failure
640  * @param receiver which peer was the receiver
641  * @param mid message that the ack is about
642  */
643 static void
644 send_ack (struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
645           int status,
646           const struct GNUNET_PeerIdentity *receiver,
647           uint64_t mid)
648 {
649   struct GNUNET_MQ_Envelope *env;
650   struct GNUNET_TRANSPORT_SendMessageToAck *ack;
651
652   env = GNUNET_MQ_msg (ack,
653                        GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK);
654   ack->status = htonl (status);
655   ack->mid = mid;
656   ack->receiver = *receiver;
657   GNUNET_MQ_send (ch->mq,
658                   env);
659 }
660
661
662 /**
663  * Message queue transmission by communicator was successful,
664  * notify transport service.
665  *
666  * @param cls an `struct AckPending *`
667  */
668 static void
669 send_ack_cb (void *cls)
670 {
671   struct AckPending *ap = cls;
672   struct GNUNET_TRANSPORT_CommunicatorHandle *ch = ap->ch;
673
674   GNUNET_CONTAINER_DLL_remove (ch->ap_head,
675                                ch->ap_tail,
676                                ap);
677   send_ack (ch,
678             GNUNET_OK,
679             &ap->receiver,
680             ap->mid);
681   GNUNET_free (ap);
682 }
683
684
685 /**
686  * Transport service wants us to send a message. Tell the communicator.
687  *
688  * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
689  * @param smt the transmission request
690  */
691 static void
692 handle_send_msg (void *cls,
693                  const struct GNUNET_TRANSPORT_SendMessageTo *smt)
694 {
695   struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
696   const struct GNUNET_MessageHeader *mh;
697   struct GNUNET_MQ_Envelope *env;
698   struct AckPending *ap;
699   struct GNUNET_TRANSPORT_QueueHandle *qh;
700
701   for (qh = ch->queue_head;NULL != qh; qh = qh->next)
702     if ( (qh->queue_id == smt->qid) &&
703          (0 == memcmp (&qh->peer,
704                        &smt->receiver,
705                        sizeof (struct GNUNET_PeerIdentity))) )
706       break;
707   if (NULL == qh)
708   {
709     /* queue is already gone, tell transport this one failed */
710     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
711                 "Transmission failed, queue no longer exists.\n");
712     send_ack (ch,
713               GNUNET_NO,
714               &smt->receiver,
715               smt->mid);
716     return;
717   }
718   ap = GNUNET_new (struct AckPending);
719   ap->ch = ch;
720   ap->receiver = smt->receiver;
721   ap->mid = smt->mid;
722   GNUNET_CONTAINER_DLL_insert (ch->ap_head,
723                                ch->ap_tail,
724                                ap);
725   mh = (const struct GNUNET_MessageHeader *) &smt[1];
726   env = GNUNET_MQ_msg_copy (mh);
727   GNUNET_MQ_notify_sent (env,
728                          &send_ack_cb,
729                          ap);
730   GNUNET_MQ_send (qh->mq,
731                   env);
732 }
733
734
735 /**
736  * (re)connect our communicator to the transport service
737  *
738  * @param ch handle to reconnect
739  */
740 static void
741 reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
742 {
743   struct GNUNET_MQ_MessageHandler handlers[] = {
744     GNUNET_MQ_hd_fixed_size (incoming_ack,
745                              GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK,
746                              struct GNUNET_TRANSPORT_IncomingMessageAck,
747                              ch),
748     GNUNET_MQ_hd_var_size (create_queue,
749                            GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE,
750                            struct GNUNET_TRANSPORT_CreateQueue,
751                            ch),
752     GNUNET_MQ_hd_var_size (send_msg,
753                            GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG,
754                            struct GNUNET_TRANSPORT_SendMessageTo,
755                            ch),
756     // FIXME: handle backchannel notifications!
757     GNUNET_MQ_handler_end()
758   };
759   struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam;
760   struct GNUNET_MQ_Envelope *env;
761
762   ch->mq = GNUNET_CLIENT_connect (ch->cfg,
763                                   "transport",
764                                   handlers,
765                                   &error_handler,
766                                   ch);
767   if (NULL == ch->mq)
768     return;
769   env = GNUNET_MQ_msg_extra (cam,
770                              strlen (ch->addr_prefix) + 1,
771                              GNUNET_MESSAGE_TYPE_TRANSPORT_NEW_COMMUNICATOR);
772   cam->cc = htonl ((uint32_t) ch->cc);
773   memcpy (&cam[1],
774           ch->addr_prefix,
775           strlen (ch->addr_prefix) + 1);
776   GNUNET_MQ_send (ch->mq,
777                   env);
778   for (struct GNUNET_TRANSPORT_AddressIdentifier *ai = ch->ai_head;
779        NULL != ai;
780        ai = ai->next)
781     send_add_address (ai);
782   for (struct GNUNET_TRANSPORT_QueueHandle *qh = ch->queue_head;
783        NULL != qh;
784        qh = qh->next)
785     send_add_queue (qh);
786 }
787
788
789 /**
790  * Connect to the transport service.
791  *
792  * @param cfg configuration to use
793  * @param config_section section of the configuration to use for options
794  * @param addr_prefix address prefix for addresses supported by this
795  *        communicator, could be NULL for incoming-only communicators
796  * @param cc what characteristics does the communicator have?
797  * @param mtu maximum message size supported by communicator, 0 if
798  *            sending is not supported, SIZE_MAX for no MTU
799  * @param mq_init function to call to initialize a message queue given
800  *                the address of another peer, can be NULL if the
801  *                communicator only supports receiving messages
802  * @param mq_init_cls closure for @a mq_init
803  * @param notify_cb function to pass backchannel messages to communicator
804  * @param notify_cb_cls closure for @a notify_cb
805  * @return NULL on error
806  */
807 struct GNUNET_TRANSPORT_CommunicatorHandle *
808 GNUNET_TRANSPORT_communicator_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
809                                        const char *config_section,
810                                        const char *addr_prefix,
811                                        enum GNUNET_TRANSPORT_CommunicatorCharacteristics cc,
812                                        GNUNET_TRANSPORT_CommunicatorMqInit mq_init,
813                                        void *mq_init_cls,
814                                        GNUNET_TRANSPORT_CommunicatorNotify notify_cb,
815                                        void *notify_cb_cls)
816 {
817   struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
818
819   ch = GNUNET_new (struct GNUNET_TRANSPORT_CommunicatorHandle);
820   ch->cfg = cfg;
821   ch->config_section = config_section;
822   ch->addr_prefix = addr_prefix;
823   ch->mq_init = mq_init;
824   ch->mq_init_cls = mq_init_cls;
825   ch->notify_cb = notify_cb;
826   ch->notify_cb_cls = notify_cb_cls;
827   ch->cc = cc;
828   reconnect (ch);
829   if (GNUNET_OK !=
830       GNUNET_CONFIGURATION_get_value_number (cfg,
831                                              config_section,
832                                              "MAX_QUEUE_LENGTH",
833                                              &ch->max_queue_length))
834     ch->max_queue_length = DEFAULT_MAX_QUEUE_LENGTH;
835   if (NULL == ch->mq)
836   {
837     GNUNET_free (ch);
838     return NULL;
839   }
840   return ch;
841 }
842
843
844 /**
845  * Disconnect from the transport service.
846  *
847  * @param ch handle returned from connect
848  */
849 void
850 GNUNET_TRANSPORT_communicator_disconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
851 {
852   disconnect (ch);
853   while (NULL != ch->ai_head)
854   {
855     GNUNET_break (0); /* communicator forgot to remove address, warn! */
856     GNUNET_TRANSPORT_communicator_address_remove (ch->ai_head);
857   }
858   GNUNET_free (ch);
859 }
860
861
862 /* ************************* Receiving *************************** */
863
864
865 /**
866  * Notify transport service that the communicator has received
867  * a message.
868  *
869  * @param ch connection to transport service
870  * @param sender presumed sender of the message (details to be checked
871  *        by higher layers)
872  * @param msg the message
873  * @param cb function to call once handling the message is done, NULL if
874  *         flow control is not supported by this communicator
875  * @param cb_cls closure for @a cb
876  * @return #GNUNET_OK if all is well, #GNUNET_NO if the message was
877  *         immediately dropped due to memory limitations (communicator
878  *         should try to apply back pressure),
879  *         #GNUNET_SYSERR if the message could not be delivered because
880  *         the tranport service is not yet up
881  */
882 int
883 GNUNET_TRANSPORT_communicator_receive (struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
884                                        const struct GNUNET_PeerIdentity *sender,
885                                        const struct GNUNET_MessageHeader *msg,
886                                        GNUNET_TRANSPORT_MessageCompletedCallback cb,
887                                        void *cb_cls)
888 {
889   struct GNUNET_MQ_Envelope *env;
890   struct GNUNET_TRANSPORT_IncomingMessage *im;
891   uint16_t msize;
892
893   if (NULL == ch->mq)
894     return GNUNET_SYSERR;
895   if ( (NULL == cb) &&
896        (GNUNET_MQ_get_length (ch->mq) >= ch->max_queue_length) )
897   {
898     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
899                 "Dropping message: transprot is too slow, queue length %llu exceeded\n",
900                 ch->max_queue_length);
901     return GNUNET_NO;
902   }
903
904   msize = ntohs (msg->size);
905   env = GNUNET_MQ_msg_extra (im,
906                              msize,
907                              GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG);
908   if (NULL == env)
909   {
910     GNUNET_break (0);
911     return GNUNET_SYSERR;
912   }
913   im->sender = *sender;
914   memcpy (&im[1],
915           msg,
916           msize);
917   if (NULL != cb)
918   {
919     struct FlowControl *fc;
920
921     im->fc_on = htonl (GNUNET_YES);
922     im->fc_id = ch->fc_gen++;
923     fc = GNUNET_new (struct FlowControl);
924     fc->sender = *sender;
925     fc->id = im->fc_id;
926     fc->cb = cb;
927     fc->cb_cls = cb_cls;
928     GNUNET_CONTAINER_DLL_insert (ch->fc_head,
929                                  ch->fc_tail,
930                                  fc);
931   }
932   GNUNET_MQ_send (ch->mq,
933                   env);
934   return GNUNET_OK;
935 }
936
937
938 /* ************************* Discovery *************************** */
939
940
941 /**
942  * Notify transport service that an MQ became available due to an
943  * "inbound" connection or because the communicator discovered the
944  * presence of another peer.
945  *
946  * @param ch connection to transport service
947  * @param peer peer with which we can now communicate
948  * @param address address in human-readable format, 0-terminated, UTF-8
949  * @param mtu maximum message size supported by queue, 0 if
950  *            sending is not supported, SIZE_MAX for no MTU
951  * @param nt which network type does the @a address belong to?
952  * @param cc what characteristics does the communicator have?
953  * @param cs what is the connection status of the queue?
954  * @param mq message queue of the @a peer
955  * @return API handle identifying the new MQ
956  */
957 struct GNUNET_TRANSPORT_QueueHandle *
958 GNUNET_TRANSPORT_communicator_mq_add (struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
959                                       const struct GNUNET_PeerIdentity *peer,
960                                       const char *address,
961                                       uint32_t mtu,
962                                       enum GNUNET_NetworkType nt,
963                                       enum GNUNET_TRANSPORT_ConnectionStatus cs,
964                                       struct GNUNET_MQ_Handle *mq)
965 {
966   struct GNUNET_TRANSPORT_QueueHandle *qh;
967
968   qh = GNUNET_new (struct GNUNET_TRANSPORT_QueueHandle);
969   qh->ch = ch;
970   qh->peer = *peer;
971   qh->address = GNUNET_strdup (address);
972   qh->nt = nt;
973   qh->mtu = mtu;
974   qh->cs = cs;
975   qh->mq = mq;
976   qh->queue_id = ch->queue_gen++;
977   GNUNET_CONTAINER_DLL_insert (ch->queue_head,
978                                ch->queue_tail,
979                                qh);
980   send_add_queue (qh);
981   return qh;
982 }
983
984
985 /**
986  * Notify transport service that an MQ became unavailable due to a
987  * disconnect or timeout.
988  *
989  * @param qh handle for the queue that must be invalidated
990  */
991 void
992 GNUNET_TRANSPORT_communicator_mq_del (struct GNUNET_TRANSPORT_QueueHandle *qh)
993 {
994   struct GNUNET_TRANSPORT_CommunicatorHandle *ch = qh->ch;
995
996   send_del_queue (qh);
997   GNUNET_CONTAINER_DLL_remove (ch->queue_head,
998                                ch->queue_tail,
999                                qh);
1000   GNUNET_MQ_destroy (qh->mq);
1001   GNUNET_free (qh->address);
1002   GNUNET_free (qh);
1003 }
1004
1005
1006 /**
1007  * Notify transport service about an address that this communicator
1008  * provides for this peer.
1009  *
1010  * @param ch connection to transport service
1011  * @param address our address in human-readable format, 0-terminated, UTF-8
1012  * @param nt which network type does the address belong to?
1013  * @param expiration when does the communicator forsee this address expiring?
1014  */
1015 struct GNUNET_TRANSPORT_AddressIdentifier *
1016 GNUNET_TRANSPORT_communicator_address_add (struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
1017                                            const char *address,
1018                                            enum GNUNET_NetworkType nt,
1019                                            struct GNUNET_TIME_Relative expiration)
1020 {
1021   struct GNUNET_TRANSPORT_AddressIdentifier *ai;
1022
1023   ai = GNUNET_new (struct GNUNET_TRANSPORT_AddressIdentifier);
1024   ai->ch = ch;
1025   ai->address = GNUNET_strdup (address);
1026   ai->nt = nt;
1027   ai->expiration = expiration;
1028   ai->aid = ch->aid_gen++;
1029   GNUNET_CONTAINER_DLL_insert (ch->ai_head,
1030                                ch->ai_tail,
1031                                ai);
1032   send_add_address (ai);
1033   return ai;
1034 }
1035
1036
1037 /**
1038  * Notify transport service about an address that this communicator no
1039  * longer provides for this peer.
1040  *
1041  * @param ai address that is no longer provided
1042  */
1043 void
1044 GNUNET_TRANSPORT_communicator_address_remove (struct GNUNET_TRANSPORT_AddressIdentifier *ai)
1045 {
1046   struct GNUNET_TRANSPORT_CommunicatorHandle *ch = ai->ch;
1047
1048   send_del_address (ai);
1049   GNUNET_CONTAINER_DLL_remove (ch->ai_head,
1050                                ch->ai_tail,
1051                                ai);
1052   GNUNET_free (ai->address);
1053   GNUNET_free (ai);
1054 }
1055
1056
1057 /* ************************* Backchannel *************************** */
1058
1059
1060 /**
1061  * The communicator asks the transport service to route a message via
1062  * a different path to another communicator service at another peer.
1063  * This must only be done for special control traffic (as there is no
1064  * flow control for this API), such as acknowledgements, and generally
1065  * only be done if the communicator is uni-directional (i.e. cannot
1066  * send the message back itself).
1067  *
1068  * @param ch handle of this communicator
1069  * @param pid peer to send the message to
1070  * @param comm name of the communicator to send the message to
1071  * @param header header of the message to transmit and pass via the
1072  *        notify-API to @a pid's communicator @a comm
1073  */
1074 void
1075 GNUNET_TRANSPORT_communicator_notify (struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
1076                                       const struct GNUNET_PeerIdentity *pid,
1077                                       const char *comm,
1078                                       const struct GNUNET_MessageHeader *header)
1079 {
1080   struct GNUNET_MQ_Envelope *env;
1081   struct GNUNET_TRANSPORT_CommunicatorBackchannel *cb;
1082   size_t slen = strlen (comm) + 1;
1083   uint16_t mlen = ntohs (header->size);
1084
1085   GNUNET_assert (mlen + slen + sizeof (*cb) < UINT16_MAX);
1086   env = GNUNET_MQ_msg_extra (cb,
1087                              slen + mlen,
1088                              GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL);
1089   cb->pid = *pid;
1090   memcpy (&cb[1],
1091           header,
1092           mlen);
1093   memcpy (((char *)&cb[1]) + mlen,
1094           comm,
1095           slen);
1096   GNUNET_MQ_send (ch->mq,
1097                   env);
1098 }
1099
1100
1101 /* end of transport_api2_communication.c */