tolerate additional IPv4 address now available for gnunet.org
[oweals/gnunet.git] / src / transport / transport_api2_communication.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2018 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21 /**
22  * @file transport/transport_api2_communication.c
23  * @brief implementation of the gnunet_transport_communication_service.h API
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_protocols.h"
29 #include "gnunet_transport_communication_service.h"
30 #include "gnunet_ats_transport_service.h"
31 #include "transport.h"
32
33
34 /**
35  * How many messages do we keep at most in the queue to the
36  * transport service before we start to drop (default,
37  * can be changed via the configuration file).
38  */
39 #define DEFAULT_MAX_QUEUE_LENGTH 16
40
41
42 /**
43  * Information we track per packet to enable flow control.
44  */
45 struct FlowControl
46 {
47   /**
48    * Kept in a DLL.
49    */
50   struct FlowControl *next;
51
52   /**
53    * Kept in a DLL.
54    */
55   struct FlowControl *prev;
56
57   /**
58    * Function to call once the message was processed.
59    */
60   GNUNET_TRANSPORT_MessageCompletedCallback cb;
61
62   /**
63    * Closure for @e cb
64    */
65   void *cb_cls;
66
67   /**
68    * Which peer is this about?
69    */
70   struct GNUNET_PeerIdentity sender;
71
72   /**
73    * More-or-less unique ID for the message.
74    */
75   uint64_t id;
76 };
77
78
79 /**
80  * Information we track per message to tell the transport about
81  * success or failures.
82  */
83 struct AckPending
84 {
85   /**
86    * Kept in a DLL.
87    */
88   struct AckPending *next;
89
90   /**
91    * Kept in a DLL.
92    */
93   struct AckPending *prev;
94
95   /**
96    * Communicator this entry belongs to.
97    */
98   struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
99
100   /**
101    * Which peer is this about?
102    */
103   struct GNUNET_PeerIdentity receiver;
104
105   /**
106    * More-or-less unique ID for the message.
107    */
108   uint64_t mid;
109 };
110
111
112 /**
113  * Opaque handle to the transport service for communicators.
114  */
115 struct GNUNET_TRANSPORT_CommunicatorHandle
116 {
117   /**
118    * Head of DLL of addresses this communicator offers to the transport service.
119    */
120   struct GNUNET_TRANSPORT_AddressIdentifier *ai_head;
121
122   /**
123    * Tail of DLL of addresses this communicator offers to the transport service.
124    */
125   struct GNUNET_TRANSPORT_AddressIdentifier *ai_tail;
126
127   /**
128    * DLL of messages awaiting flow control confirmation (ack).
129    */
130   struct FlowControl *fc_head;
131
132   /**
133    * DLL of messages awaiting flow control confirmation (ack).
134    */
135   struct FlowControl *fc_tail;
136
137   /**
138    * DLL of messages awaiting transmission confirmation (ack).
139    */
140   struct AckPending *ap_head;
141
142   /**
143    * DLL of messages awaiting transmission confirmation (ack).
144    */
145   struct AckPending *ap_tail;
146
147   /**
148    * DLL of queues we offer.
149    */
150   struct GNUNET_TRANSPORT_QueueHandle *queue_head;
151
152   /**
153    * DLL of queues we offer.
154    */
155   struct GNUNET_TRANSPORT_QueueHandle *queue_tail;
156
157   /**
158    * Our configuration.
159    */
160   const struct GNUNET_CONFIGURATION_Handle *cfg;
161
162   /**
163    * Config section to use.
164    */
165   const char *config_section;
166
167   /**
168    * Address prefix to use.
169    */
170   const char *addr_prefix;
171
172   /**
173    * Function to call when the transport service wants us to initiate
174    * a communication channel with another peer.
175    */
176   GNUNET_TRANSPORT_CommunicatorMqInit mq_init;
177
178   /**
179    * Closure for @e mq_init.
180    */
181   void *mq_init_cls;
182
183   /**
184    * Function to call when the transport service receives messages
185    * for a communicator (i.e. for NAT traversal or for non-bidirectional
186    * communicators).
187    */
188   GNUNET_TRANSPORT_CommunicatorNotify notify_cb;
189
190   /**
191    * Closure for @e notify_Cb.
192    */
193   void *notify_cb_cls;
194
195   /**
196    * Queue to talk to the transport service.
197    */
198   struct GNUNET_MQ_Handle *mq;
199
200   /**
201    * Maximum permissable queue length.
202    */
203   unsigned long long max_queue_length;
204
205   /**
206    * Flow-control identifier generator.
207    */
208   uint64_t fc_gen;
209
210   /**
211    * Internal UUID for the address used in communication with the
212    * transport service.
213    */
214   uint32_t aid_gen;
215
216   /**
217    * Queue identifier generator.
218    */
219   uint32_t queue_gen;
220
221   /**
222    * Characteristics of the communicator.
223    */
224   enum GNUNET_TRANSPORT_CommunicatorCharacteristics cc;
225
226 };
227
228
229 /**
230  * Handle returned to identify the internal data structure the transport
231  * API has created to manage a message queue to a particular peer.
232  */
233 struct GNUNET_TRANSPORT_QueueHandle
234 {
235
236   /**
237    * Kept in a DLL.
238    */
239   struct GNUNET_TRANSPORT_QueueHandle *next;
240
241   /**
242    * Kept in a DLL.
243    */
244   struct GNUNET_TRANSPORT_QueueHandle *prev;
245
246   /**
247    * Handle this queue belongs to.
248    */
249   struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
250
251   /**
252    * Address used by the communication queue.
253    */
254   char *address;
255
256   /**
257    * The queue itself.
258    */
259   struct GNUNET_MQ_Handle *mq;
260
261   /**
262    * Which peer we can communciate with.
263    */
264   struct GNUNET_PeerIdentity peer;
265
266   /**
267    * Network type of the communciation queue.
268    */
269   enum GNUNET_NetworkType nt;
270
271   /**
272    * Communication status of the queue.
273    */
274   enum GNUNET_TRANSPORT_ConnectionStatus cs;
275
276   /**
277    * ID for this queue when talking to the transport service.
278    */
279   uint32_t queue_id;
280
281   /**
282    * Maximum transmission unit for the queue.
283    */
284   uint32_t mtu;
285
286 };
287
288
289 /**
290  * Internal representation of an address a communicator is
291  * currently providing for the transport service.
292  */
293 struct GNUNET_TRANSPORT_AddressIdentifier
294 {
295
296   /**
297    * Kept in a DLL.
298    */
299   struct GNUNET_TRANSPORT_AddressIdentifier *next;
300
301   /**
302    * Kept in a DLL.
303    */
304   struct GNUNET_TRANSPORT_AddressIdentifier *prev;
305
306   /**
307    * Transport handle where the address was added.
308    */
309   struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
310
311   /**
312    * The actual address.
313    */
314   char *address;
315
316   /**
317    * When does the address expire? (Expected lifetime of the
318    * address.)
319    */
320   struct GNUNET_TIME_Relative expiration;
321
322   /**
323    * Internal UUID for the address used in communication with the
324    * transport service.
325    */
326   uint32_t aid;
327
328   /**
329    * Network type for the address.
330    */
331   enum GNUNET_NetworkType nt;
332
333 };
334
335
336 /**
337  * (re)connect our communicator to the transport service
338  *
339  * @param ch handle to reconnect
340  */
341 static void
342 reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch);
343
344
345 /**
346  * Send message to the transport service about address @a ai
347  * being now available.
348  *
349  * @param ai address to add
350  */
351 static void
352 send_add_address (struct GNUNET_TRANSPORT_AddressIdentifier *ai)
353 {
354   struct GNUNET_MQ_Envelope *env;
355   struct GNUNET_TRANSPORT_AddAddressMessage *aam;
356
357   if (NULL == ai->ch->mq)
358     return;
359   env = GNUNET_MQ_msg_extra (aam,
360                              strlen (ai->address) + 1,
361                              GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS);
362   aam->expiration = GNUNET_TIME_relative_hton (ai->expiration);
363   aam->nt = htonl ((uint32_t) ai->nt);
364   memcpy (&aam[1],
365           ai->address,
366           strlen (ai->address) + 1);
367   GNUNET_MQ_send (ai->ch->mq,
368                   env);
369 }
370
371
372 /**
373  * Send message to the transport service about address @a ai
374  * being no longer available.
375  *
376  * @param ai address to delete
377  */
378 static void
379 send_del_address (struct GNUNET_TRANSPORT_AddressIdentifier *ai)
380 {
381   struct GNUNET_MQ_Envelope *env;
382   struct GNUNET_TRANSPORT_DelAddressMessage *dam;
383
384   if (NULL == ai->ch->mq)
385     return;
386   env = GNUNET_MQ_msg (dam,
387                        GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_ADDRESS);
388   dam->aid = htonl (ai->aid);
389   GNUNET_MQ_send (ai->ch->mq,
390                   env);
391 }
392
393
394 /**
395  * Send message to the transport service about queue @a qh
396  * being now available.
397  *
398  * @param qh queue to add
399  */
400 static void
401 send_add_queue (struct GNUNET_TRANSPORT_QueueHandle *qh)
402 {
403   struct GNUNET_MQ_Envelope *env;
404   struct GNUNET_TRANSPORT_AddQueueMessage *aqm;
405
406   if (NULL == qh->ch->mq)
407     return;
408   env = GNUNET_MQ_msg_extra (aqm,
409                              strlen (qh->address) + 1,
410                              GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP);
411   aqm->qid = htonl (qh->queue_id);
412   aqm->receiver = qh->peer;
413   aqm->nt = htonl ((uint32_t) qh->nt);
414   aqm->mtu = htonl (qh->mtu);
415   aqm->cs = htonl ((uint32_t) qh->cs);
416   memcpy (&aqm[1],
417           qh->address,
418           strlen (qh->address) + 1);
419   GNUNET_MQ_send (qh->ch->mq,
420                   env);
421 }
422
423
424 /**
425  * Send message to the transport service about queue @a qh
426  * being no longer available.
427  *
428  * @param qh queue to delete
429  */
430 static void
431 send_del_queue (struct GNUNET_TRANSPORT_QueueHandle *qh)
432 {
433   struct GNUNET_MQ_Envelope *env;
434   struct GNUNET_TRANSPORT_DelQueueMessage *dqm;
435
436   if (NULL == qh->ch->mq)
437     return;
438   env = GNUNET_MQ_msg (dqm,
439                        GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN);
440   dqm->qid = htonl (qh->queue_id);
441   dqm->receiver = qh->peer;
442   GNUNET_MQ_send (qh->ch->mq,
443                   env);
444 }
445
446
447 /**
448  * Disconnect from the transport service.  Purges
449  * all flow control entries as we will no longer receive
450  * the ACKs.  Purges the ack pending entries as the
451  * transport will no longer expect the confirmations.
452  *
453  * @param ch service to disconnect from
454  */
455 static void
456 disconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
457 {
458   struct FlowControl *fcn;
459   struct AckPending *apn;
460
461   for (struct FlowControl *fc = ch->fc_head;
462        NULL != fc;
463        fc = fcn)
464   {
465     fcn = fc->next;
466     GNUNET_CONTAINER_DLL_remove (ch->fc_head,
467                                  ch->fc_tail,
468                                  fc);
469     fc->cb (fc->cb_cls,
470             GNUNET_SYSERR);
471     GNUNET_free (fc);
472   }
473   for (struct AckPending *ap = ch->ap_head;
474        NULL != ap;
475        ap = apn)
476   {
477     apn = ap->next;
478     GNUNET_CONTAINER_DLL_remove (ch->ap_head,
479                                  ch->ap_tail,
480                                  ap);
481     GNUNET_free (ap);
482   }
483   if (NULL == ch->mq)
484     return;
485   GNUNET_MQ_destroy (ch->mq);
486   ch->mq = NULL;
487 }
488
489
490 /**
491  * Function called on MQ errors.
492  */
493 static void
494 error_handler (void *cls,
495                enum GNUNET_MQ_Error error)
496 {
497   struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
498
499   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
500               "MQ failure %d, reconnecting to transport service.\n",
501               error);
502   disconnect (ch);
503   /* TODO: maybe do this with exponential backoff/delay */
504   reconnect (ch);
505 }
506
507
508 /**
509  * Transport service acknowledged a message we gave it
510  * (with flow control enabled). Tell the communicator.
511  *
512  * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
513  * @param incoming_ack the ack
514  */
515 static void
516 handle_incoming_ack (void *cls,
517                      const struct GNUNET_TRANSPORT_IncomingMessageAck *incoming_ack)
518 {
519   struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
520
521   for (struct FlowControl *fc = ch->fc_head;
522        NULL != fc;
523        fc = fc->next)
524   {
525     if ( (fc->id == incoming_ack->fc_id) &&
526          (0 == memcmp (&fc->sender,
527                        &incoming_ack->sender,
528                        sizeof (struct GNUNET_PeerIdentity))) )
529     {
530       GNUNET_CONTAINER_DLL_remove (ch->fc_head,
531                                    ch->fc_tail,
532                                    fc);
533       fc->cb (fc->cb_cls,
534               GNUNET_OK);
535       GNUNET_free (fc);
536       return;
537     }
538   }
539   GNUNET_break (0);
540   disconnect (ch);
541   /* TODO: maybe do this with exponential backoff/delay */
542   reconnect (ch);
543 }
544
545
546 /**
547  * Transport service wants us to create a queue. Check if @a cq
548  * is well-formed.
549  *
550  * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
551  * @param cq the queue creation request
552  * @return #GNUNET_OK if @a smt is well-formed
553  */
554 static int
555 check_create_queue (void *cls,
556                     const struct GNUNET_TRANSPORT_CreateQueue *cq)
557 {
558   uint16_t len = ntohs (cq->header.size) - sizeof (*cq);
559   const char *addr = (const char *) &cq[1];
560
561   (void) cls;
562   if ( (0 == len) ||
563        ('\0' != addr[len-1]) )
564   {
565     GNUNET_break (0);
566     return GNUNET_SYSERR;
567   }
568   return GNUNET_OK;
569 }
570
571
572 /**
573  * Transport service wants us to create a queue. Tell the communicator.
574  *
575  * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
576  * @param cq the queue creation request
577  */
578 static void
579 handle_create_queue (void *cls,
580                      const struct GNUNET_TRANSPORT_CreateQueue *cq)
581 {
582   struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
583   const char *addr = (const char *) &cq[1];
584   struct GNUNET_TRANSPORT_CreateQueueResponse *cqr;
585   struct GNUNET_MQ_Envelope *env;
586
587   if (GNUNET_OK !=
588       ch->mq_init (ch->mq_init_cls,
589                    &cq->receiver,
590                    addr))
591   {
592     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
593                 "Address `%s' invalid for this communicator\n",
594                 addr);
595     env = GNUNET_MQ_msg (cqr,
596                          GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_FAIL);
597   }
598   else
599   {
600     env = GNUNET_MQ_msg (cqr,
601                          GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_OK);
602   }
603   cqr->request_id = cq->request_id;
604   GNUNET_MQ_send (ch->mq,
605                   env);
606 }
607
608
609 /**
610  * Transport service wants us to send a message. Check if @a smt
611  * is well-formed.
612  *
613  * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
614  * @param smt the transmission request
615  * @return #GNUNET_OK if @a smt is well-formed
616  */
617 static int
618 check_send_msg (void *cls,
619                 const struct GNUNET_TRANSPORT_SendMessageTo *smt)
620 {
621   (void) cls;
622   GNUNET_MQ_check_boxed_message (smt);
623   return GNUNET_OK;
624 }
625
626
627 /**
628  * Notify transport service about @a status of a message with
629  * @a mid sent to @a receiver.
630  *
631  * @param ch handle
632  * @param status #GNUNET_OK on success, #GNUNET_SYSERR on failure
633  * @param receiver which peer was the receiver
634  * @param mid message that the ack is about
635  */
636 static void
637 send_ack (struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
638           int status,
639           const struct GNUNET_PeerIdentity *receiver,
640           uint64_t mid)
641 {
642   struct GNUNET_MQ_Envelope *env;
643   struct GNUNET_TRANSPORT_SendMessageToAck *ack;
644
645   env = GNUNET_MQ_msg (ack,
646                        GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK);
647   ack->status = htonl (status);
648   ack->mid = mid;
649   ack->receiver = *receiver;
650   GNUNET_MQ_send (ch->mq,
651                   env);
652 }
653
654
655 /**
656  * Message queue transmission by communicator was successful,
657  * notify transport service.
658  *
659  * @param cls an `struct AckPending *`
660  */
661 static void
662 send_ack_cb (void *cls)
663 {
664   struct AckPending *ap = cls;
665   struct GNUNET_TRANSPORT_CommunicatorHandle *ch = ap->ch;
666
667   GNUNET_CONTAINER_DLL_remove (ch->ap_head,
668                                ch->ap_tail,
669                                ap);
670   send_ack (ch,
671             GNUNET_OK,
672             &ap->receiver,
673             ap->mid);
674   GNUNET_free (ap);
675 }
676
677
678 /**
679  * Transport service wants us to send a message. Tell the communicator.
680  *
681  * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
682  * @param smt the transmission request
683  */
684 static void
685 handle_send_msg (void *cls,
686                  const struct GNUNET_TRANSPORT_SendMessageTo *smt)
687 {
688   struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
689   const struct GNUNET_MessageHeader *mh;
690   struct GNUNET_MQ_Envelope *env;
691   struct AckPending *ap;
692   struct GNUNET_TRANSPORT_QueueHandle *qh;
693
694   for (qh = ch->queue_head;NULL != qh; qh = qh->next)
695     if ( (qh->queue_id == smt->qid) &&
696          (0 == memcmp (&qh->peer,
697                        &smt->receiver,
698                        sizeof (struct GNUNET_PeerIdentity))) )
699       break;
700   if (NULL == qh)
701   {
702     /* queue is already gone, tell transport this one failed */
703     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
704                 "Transmission failed, queue no longer exists.\n");
705     send_ack (ch,
706               GNUNET_NO,
707               &smt->receiver,
708               smt->mid);
709     return;
710   }
711   ap = GNUNET_new (struct AckPending);
712   ap->ch = ch;
713   ap->receiver = smt->receiver;
714   ap->mid = smt->mid;
715   GNUNET_CONTAINER_DLL_insert (ch->ap_head,
716                                ch->ap_tail,
717                                ap);
718   mh = (const struct GNUNET_MessageHeader *) &smt[1];
719   env = GNUNET_MQ_msg_copy (mh);
720   GNUNET_MQ_notify_sent (env,
721                          &send_ack_cb,
722                          ap);
723   GNUNET_MQ_send (qh->mq,
724                   env);
725 }
726
727
728 /**
729  * Transport service gives us backchannel message. Check if @a bi
730  * is well-formed.
731  *
732  * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
733  * @param bi the backchannel message
734  * @return #GNUNET_OK if @a smt is well-formed
735  */
736 static int
737 check_backchannel_incoming (void *cls,
738                             const struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming *bi)
739 {
740   (void) cls;
741   GNUNET_MQ_check_boxed_message (bi);
742   return GNUNET_OK;
743 }
744
745
746 /**
747  * Transport service gives us backchannel message. Handle it.
748  *
749  * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
750  * @param bi the backchannel message
751  */
752 static void
753 handle_backchannel_incoming (void *cls,
754                              const struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming *bi)
755 {
756   struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
757
758   if (NULL != ch->notify_cb)
759     ch->notify_cb (ch->notify_cb_cls,
760                    &bi->pid,
761                    (const struct GNUNET_MessageHeader *) &bi[1]);
762   else
763     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
764                 _("Dropped backchanel message: handler not provided by communicator\n"));
765 }
766
767
768 /**
769  * (re)connect our communicator to the transport service
770  *
771  * @param ch handle to reconnect
772  */
773 static void
774 reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
775 {
776   struct GNUNET_MQ_MessageHandler handlers[] = {
777     GNUNET_MQ_hd_fixed_size (incoming_ack,
778                              GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK,
779                              struct GNUNET_TRANSPORT_IncomingMessageAck,
780                              ch),
781     GNUNET_MQ_hd_var_size (create_queue,
782                            GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE,
783                            struct GNUNET_TRANSPORT_CreateQueue,
784                            ch),
785     GNUNET_MQ_hd_var_size (send_msg,
786                            GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG,
787                            struct GNUNET_TRANSPORT_SendMessageTo,
788                            ch),
789     GNUNET_MQ_hd_var_size (backchannel_incoming,
790                            GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL_INCOMING,
791                            struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming,
792                            ch),
793     GNUNET_MQ_handler_end()
794   };
795   struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam;
796   struct GNUNET_MQ_Envelope *env;
797
798   ch->mq = GNUNET_CLIENT_connect (ch->cfg,
799                                   "transport",
800                                   handlers,
801                                   &error_handler,
802                                   ch);
803   if (NULL == ch->mq)
804     return;
805   env = GNUNET_MQ_msg_extra (cam,
806                              strlen (ch->addr_prefix) + 1,
807                              GNUNET_MESSAGE_TYPE_TRANSPORT_NEW_COMMUNICATOR);
808   cam->cc = htonl ((uint32_t) ch->cc);
809   memcpy (&cam[1],
810           ch->addr_prefix,
811           strlen (ch->addr_prefix) + 1);
812   GNUNET_MQ_send (ch->mq,
813                   env);
814   for (struct GNUNET_TRANSPORT_AddressIdentifier *ai = ch->ai_head;
815        NULL != ai;
816        ai = ai->next)
817     send_add_address (ai);
818   for (struct GNUNET_TRANSPORT_QueueHandle *qh = ch->queue_head;
819        NULL != qh;
820        qh = qh->next)
821     send_add_queue (qh);
822 }
823
824
825 /**
826  * Connect to the transport service.
827  *
828  * @param cfg configuration to use
829  * @param config_section section of the configuration to use for options
830  * @param addr_prefix address prefix for addresses supported by this
831  *        communicator, could be NULL for incoming-only communicators
832  * @param cc what characteristics does the communicator have?
833  * @param mtu maximum message size supported by communicator, 0 if
834  *            sending is not supported, SIZE_MAX for no MTU
835  * @param mq_init function to call to initialize a message queue given
836  *                the address of another peer, can be NULL if the
837  *                communicator only supports receiving messages
838  * @param mq_init_cls closure for @a mq_init
839  * @param notify_cb function to pass backchannel messages to communicator
840  * @param notify_cb_cls closure for @a notify_cb
841  * @return NULL on error
842  */
843 struct GNUNET_TRANSPORT_CommunicatorHandle *
844 GNUNET_TRANSPORT_communicator_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
845                                        const char *config_section,
846                                        const char *addr_prefix,
847                                        enum GNUNET_TRANSPORT_CommunicatorCharacteristics cc,
848                                        GNUNET_TRANSPORT_CommunicatorMqInit mq_init,
849                                        void *mq_init_cls,
850                                        GNUNET_TRANSPORT_CommunicatorNotify notify_cb,
851                                        void *notify_cb_cls)
852 {
853   struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
854
855   ch = GNUNET_new (struct GNUNET_TRANSPORT_CommunicatorHandle);
856   ch->cfg = cfg;
857   ch->config_section = config_section;
858   ch->addr_prefix = addr_prefix;
859   ch->mq_init = mq_init;
860   ch->mq_init_cls = mq_init_cls;
861   ch->notify_cb = notify_cb;
862   ch->notify_cb_cls = notify_cb_cls;
863   ch->cc = cc;
864   reconnect (ch);
865   if (GNUNET_OK !=
866       GNUNET_CONFIGURATION_get_value_number (cfg,
867                                              config_section,
868                                              "MAX_QUEUE_LENGTH",
869                                              &ch->max_queue_length))
870     ch->max_queue_length = DEFAULT_MAX_QUEUE_LENGTH;
871   if (NULL == ch->mq)
872   {
873     GNUNET_free (ch);
874     return NULL;
875   }
876   return ch;
877 }
878
879
880 /**
881  * Disconnect from the transport service.
882  *
883  * @param ch handle returned from connect
884  */
885 void
886 GNUNET_TRANSPORT_communicator_disconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
887 {
888   disconnect (ch);
889   while (NULL != ch->ai_head)
890   {
891     GNUNET_break (0); /* communicator forgot to remove address, warn! */
892     GNUNET_TRANSPORT_communicator_address_remove (ch->ai_head);
893   }
894   GNUNET_free (ch);
895 }
896
897
898 /* ************************* Receiving *************************** */
899
900
901 /**
902  * Notify transport service that the communicator has received
903  * a message.
904  *
905  * @param ch connection to transport service
906  * @param sender presumed sender of the message (details to be checked
907  *        by higher layers)
908  * @param msg the message
909  * @param expected_addr_validity how long does the communicator believe it
910  *        will continue to be able to receive messages from the same address
911  *        on which it received this message?
912  * @param cb function to call once handling the message is done, NULL if
913  *         flow control is not supported by this communicator
914  * @param cb_cls closure for @a cb
915  * @return #GNUNET_OK if all is well, #GNUNET_NO if the message was
916  *         immediately dropped due to memory limitations (communicator
917  *         should try to apply back pressure),
918  *         #GNUNET_SYSERR if the message could not be delivered because
919  *         the tranport service is not yet up
920  */
921 int
922 GNUNET_TRANSPORT_communicator_receive (struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
923                                        const struct GNUNET_PeerIdentity *sender,
924                                        const struct GNUNET_MessageHeader *msg,
925                                        struct GNUNET_TIME_Relative expected_addr_validity,
926                                        GNUNET_TRANSPORT_MessageCompletedCallback cb,
927                                        void *cb_cls)
928 {
929   struct GNUNET_MQ_Envelope *env;
930   struct GNUNET_TRANSPORT_IncomingMessage *im;
931   uint16_t msize;
932
933   if (NULL == ch->mq)
934     return GNUNET_SYSERR;
935   if ( (NULL == cb) &&
936        (GNUNET_MQ_get_length (ch->mq) >= ch->max_queue_length) )
937   {
938     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
939                 "Dropping message: transprot is too slow, queue length %llu exceeded\n",
940                 ch->max_queue_length);
941     return GNUNET_NO;
942   }
943
944   msize = ntohs (msg->size);
945   env = GNUNET_MQ_msg_extra (im,
946                              msize,
947                              GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG);
948   if (NULL == env)
949   {
950     GNUNET_break (0);
951     return GNUNET_SYSERR;
952   }
953   im->expected_address_validity = GNUNET_TIME_relative_hton (expected_addr_validity);
954   im->sender = *sender;
955   memcpy (&im[1],
956           msg,
957           msize);
958   if (NULL != cb)
959   {
960     struct FlowControl *fc;
961
962     im->fc_on = htonl (GNUNET_YES);
963     im->fc_id = ch->fc_gen++;
964     fc = GNUNET_new (struct FlowControl);
965     fc->sender = *sender;
966     fc->id = im->fc_id;
967     fc->cb = cb;
968     fc->cb_cls = cb_cls;
969     GNUNET_CONTAINER_DLL_insert (ch->fc_head,
970                                  ch->fc_tail,
971                                  fc);
972   }
973   GNUNET_MQ_send (ch->mq,
974                   env);
975   return GNUNET_OK;
976 }
977
978
979 /* ************************* Discovery *************************** */
980
981
982 /**
983  * Notify transport service that an MQ became available due to an
984  * "inbound" connection or because the communicator discovered the
985  * presence of another peer.
986  *
987  * @param ch connection to transport service
988  * @param peer peer with which we can now communicate
989  * @param address address in human-readable format, 0-terminated, UTF-8
990  * @param mtu maximum message size supported by queue, 0 if
991  *            sending is not supported, SIZE_MAX for no MTU
992  * @param nt which network type does the @a address belong to?
993  * @param cc what characteristics does the communicator have?
994  * @param cs what is the connection status of the queue?
995  * @param mq message queue of the @a peer
996  * @return API handle identifying the new MQ
997  */
998 struct GNUNET_TRANSPORT_QueueHandle *
999 GNUNET_TRANSPORT_communicator_mq_add (struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
1000                                       const struct GNUNET_PeerIdentity *peer,
1001                                       const char *address,
1002                                       uint32_t mtu,
1003                                       enum GNUNET_NetworkType nt,
1004                                       enum GNUNET_TRANSPORT_ConnectionStatus cs,
1005                                       struct GNUNET_MQ_Handle *mq)
1006 {
1007   struct GNUNET_TRANSPORT_QueueHandle *qh;
1008
1009   qh = GNUNET_new (struct GNUNET_TRANSPORT_QueueHandle);
1010   qh->ch = ch;
1011   qh->peer = *peer;
1012   qh->address = GNUNET_strdup (address);
1013   qh->nt = nt;
1014   qh->mtu = mtu;
1015   qh->cs = cs;
1016   qh->mq = mq;
1017   qh->queue_id = ch->queue_gen++;
1018   GNUNET_CONTAINER_DLL_insert (ch->queue_head,
1019                                ch->queue_tail,
1020                                qh);
1021   send_add_queue (qh);
1022   return qh;
1023 }
1024
1025
1026 /**
1027  * Notify transport service that an MQ became unavailable due to a
1028  * disconnect or timeout.
1029  *
1030  * @param qh handle for the queue that must be invalidated
1031  */
1032 void
1033 GNUNET_TRANSPORT_communicator_mq_del (struct GNUNET_TRANSPORT_QueueHandle *qh)
1034 {
1035   struct GNUNET_TRANSPORT_CommunicatorHandle *ch = qh->ch;
1036
1037   send_del_queue (qh);
1038   GNUNET_CONTAINER_DLL_remove (ch->queue_head,
1039                                ch->queue_tail,
1040                                qh);
1041   GNUNET_MQ_destroy (qh->mq);
1042   GNUNET_free (qh->address);
1043   GNUNET_free (qh);
1044 }
1045
1046
1047 /**
1048  * Notify transport service about an address that this communicator
1049  * provides for this peer.
1050  *
1051  * @param ch connection to transport service
1052  * @param address our address in human-readable format, 0-terminated, UTF-8
1053  * @param nt which network type does the address belong to?
1054  * @param expiration when does the communicator forsee this address expiring?
1055  */
1056 struct GNUNET_TRANSPORT_AddressIdentifier *
1057 GNUNET_TRANSPORT_communicator_address_add (struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
1058                                            const char *address,
1059                                            enum GNUNET_NetworkType nt,
1060                                            struct GNUNET_TIME_Relative expiration)
1061 {
1062   struct GNUNET_TRANSPORT_AddressIdentifier *ai;
1063
1064   ai = GNUNET_new (struct GNUNET_TRANSPORT_AddressIdentifier);
1065   ai->ch = ch;
1066   ai->address = GNUNET_strdup (address);
1067   ai->nt = nt;
1068   ai->expiration = expiration;
1069   ai->aid = ch->aid_gen++;
1070   GNUNET_CONTAINER_DLL_insert (ch->ai_head,
1071                                ch->ai_tail,
1072                                ai);
1073   send_add_address (ai);
1074   return ai;
1075 }
1076
1077
1078 /**
1079  * Notify transport service about an address that this communicator no
1080  * longer provides for this peer.
1081  *
1082  * @param ai address that is no longer provided
1083  */
1084 void
1085 GNUNET_TRANSPORT_communicator_address_remove (struct GNUNET_TRANSPORT_AddressIdentifier *ai)
1086 {
1087   struct GNUNET_TRANSPORT_CommunicatorHandle *ch = ai->ch;
1088
1089   send_del_address (ai);
1090   GNUNET_CONTAINER_DLL_remove (ch->ai_head,
1091                                ch->ai_tail,
1092                                ai);
1093   GNUNET_free (ai->address);
1094   GNUNET_free (ai);
1095 }
1096
1097
1098 /* ************************* Backchannel *************************** */
1099
1100
1101 /**
1102  * The communicator asks the transport service to route a message via
1103  * a different path to another communicator service at another peer.
1104  * This must only be done for special control traffic (as there is no
1105  * flow control for this API), such as acknowledgements, and generally
1106  * only be done if the communicator is uni-directional (i.e. cannot
1107  * send the message back itself).
1108  *
1109  * @param ch handle of this communicator
1110  * @param pid peer to send the message to
1111  * @param comm name of the communicator to send the message to
1112  * @param header header of the message to transmit and pass via the
1113  *        notify-API to @a pid's communicator @a comm
1114  */
1115 void
1116 GNUNET_TRANSPORT_communicator_notify (struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
1117                                       const struct GNUNET_PeerIdentity *pid,
1118                                       const char *comm,
1119                                       const struct GNUNET_MessageHeader *header)
1120 {
1121   struct GNUNET_MQ_Envelope *env;
1122   struct GNUNET_TRANSPORT_CommunicatorBackchannel *cb;
1123   size_t slen = strlen (comm) + 1;
1124   uint16_t mlen = ntohs (header->size);
1125
1126   GNUNET_assert (mlen + slen + sizeof (*cb) < UINT16_MAX);
1127   env = GNUNET_MQ_msg_extra (cb,
1128                              slen + mlen,
1129                              GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL);
1130   cb->pid = *pid;
1131   memcpy (&cb[1],
1132           header,
1133           mlen);
1134   memcpy (((char *)&cb[1]) + mlen,
1135           comm,
1136           slen);
1137   GNUNET_MQ_send (ch->mq,
1138                   env);
1139 }
1140
1141
1142 /* end of transport_api2_communication.c */