replace printf() with GNUNET_log()
[oweals/gnunet.git] / src / transport / transport_api2_communication.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2018 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19  */
20
21 /**
22  * @file transport/transport_api2_communication.c
23  * @brief implementation of the gnunet_transport_communication_service.h API
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_protocols.h"
29 #include "gnunet_transport_communication_service.h"
30 #include "gnunet_ats_transport_service.h"
31 #include "transport.h"
32
33
34 /**
35  * How many messages do we keep at most in the queue to the
36  * transport service before we start to drop (default,
37  * can be changed via the configuration file).
38  */
39 #define DEFAULT_MAX_QUEUE_LENGTH 16
40
41
42 /**
43  * Information we track per packet to enable flow control.
44  */
45 struct FlowControl
46 {
47   /**
48    * Kept in a DLL.
49    */
50   struct FlowControl *next;
51
52   /**
53    * Kept in a DLL.
54    */
55   struct FlowControl *prev;
56
57   /**
58    * Function to call once the message was processed.
59    */
60   GNUNET_TRANSPORT_MessageCompletedCallback cb;
61
62   /**
63    * Closure for @e cb
64    */
65   void *cb_cls;
66
67   /**
68    * Which peer is this about?
69    */
70   struct GNUNET_PeerIdentity sender;
71
72   /**
73    * More-or-less unique ID for the message.
74    */
75   uint64_t id;
76 };
77
78
79 /**
80  * Information we track per message to tell the transport about
81  * success or failures.
82  */
83 struct AckPending
84 {
85   /**
86    * Kept in a DLL.
87    */
88   struct AckPending *next;
89
90   /**
91    * Kept in a DLL.
92    */
93   struct AckPending *prev;
94
95   /**
96    * Communicator this entry belongs to.
97    */
98   struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
99
100   /**
101    * Which peer is this about?
102    */
103   struct GNUNET_PeerIdentity receiver;
104
105   /**
106    * More-or-less unique ID for the message.
107    */
108   uint64_t mid;
109 };
110
111
112 /**
113  * Opaque handle to the transport service for communicators.
114  */
115 struct GNUNET_TRANSPORT_CommunicatorHandle
116 {
117   /**
118    * Head of DLL of addresses this communicator offers to the transport service.
119    */
120   struct GNUNET_TRANSPORT_AddressIdentifier *ai_head;
121
122   /**
123    * Tail of DLL of addresses this communicator offers to the transport service.
124    */
125   struct GNUNET_TRANSPORT_AddressIdentifier *ai_tail;
126
127   /**
128    * DLL of messages awaiting flow control confirmation (ack).
129    */
130   struct FlowControl *fc_head;
131
132   /**
133    * DLL of messages awaiting flow control confirmation (ack).
134    */
135   struct FlowControl *fc_tail;
136
137   /**
138    * DLL of messages awaiting transmission confirmation (ack).
139    */
140   struct AckPending *ap_head;
141
142   /**
143    * DLL of messages awaiting transmission confirmation (ack).
144    */
145   struct AckPending *ap_tail;
146
147   /**
148    * DLL of queues we offer.
149    */
150   struct GNUNET_TRANSPORT_QueueHandle *queue_head;
151
152   /**
153    * DLL of queues we offer.
154    */
155   struct GNUNET_TRANSPORT_QueueHandle *queue_tail;
156
157   /**
158    * Our configuration.
159    */
160   const struct GNUNET_CONFIGURATION_Handle *cfg;
161
162   /**
163    * Config section to use.
164    */
165   const char *config_section;
166
167   /**
168    * Address prefix to use.
169    */
170   const char *addr_prefix;
171
172   /**
173    * Function to call when the transport service wants us to initiate
174    * a communication channel with another peer.
175    */
176   GNUNET_TRANSPORT_CommunicatorMqInit mq_init;
177
178   /**
179    * Closure for @e mq_init.
180    */
181   void *mq_init_cls;
182
183   /**
184    * Function to call when the transport service receives messages
185    * for a communicator (i.e. for NAT traversal or for non-bidirectional
186    * communicators).
187    */
188   GNUNET_TRANSPORT_CommunicatorNotify notify_cb;
189
190   /**
191    * Closure for @e notify_Cb.
192    */
193   void *notify_cb_cls;
194
195   /**
196    * Queue to talk to the transport service.
197    */
198   struct GNUNET_MQ_Handle *mq;
199
200   /**
201    * Maximum permissable queue length.
202    */
203   unsigned long long max_queue_length;
204
205   /**
206    * Flow-control identifier generator.
207    */
208   uint64_t fc_gen;
209
210   /**
211    * Internal UUID for the address used in communication with the
212    * transport service.
213    */
214   uint32_t aid_gen;
215
216   /**
217    * Queue identifier generator.
218    */
219   uint32_t queue_gen;
220
221   /**
222    * Characteristics of the communicator.
223    */
224   enum GNUNET_TRANSPORT_CommunicatorCharacteristics cc;
225 };
226
227
228 /**
229  * Handle returned to identify the internal data structure the transport
230  * API has created to manage a message queue to a particular peer.
231  */
232 struct GNUNET_TRANSPORT_QueueHandle
233 {
234   /**
235    * Kept in a DLL.
236    */
237   struct GNUNET_TRANSPORT_QueueHandle *next;
238
239   /**
240    * Kept in a DLL.
241    */
242   struct GNUNET_TRANSPORT_QueueHandle *prev;
243
244   /**
245    * Handle this queue belongs to.
246    */
247   struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
248
249   /**
250    * Address used by the communication queue.
251    */
252   char *address;
253
254   /**
255    * The queue itself.
256    */
257   struct GNUNET_MQ_Handle *mq;
258
259   /**
260    * Which peer we can communciate with.
261    */
262   struct GNUNET_PeerIdentity peer;
263
264   /**
265    * Network type of the communciation queue.
266    */
267   enum GNUNET_NetworkType nt;
268
269   /**
270    * Communication status of the queue.
271    */
272   enum GNUNET_TRANSPORT_ConnectionStatus cs;
273
274   /**
275    * ID for this queue when talking to the transport service.
276    */
277   uint32_t queue_id;
278
279   /**
280    * Maximum transmission unit for the queue.
281    */
282   uint32_t mtu;
283 };
284
285
286 /**
287  * Internal representation of an address a communicator is
288  * currently providing for the transport service.
289  */
290 struct GNUNET_TRANSPORT_AddressIdentifier
291 {
292   /**
293    * Kept in a DLL.
294    */
295   struct GNUNET_TRANSPORT_AddressIdentifier *next;
296
297   /**
298    * Kept in a DLL.
299    */
300   struct GNUNET_TRANSPORT_AddressIdentifier *prev;
301
302   /**
303    * Transport handle where the address was added.
304    */
305   struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
306
307   /**
308    * The actual address.
309    */
310   char *address;
311
312   /**
313    * When does the address expire? (Expected lifetime of the
314    * address.)
315    */
316   struct GNUNET_TIME_Relative expiration;
317
318   /**
319    * Internal UUID for the address used in communication with the
320    * transport service.
321    */
322   uint32_t aid;
323
324   /**
325    * Network type for the address.
326    */
327   enum GNUNET_NetworkType nt;
328 };
329
330
331 /**
332  * (re)connect our communicator to the transport service
333  *
334  * @param ch handle to reconnect
335  */
336 static void
337 reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch);
338
339
340 /**
341  * Send message to the transport service about address @a ai
342  * being now available.
343  *
344  * @param ai address to add
345  */
346 static void
347 send_add_address (struct GNUNET_TRANSPORT_AddressIdentifier *ai)
348 {
349   struct GNUNET_MQ_Envelope *env;
350   struct GNUNET_TRANSPORT_AddAddressMessage *aam;
351
352   if (NULL == ai->ch->mq)
353     return;
354   env = GNUNET_MQ_msg_extra (aam,
355                              strlen (ai->address) + 1,
356                              GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS);
357   aam->expiration = GNUNET_TIME_relative_hton (ai->expiration);
358   aam->nt = htonl ((uint32_t) ai->nt);
359   memcpy (&aam[1], ai->address, strlen (ai->address) + 1);
360   GNUNET_MQ_send (ai->ch->mq, env);
361 }
362
363
364 /**
365  * Send message to the transport service about address @a ai
366  * being no longer available.
367  *
368  * @param ai address to delete
369  */
370 static void
371 send_del_address (struct GNUNET_TRANSPORT_AddressIdentifier *ai)
372 {
373   struct GNUNET_MQ_Envelope *env;
374   struct GNUNET_TRANSPORT_DelAddressMessage *dam;
375
376   if (NULL == ai->ch->mq)
377     return;
378   env = GNUNET_MQ_msg (dam, GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_ADDRESS);
379   dam->aid = htonl (ai->aid);
380   GNUNET_MQ_send (ai->ch->mq, env);
381 }
382
383
384 /**
385  * Send message to the transport service about queue @a qh
386  * being now available.
387  *
388  * @param qh queue to add
389  */
390 static void
391 send_add_queue (struct GNUNET_TRANSPORT_QueueHandle *qh)
392 {
393   struct GNUNET_MQ_Envelope *env;
394   struct GNUNET_TRANSPORT_AddQueueMessage *aqm;
395
396   if (NULL == qh->ch->mq)
397     return;
398   env = GNUNET_MQ_msg_extra (aqm,
399                              strlen (qh->address) + 1,
400                              GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP);
401   aqm->qid = htonl (qh->queue_id);
402   aqm->receiver = qh->peer;
403   aqm->nt = htonl ((uint32_t) qh->nt);
404   aqm->mtu = htonl (qh->mtu);
405   aqm->cs = htonl ((uint32_t) qh->cs);
406   memcpy (&aqm[1], qh->address, strlen (qh->address) + 1);
407   GNUNET_MQ_send (qh->ch->mq, env);
408 }
409
410
411 /**
412  * Send message to the transport service about queue @a qh
413  * being no longer available.
414  *
415  * @param qh queue to delete
416  */
417 static void
418 send_del_queue (struct GNUNET_TRANSPORT_QueueHandle *qh)
419 {
420   struct GNUNET_MQ_Envelope *env;
421   struct GNUNET_TRANSPORT_DelQueueMessage *dqm;
422
423   if (NULL == qh->ch->mq)
424     return;
425   env = GNUNET_MQ_msg (dqm, GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN);
426   dqm->qid = htonl (qh->queue_id);
427   dqm->receiver = qh->peer;
428   GNUNET_MQ_send (qh->ch->mq, env);
429 }
430
431
432 /**
433  * Disconnect from the transport service.  Purges
434  * all flow control entries as we will no longer receive
435  * the ACKs.  Purges the ack pending entries as the
436  * transport will no longer expect the confirmations.
437  *
438  * @param ch service to disconnect from
439  */
440 static void
441 disconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
442 {
443   struct FlowControl *fcn;
444   struct AckPending *apn;
445
446   for (struct FlowControl *fc = ch->fc_head; NULL != fc; fc = fcn)
447   {
448     fcn = fc->next;
449     GNUNET_CONTAINER_DLL_remove (ch->fc_head, ch->fc_tail, fc);
450     fc->cb (fc->cb_cls, GNUNET_SYSERR);
451     GNUNET_free (fc);
452   }
453   for (struct AckPending *ap = ch->ap_head; NULL != ap; ap = apn)
454   {
455     apn = ap->next;
456     GNUNET_CONTAINER_DLL_remove (ch->ap_head, ch->ap_tail, ap);
457     GNUNET_free (ap);
458   }
459   if (NULL == ch->mq)
460     return;
461   GNUNET_MQ_destroy (ch->mq);
462   ch->mq = NULL;
463 }
464
465
466 /**
467  * Function called on MQ errors.
468  */
469 static void
470 error_handler (void *cls, enum GNUNET_MQ_Error error)
471 {
472   struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
473
474   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
475               "MQ failure %d, reconnecting to transport service.\n",
476               error);
477   disconnect (ch);
478   /* TODO: maybe do this with exponential backoff/delay */
479   reconnect (ch);
480 }
481
482
483 /**
484  * Transport service acknowledged a message we gave it
485  * (with flow control enabled). Tell the communicator.
486  *
487  * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
488  * @param incoming_ack the ack
489  */
490 static void
491 handle_incoming_ack (
492   void *cls,
493   const struct GNUNET_TRANSPORT_IncomingMessageAck *incoming_ack)
494 {
495   struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
496
497   for (struct FlowControl *fc = ch->fc_head; NULL != fc; fc = fc->next)
498   {
499     if ((fc->id == incoming_ack->fc_id) &&
500         (0 == memcmp (&fc->sender,
501                       &incoming_ack->sender,
502                       sizeof(struct GNUNET_PeerIdentity))))
503     {
504       GNUNET_CONTAINER_DLL_remove (ch->fc_head, ch->fc_tail, fc);
505       fc->cb (fc->cb_cls, GNUNET_OK);
506       GNUNET_free (fc);
507       return;
508     }
509   }
510   GNUNET_break (0);
511   disconnect (ch);
512   /* TODO: maybe do this with exponential backoff/delay */
513   reconnect (ch);
514 }
515
516
517 /**
518  * Transport service wants us to create a queue. Check if @a cq
519  * is well-formed.
520  *
521  * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
522  * @param cq the queue creation request
523  * @return #GNUNET_OK if @a smt is well-formed
524  */
525 static int
526 check_create_queue (void *cls, const struct GNUNET_TRANSPORT_CreateQueue *cq)
527 {
528   (void) cls;
529   GNUNET_MQ_check_zero_termination (cq);
530   return GNUNET_OK;
531 }
532
533
534 /**
535  * Transport service wants us to create a queue. Tell the communicator.
536  *
537  * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
538  * @param cq the queue creation request
539  */
540 static void
541 handle_create_queue (void *cls, const struct GNUNET_TRANSPORT_CreateQueue *cq)
542 {
543   struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
544   const char *addr = (const char *) &cq[1];
545   struct GNUNET_TRANSPORT_CreateQueueResponse *cqr;
546   struct GNUNET_MQ_Envelope *env;
547
548   if (GNUNET_OK != ch->mq_init (ch->mq_init_cls, &cq->receiver, addr))
549   {
550     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
551                 "Address `%s' invalid for this communicator\n",
552                 addr);
553     env = GNUNET_MQ_msg (cqr, GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_FAIL);
554   }
555   else
556   {
557     env = GNUNET_MQ_msg (cqr, GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_OK);
558   }
559   cqr->request_id = cq->request_id;
560   GNUNET_MQ_send (ch->mq, env);
561 }
562
563
564 /**
565  * Transport service wants us to send a message. Check if @a smt
566  * is well-formed.
567  *
568  * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
569  * @param smt the transmission request
570  * @return #GNUNET_OK if @a smt is well-formed
571  */
572 static int
573 check_send_msg (void *cls, const struct GNUNET_TRANSPORT_SendMessageTo *smt)
574 {
575   (void) cls;
576   GNUNET_MQ_check_boxed_message (smt);
577   return GNUNET_OK;
578 }
579
580
581 /**
582  * Notify transport service about @a status of a message with
583  * @a mid sent to @a receiver.
584  *
585  * @param ch handle
586  * @param status #GNUNET_OK on success, #GNUNET_SYSERR on failure
587  * @param receiver which peer was the receiver
588  * @param mid message that the ack is about
589  */
590 static void
591 send_ack (struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
592           int status,
593           const struct GNUNET_PeerIdentity *receiver,
594           uint64_t mid)
595 {
596   struct GNUNET_MQ_Envelope *env;
597   struct GNUNET_TRANSPORT_SendMessageToAck *ack;
598
599   env = GNUNET_MQ_msg (ack, GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK);
600   ack->status = htonl (status);
601   ack->mid = mid;
602   ack->receiver = *receiver;
603   GNUNET_MQ_send (ch->mq, env);
604 }
605
606
607 /**
608  * Message queue transmission by communicator was successful,
609  * notify transport service.
610  *
611  * @param cls an `struct AckPending *`
612  */
613 static void
614 send_ack_cb (void *cls)
615 {
616   struct AckPending *ap = cls;
617   struct GNUNET_TRANSPORT_CommunicatorHandle *ch = ap->ch;
618
619   GNUNET_CONTAINER_DLL_remove (ch->ap_head, ch->ap_tail, ap);
620   send_ack (ch, GNUNET_OK, &ap->receiver, ap->mid);
621   GNUNET_free (ap);
622 }
623
624
625 /**
626  * Transport service wants us to send a message. Tell the communicator.
627  *
628  * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
629  * @param smt the transmission request
630  */
631 static void
632 handle_send_msg (void *cls, const struct GNUNET_TRANSPORT_SendMessageTo *smt)
633 {
634   struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
635   const struct GNUNET_MessageHeader *mh;
636   struct GNUNET_MQ_Envelope *env;
637   struct AckPending *ap;
638   struct GNUNET_TRANSPORT_QueueHandle *qh;
639
640   for (qh = ch->queue_head; NULL != qh; qh = qh->next)
641     if ((qh->queue_id == smt->qid) &&
642         (0 == memcmp (&qh->peer,
643                       &smt->receiver,
644                       sizeof(struct GNUNET_PeerIdentity))))
645       break;
646   if (NULL == qh)
647   {
648     /* queue is already gone, tell transport this one failed */
649     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
650                 "Transmission failed, queue no longer exists.\n");
651     send_ack (ch, GNUNET_NO, &smt->receiver, smt->mid);
652     return;
653   }
654   ap = GNUNET_new (struct AckPending);
655   ap->ch = ch;
656   ap->receiver = smt->receiver;
657   ap->mid = smt->mid;
658   GNUNET_CONTAINER_DLL_insert (ch->ap_head, ch->ap_tail, ap);
659   mh = (const struct GNUNET_MessageHeader *) &smt[1];
660   env = GNUNET_MQ_msg_copy (mh);
661   GNUNET_MQ_notify_sent (env, &send_ack_cb, ap);
662   GNUNET_MQ_send (qh->mq, env);
663 }
664
665
666 /**
667  * Transport service gives us backchannel message. Check if @a bi
668  * is well-formed.
669  *
670  * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
671  * @param bi the backchannel message
672  * @return #GNUNET_OK if @a smt is well-formed
673  */
674 static int
675 check_backchannel_incoming (
676   void *cls,
677   const struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming *bi)
678 {
679   (void) cls;
680   GNUNET_MQ_check_boxed_message (bi);
681   return GNUNET_OK;
682 }
683
684
685 /**
686  * Transport service gives us backchannel message. Handle it.
687  *
688  * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
689  * @param bi the backchannel message
690  */
691 static void
692 handle_backchannel_incoming (
693   void *cls,
694   const struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming *bi)
695 {
696   struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
697
698   if (NULL != ch->notify_cb)
699     ch->notify_cb (ch->notify_cb_cls,
700                    &bi->pid,
701                    (const struct GNUNET_MessageHeader *) &bi[1]);
702   else
703     GNUNET_log (
704       GNUNET_ERROR_TYPE_INFO,
705       _ ("Dropped backchanel message: handler not provided by communicator\n"));
706 }
707
708
709 /**
710  * (re)connect our communicator to the transport service
711  *
712  * @param ch handle to reconnect
713  */
714 static void
715 reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
716 {
717   struct GNUNET_MQ_MessageHandler handlers[] =
718   { GNUNET_MQ_hd_fixed_size (incoming_ack,
719                              GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK,
720                              struct GNUNET_TRANSPORT_IncomingMessageAck,
721                              ch),
722     GNUNET_MQ_hd_var_size (create_queue,
723                            GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE,
724                            struct GNUNET_TRANSPORT_CreateQueue,
725                            ch),
726     GNUNET_MQ_hd_var_size (send_msg,
727                            GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG,
728                            struct GNUNET_TRANSPORT_SendMessageTo,
729                            ch),
730     GNUNET_MQ_hd_var_size (
731       backchannel_incoming,
732       GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL_INCOMING,
733       struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming,
734       ch),
735     GNUNET_MQ_handler_end () };
736   struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam;
737   struct GNUNET_MQ_Envelope *env;
738
739   ch->mq =
740     GNUNET_CLIENT_connect (ch->cfg, "transport", handlers, &error_handler, ch);
741   if (NULL == ch->mq)
742     return;
743   env = GNUNET_MQ_msg_extra (cam,
744                              strlen (ch->addr_prefix) + 1,
745                              GNUNET_MESSAGE_TYPE_TRANSPORT_NEW_COMMUNICATOR);
746   cam->cc = htonl ((uint32_t) ch->cc);
747   memcpy (&cam[1], ch->addr_prefix, strlen (ch->addr_prefix) + 1);
748   GNUNET_MQ_send (ch->mq, env);
749   for (struct GNUNET_TRANSPORT_AddressIdentifier *ai = ch->ai_head; NULL != ai;
750        ai = ai->next)
751     send_add_address (ai);
752   for (struct GNUNET_TRANSPORT_QueueHandle *qh = ch->queue_head; NULL != qh;
753        qh = qh->next)
754     send_add_queue (qh);
755 }
756
757
758 /**
759  * Connect to the transport service.
760  *
761  * @param cfg configuration to use
762  * @param config_section section of the configuration to use for options
763  * @param addr_prefix address prefix for addresses supported by this
764  *        communicator, could be NULL for incoming-only communicators
765  * @param cc what characteristics does the communicator have?
766  * @param mtu maximum message size supported by communicator, 0 if
767  *            sending is not supported, SIZE_MAX for no MTU
768  * @param mq_init function to call to initialize a message queue given
769  *                the address of another peer, can be NULL if the
770  *                communicator only supports receiving messages
771  * @param mq_init_cls closure for @a mq_init
772  * @param notify_cb function to pass backchannel messages to communicator
773  * @param notify_cb_cls closure for @a notify_cb
774  * @return NULL on error
775  */
776 struct GNUNET_TRANSPORT_CommunicatorHandle *
777 GNUNET_TRANSPORT_communicator_connect (
778   const struct GNUNET_CONFIGURATION_Handle *cfg,
779   const char *config_section,
780   const char *addr_prefix,
781   enum GNUNET_TRANSPORT_CommunicatorCharacteristics cc,
782   GNUNET_TRANSPORT_CommunicatorMqInit mq_init,
783   void *mq_init_cls,
784   GNUNET_TRANSPORT_CommunicatorNotify notify_cb,
785   void *notify_cb_cls)
786 {
787   struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
788
789   ch = GNUNET_new (struct GNUNET_TRANSPORT_CommunicatorHandle);
790   ch->cfg = cfg;
791   ch->config_section = config_section;
792   ch->addr_prefix = addr_prefix;
793   ch->mq_init = mq_init;
794   ch->mq_init_cls = mq_init_cls;
795   ch->notify_cb = notify_cb;
796   ch->notify_cb_cls = notify_cb_cls;
797   ch->cc = cc;
798   reconnect (ch);
799   if (GNUNET_OK !=
800       GNUNET_CONFIGURATION_get_value_number (cfg,
801                                              config_section,
802                                              "MAX_QUEUE_LENGTH",
803                                              &ch->max_queue_length))
804     ch->max_queue_length = DEFAULT_MAX_QUEUE_LENGTH;
805   if (NULL == ch->mq)
806   {
807     GNUNET_free (ch);
808     return NULL;
809   }
810   return ch;
811 }
812
813
814 /**
815  * Disconnect from the transport service.
816  *
817  * @param ch handle returned from connect
818  */
819 void
820 GNUNET_TRANSPORT_communicator_disconnect (
821   struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
822 {
823   disconnect (ch);
824   while (NULL != ch->ai_head)
825   {
826     GNUNET_break (0);  /* communicator forgot to remove address, warn! */
827     GNUNET_TRANSPORT_communicator_address_remove (ch->ai_head);
828   }
829   GNUNET_free (ch);
830 }
831
832
833 /* ************************* Receiving *************************** */
834
835
836 /**
837  * Notify transport service that the communicator has received
838  * a message.
839  *
840  * @param ch connection to transport service
841  * @param sender presumed sender of the message (details to be checked
842  *        by higher layers)
843  * @param msg the message
844  * @param expected_addr_validity how long does the communicator believe it
845  *        will continue to be able to receive messages from the same address
846  *        on which it received this message?
847  * @param cb function to call once handling the message is done, NULL if
848  *         flow control is not supported by this communicator
849  * @param cb_cls closure for @a cb
850  * @return #GNUNET_OK if all is well, #GNUNET_NO if the message was
851  *         immediately dropped due to memory limitations (communicator
852  *         should try to apply back pressure),
853  *         #GNUNET_SYSERR if the message could not be delivered because
854  *         the tranport service is not yet up
855  */
856 int
857 GNUNET_TRANSPORT_communicator_receive (
858   struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
859   const struct GNUNET_PeerIdentity *sender,
860   const struct GNUNET_MessageHeader *msg,
861   struct GNUNET_TIME_Relative expected_addr_validity,
862   GNUNET_TRANSPORT_MessageCompletedCallback cb,
863   void *cb_cls)
864 {
865   struct GNUNET_MQ_Envelope *env;
866   struct GNUNET_TRANSPORT_IncomingMessage *im;
867   uint16_t msize;
868
869   if (NULL == ch->mq)
870     return GNUNET_SYSERR;
871   if ((NULL == cb) && (GNUNET_MQ_get_length (ch->mq) >= ch->max_queue_length))
872   {
873     GNUNET_log (
874       GNUNET_ERROR_TYPE_WARNING,
875       "Dropping message: transport is too slow, queue length %llu exceeded\n",
876       ch->max_queue_length);
877     return GNUNET_NO;
878   }
879
880   msize = ntohs (msg->size);
881   env =
882     GNUNET_MQ_msg_extra (im, msize, GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG);
883   if (NULL == env)
884   {
885     GNUNET_break (0);
886     return GNUNET_SYSERR;
887   }
888   im->expected_address_validity =
889     GNUNET_TIME_relative_hton (expected_addr_validity);
890   im->sender = *sender;
891   // FIXME: this is expensive, would be better if we would
892   // re-design the API to allow us to create the envelope first,
893   // and then have the application fill in the body so we do
894   // not have to memcpy()
895   memcpy (&im[1], msg, msize);
896   if (NULL != cb)
897   {
898     struct FlowControl *fc;
899
900     im->fc_on = htonl (GNUNET_YES);
901     im->fc_id = ch->fc_gen++;
902     fc = GNUNET_new (struct FlowControl);
903     fc->sender = *sender;
904     fc->id = im->fc_id;
905     fc->cb = cb;
906     fc->cb_cls = cb_cls;
907     GNUNET_CONTAINER_DLL_insert (ch->fc_head, ch->fc_tail, fc);
908   }
909   GNUNET_MQ_send (ch->mq, env);
910   return GNUNET_OK;
911 }
912
913
914 /* ************************* Discovery *************************** */
915
916
917 /**
918  * Notify transport service that an MQ became available due to an
919  * "inbound" connection or because the communicator discovered the
920  * presence of another peer.
921  *
922  * @param ch connection to transport service
923  * @param peer peer with which we can now communicate
924  * @param address address in human-readable format, 0-terminated, UTF-8
925  * @param mtu maximum message size supported by queue, 0 if
926  *            sending is not supported, SIZE_MAX for no MTU
927  * @param nt which network type does the @a address belong to?
928  * @param cc what characteristics does the communicator have?
929  * @param cs what is the connection status of the queue?
930  * @param mq message queue of the @a peer
931  * @return API handle identifying the new MQ
932  */
933 struct GNUNET_TRANSPORT_QueueHandle *
934 GNUNET_TRANSPORT_communicator_mq_add (
935   struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
936   const struct GNUNET_PeerIdentity *peer,
937   const char *address,
938   uint32_t mtu,
939   enum GNUNET_NetworkType nt,
940   enum GNUNET_TRANSPORT_ConnectionStatus cs,
941   struct GNUNET_MQ_Handle *mq)
942 {
943   struct GNUNET_TRANSPORT_QueueHandle *qh;
944
945   qh = GNUNET_new (struct GNUNET_TRANSPORT_QueueHandle);
946   qh->ch = ch;
947   qh->peer = *peer;
948   qh->address = GNUNET_strdup (address);
949   qh->nt = nt;
950   qh->mtu = mtu;
951   qh->cs = cs;
952   qh->mq = mq;
953   qh->queue_id = ch->queue_gen++;
954   GNUNET_CONTAINER_DLL_insert (ch->queue_head, ch->queue_tail, qh);
955   send_add_queue (qh);
956   return qh;
957 }
958
959
960 /**
961  * Notify transport service that an MQ became unavailable due to a
962  * disconnect or timeout.
963  *
964  * @param qh handle for the queue that must be invalidated
965  */
966 void
967 GNUNET_TRANSPORT_communicator_mq_del (struct GNUNET_TRANSPORT_QueueHandle *qh)
968 {
969   struct GNUNET_TRANSPORT_CommunicatorHandle *ch = qh->ch;
970
971   send_del_queue (qh);
972   GNUNET_CONTAINER_DLL_remove (ch->queue_head, ch->queue_tail, qh);
973   GNUNET_MQ_destroy (qh->mq);
974   GNUNET_free (qh->address);
975   GNUNET_free (qh);
976 }
977
978
979 /**
980  * Notify transport service about an address that this communicator
981  * provides for this peer.
982  *
983  * @param ch connection to transport service
984  * @param address our address in human-readable format, 0-terminated, UTF-8
985  * @param nt which network type does the address belong to?
986  * @param expiration when does the communicator forsee this address expiring?
987  */
988 struct GNUNET_TRANSPORT_AddressIdentifier *
989 GNUNET_TRANSPORT_communicator_address_add (
990   struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
991   const char *address,
992   enum GNUNET_NetworkType nt,
993   struct GNUNET_TIME_Relative expiration)
994 {
995   struct GNUNET_TRANSPORT_AddressIdentifier *ai;
996
997   ai = GNUNET_new (struct GNUNET_TRANSPORT_AddressIdentifier);
998   ai->ch = ch;
999   ai->address = GNUNET_strdup (address);
1000   ai->nt = nt;
1001   ai->expiration = expiration;
1002   ai->aid = ch->aid_gen++;
1003   GNUNET_CONTAINER_DLL_insert (ch->ai_head, ch->ai_tail, ai);
1004   send_add_address (ai);
1005   return ai;
1006 }
1007
1008
1009 /**
1010  * Notify transport service about an address that this communicator no
1011  * longer provides for this peer.
1012  *
1013  * @param ai address that is no longer provided
1014  */
1015 void
1016 GNUNET_TRANSPORT_communicator_address_remove (
1017   struct GNUNET_TRANSPORT_AddressIdentifier *ai)
1018 {
1019   struct GNUNET_TRANSPORT_CommunicatorHandle *ch = ai->ch;
1020
1021   send_del_address (ai);
1022   GNUNET_CONTAINER_DLL_remove (ch->ai_head, ch->ai_tail, ai);
1023   GNUNET_free (ai->address);
1024   GNUNET_free (ai);
1025 }
1026
1027
1028 /* ************************* Backchannel *************************** */
1029
1030
1031 /**
1032  * The communicator asks the transport service to route a message via
1033  * a different path to another communicator service at another peer.
1034  * This must only be done for special control traffic (as there is no
1035  * flow control for this API), such as acknowledgements, and generally
1036  * only be done if the communicator is uni-directional (i.e. cannot
1037  * send the message back itself).
1038  *
1039  * @param ch handle of this communicator
1040  * @param pid peer to send the message to
1041  * @param comm name of the communicator to send the message to
1042  * @param header header of the message to transmit and pass via the
1043  *        notify-API to @a pid's communicator @a comm
1044  */
1045 void
1046 GNUNET_TRANSPORT_communicator_notify (
1047   struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
1048   const struct GNUNET_PeerIdentity *pid,
1049   const char *comm,
1050   const struct GNUNET_MessageHeader *header)
1051 {
1052   struct GNUNET_MQ_Envelope *env;
1053   struct GNUNET_TRANSPORT_CommunicatorBackchannel *cb;
1054   size_t slen = strlen (comm) + 1;
1055   uint16_t mlen = ntohs (header->size);
1056
1057   GNUNET_assert (mlen + slen + sizeof(*cb) < UINT16_MAX);
1058   env =
1059     GNUNET_MQ_msg_extra (cb,
1060                          slen + mlen,
1061                          GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL);
1062   cb->pid = *pid;
1063   memcpy (&cb[1], header, mlen);
1064   memcpy (((char *) &cb[1]) + mlen, comm, slen);
1065   GNUNET_MQ_send (ch->mq, env);
1066 }
1067
1068
1069 /* end of transport_api2_communication.c */