misc bugfixes
[oweals/gnunet.git] / src / transport / transport_api2_communication.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2018 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21 /**
22  * @file transport/transport_api2_communication.c
23  * @brief implementation of the gnunet_transport_communication_service.h API
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_protocols.h"
29 #include "gnunet_transport_communication_service.h"
30 #include "gnunet_ats_transport_service.h"
31 #include "transport.h"
32
33
34 /**
35  * How many messages do we keep at most in the queue to the
36  * transport service before we start to drop (default,
37  * can be changed via the configuration file).
38  */
39 #define DEFAULT_MAX_QUEUE_LENGTH 16
40
41
42 /**
43  * Information we track per packet to enable flow control.
44  */
45 struct FlowControl
46 {
47   /**
48    * Kept in a DLL.
49    */
50   struct FlowControl *next;
51
52   /**
53    * Kept in a DLL.
54    */
55   struct FlowControl *prev;
56
57   /**
58    * Function to call once the message was processed.
59    */
60   GNUNET_TRANSPORT_MessageCompletedCallback cb;
61
62   /**
63    * Closure for @e cb
64    */
65   void *cb_cls;
66
67   /**
68    * Which peer is this about?
69    */
70   struct GNUNET_PeerIdentity sender;
71
72   /**
73    * More-or-less unique ID for the message.
74    */
75   uint64_t id;
76 };
77
78
79 /**
80  * Information we track per message to tell the transport about
81  * success or failures.
82  */
83 struct AckPending
84 {
85   /**
86    * Kept in a DLL.
87    */
88   struct AckPending *next;
89
90   /**
91    * Kept in a DLL.
92    */
93   struct AckPending *prev;
94
95   /**
96    * Communicator this entry belongs to.
97    */
98   struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
99
100   /**
101    * Which peer is this about?
102    */
103   struct GNUNET_PeerIdentity receiver;
104
105   /**
106    * More-or-less unique ID for the message.
107    */
108   uint64_t mid;
109 };
110
111
112 /**
113  * Opaque handle to the transport service for communicators.
114  */
115 struct GNUNET_TRANSPORT_CommunicatorHandle
116 {
117   /**
118    * Head of DLL of addresses this communicator offers to the transport service.
119    */
120   struct GNUNET_TRANSPORT_AddressIdentifier *ai_head;
121
122   /**
123    * Tail of DLL of addresses this communicator offers to the transport service.
124    */
125   struct GNUNET_TRANSPORT_AddressIdentifier *ai_tail;
126
127   /**
128    * DLL of messages awaiting flow control confirmation (ack).
129    */
130   struct FlowControl *fc_head;
131
132   /**
133    * DLL of messages awaiting flow control confirmation (ack).
134    */
135   struct FlowControl *fc_tail;
136
137   /**
138    * DLL of messages awaiting transmission confirmation (ack).
139    */
140   struct AckPending *ap_head;
141
142   /**
143    * DLL of messages awaiting transmission confirmation (ack).
144    */
145   struct AckPending *ap_tail;
146
147   /**
148    * DLL of queues we offer.
149    */
150   struct GNUNET_TRANSPORT_QueueHandle *queue_head;
151
152   /**
153    * DLL of queues we offer.
154    */
155   struct GNUNET_TRANSPORT_QueueHandle *queue_tail;
156
157   /**
158    * Our configuration.
159    */
160   const struct GNUNET_CONFIGURATION_Handle *cfg;
161
162   /**
163    * Config section to use.
164    */
165   const char *config_section;
166
167   /**
168    * Address prefix to use.
169    */
170   const char *addr_prefix;
171
172   /**
173    * Function to call when the transport service wants us to initiate
174    * a communication channel with another peer.
175    */
176   GNUNET_TRANSPORT_CommunicatorMqInit mq_init;
177
178   /**
179    * Closure for @e mq_init.
180    */
181   void *mq_init_cls;
182
183   /**
184    * Function to call when the transport service receives messages
185    * for a communicator (i.e. for NAT traversal or for non-bidirectional
186    * communicators).
187    */
188   GNUNET_TRANSPORT_CommunicatorNotify notify_cb;
189
190   /**
191    * Closure for @e notify_Cb.
192    */
193   void *notify_cb_cls;
194
195   /**
196    * Queue to talk to the transport service.
197    */
198   struct GNUNET_MQ_Handle *mq;
199
200   /**
201    * Maximum permissable queue length.
202    */
203   unsigned long long max_queue_length;
204
205   /**
206    * Flow-control identifier generator.
207    */
208   uint64_t fc_gen;
209
210   /**
211    * Internal UUID for the address used in communication with the
212    * transport service.
213    */
214   uint32_t aid_gen;
215
216   /**
217    * Queue identifier generator.
218    */
219   uint32_t queue_gen;
220
221   /**
222    * Characteristics of the communicator.
223    */
224   enum GNUNET_TRANSPORT_CommunicatorCharacteristics cc;
225 };
226
227
228 /**
229  * Handle returned to identify the internal data structure the transport
230  * API has created to manage a message queue to a particular peer.
231  */
232 struct GNUNET_TRANSPORT_QueueHandle
233 {
234
235   /**
236    * Kept in a DLL.
237    */
238   struct GNUNET_TRANSPORT_QueueHandle *next;
239
240   /**
241    * Kept in a DLL.
242    */
243   struct GNUNET_TRANSPORT_QueueHandle *prev;
244
245   /**
246    * Handle this queue belongs to.
247    */
248   struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
249
250   /**
251    * Address used by the communication queue.
252    */
253   char *address;
254
255   /**
256    * The queue itself.
257    */
258   struct GNUNET_MQ_Handle *mq;
259
260   /**
261    * Which peer we can communciate with.
262    */
263   struct GNUNET_PeerIdentity peer;
264
265   /**
266    * Network type of the communciation queue.
267    */
268   enum GNUNET_NetworkType nt;
269
270   /**
271    * Communication status of the queue.
272    */
273   enum GNUNET_TRANSPORT_ConnectionStatus cs;
274
275   /**
276    * ID for this queue when talking to the transport service.
277    */
278   uint32_t queue_id;
279
280   /**
281    * Maximum transmission unit for the queue.
282    */
283   uint32_t mtu;
284 };
285
286
287 /**
288  * Internal representation of an address a communicator is
289  * currently providing for the transport service.
290  */
291 struct GNUNET_TRANSPORT_AddressIdentifier
292 {
293
294   /**
295    * Kept in a DLL.
296    */
297   struct GNUNET_TRANSPORT_AddressIdentifier *next;
298
299   /**
300    * Kept in a DLL.
301    */
302   struct GNUNET_TRANSPORT_AddressIdentifier *prev;
303
304   /**
305    * Transport handle where the address was added.
306    */
307   struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
308
309   /**
310    * The actual address.
311    */
312   char *address;
313
314   /**
315    * When does the address expire? (Expected lifetime of the
316    * address.)
317    */
318   struct GNUNET_TIME_Relative expiration;
319
320   /**
321    * Internal UUID for the address used in communication with the
322    * transport service.
323    */
324   uint32_t aid;
325
326   /**
327    * Network type for the address.
328    */
329   enum GNUNET_NetworkType nt;
330 };
331
332
333 /**
334  * (re)connect our communicator to the transport service
335  *
336  * @param ch handle to reconnect
337  */
338 static void
339 reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch);
340
341
342 /**
343  * Send message to the transport service about address @a ai
344  * being now available.
345  *
346  * @param ai address to add
347  */
348 static void
349 send_add_address (struct GNUNET_TRANSPORT_AddressIdentifier *ai)
350 {
351   struct GNUNET_MQ_Envelope *env;
352   struct GNUNET_TRANSPORT_AddAddressMessage *aam;
353
354   if (NULL == ai->ch->mq)
355     return;
356   env = GNUNET_MQ_msg_extra (aam,
357                              strlen (ai->address) + 1,
358                              GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS);
359   aam->expiration = GNUNET_TIME_relative_hton (ai->expiration);
360   aam->nt = htonl ((uint32_t) ai->nt);
361   memcpy (&aam[1], ai->address, strlen (ai->address) + 1);
362   GNUNET_MQ_send (ai->ch->mq, env);
363 }
364
365
366 /**
367  * Send message to the transport service about address @a ai
368  * being no longer available.
369  *
370  * @param ai address to delete
371  */
372 static void
373 send_del_address (struct GNUNET_TRANSPORT_AddressIdentifier *ai)
374 {
375   struct GNUNET_MQ_Envelope *env;
376   struct GNUNET_TRANSPORT_DelAddressMessage *dam;
377
378   if (NULL == ai->ch->mq)
379     return;
380   env = GNUNET_MQ_msg (dam, GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_ADDRESS);
381   dam->aid = htonl (ai->aid);
382   GNUNET_MQ_send (ai->ch->mq, env);
383 }
384
385
386 /**
387  * Send message to the transport service about queue @a qh
388  * being now available.
389  *
390  * @param qh queue to add
391  */
392 static void
393 send_add_queue (struct GNUNET_TRANSPORT_QueueHandle *qh)
394 {
395   struct GNUNET_MQ_Envelope *env;
396   struct GNUNET_TRANSPORT_AddQueueMessage *aqm;
397
398   if (NULL == qh->ch->mq)
399     return;
400   env = GNUNET_MQ_msg_extra (aqm,
401                              strlen (qh->address) + 1,
402                              GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP);
403   aqm->qid = htonl (qh->queue_id);
404   aqm->receiver = qh->peer;
405   aqm->nt = htonl ((uint32_t) qh->nt);
406   aqm->mtu = htonl (qh->mtu);
407   aqm->cs = htonl ((uint32_t) qh->cs);
408   memcpy (&aqm[1], qh->address, strlen (qh->address) + 1);
409   GNUNET_MQ_send (qh->ch->mq, env);
410 }
411
412
413 /**
414  * Send message to the transport service about queue @a qh
415  * being no longer available.
416  *
417  * @param qh queue to delete
418  */
419 static void
420 send_del_queue (struct GNUNET_TRANSPORT_QueueHandle *qh)
421 {
422   struct GNUNET_MQ_Envelope *env;
423   struct GNUNET_TRANSPORT_DelQueueMessage *dqm;
424
425   if (NULL == qh->ch->mq)
426     return;
427   env = GNUNET_MQ_msg (dqm, GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN);
428   dqm->qid = htonl (qh->queue_id);
429   dqm->receiver = qh->peer;
430   GNUNET_MQ_send (qh->ch->mq, env);
431 }
432
433
434 /**
435  * Disconnect from the transport service.  Purges
436  * all flow control entries as we will no longer receive
437  * the ACKs.  Purges the ack pending entries as the
438  * transport will no longer expect the confirmations.
439  *
440  * @param ch service to disconnect from
441  */
442 static void
443 disconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
444 {
445   struct FlowControl *fcn;
446   struct AckPending *apn;
447
448   for (struct FlowControl *fc = ch->fc_head; NULL != fc; fc = fcn)
449   {
450     fcn = fc->next;
451     GNUNET_CONTAINER_DLL_remove (ch->fc_head, ch->fc_tail, fc);
452     fc->cb (fc->cb_cls, GNUNET_SYSERR);
453     GNUNET_free (fc);
454   }
455   for (struct AckPending *ap = ch->ap_head; NULL != ap; ap = apn)
456   {
457     apn = ap->next;
458     GNUNET_CONTAINER_DLL_remove (ch->ap_head, ch->ap_tail, ap);
459     GNUNET_free (ap);
460   }
461   if (NULL == ch->mq)
462     return;
463   GNUNET_MQ_destroy (ch->mq);
464   ch->mq = NULL;
465 }
466
467
468 /**
469  * Function called on MQ errors.
470  */
471 static void
472 error_handler (void *cls, enum GNUNET_MQ_Error error)
473 {
474   struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
475
476   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
477               "MQ failure %d, reconnecting to transport service.\n",
478               error);
479   disconnect (ch);
480   /* TODO: maybe do this with exponential backoff/delay */
481   reconnect (ch);
482 }
483
484
485 /**
486  * Transport service acknowledged a message we gave it
487  * (with flow control enabled). Tell the communicator.
488  *
489  * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
490  * @param incoming_ack the ack
491  */
492 static void
493 handle_incoming_ack (
494   void *cls,
495   const struct GNUNET_TRANSPORT_IncomingMessageAck *incoming_ack)
496 {
497   struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
498
499   for (struct FlowControl *fc = ch->fc_head; NULL != fc; fc = fc->next)
500   {
501     if ((fc->id == incoming_ack->fc_id) &&
502         (0 == memcmp (&fc->sender,
503                       &incoming_ack->sender,
504                       sizeof (struct GNUNET_PeerIdentity))))
505     {
506       GNUNET_CONTAINER_DLL_remove (ch->fc_head, ch->fc_tail, fc);
507       fc->cb (fc->cb_cls, GNUNET_OK);
508       GNUNET_free (fc);
509       return;
510     }
511   }
512   GNUNET_break (0);
513   disconnect (ch);
514   /* TODO: maybe do this with exponential backoff/delay */
515   reconnect (ch);
516 }
517
518
519 /**
520  * Transport service wants us to create a queue. Check if @a cq
521  * is well-formed.
522  *
523  * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
524  * @param cq the queue creation request
525  * @return #GNUNET_OK if @a smt is well-formed
526  */
527 static int
528 check_create_queue (void *cls, const struct GNUNET_TRANSPORT_CreateQueue *cq)
529 {
530   (void) cls;
531   GNUNET_MQ_check_zero_termination (cq);
532   return GNUNET_OK;
533 }
534
535
536 /**
537  * Transport service wants us to create a queue. Tell the communicator.
538  *
539  * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
540  * @param cq the queue creation request
541  */
542 static void
543 handle_create_queue (void *cls, const struct GNUNET_TRANSPORT_CreateQueue *cq)
544 {
545   struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
546   const char *addr = (const char *) &cq[1];
547   struct GNUNET_TRANSPORT_CreateQueueResponse *cqr;
548   struct GNUNET_MQ_Envelope *env;
549
550   if (GNUNET_OK != ch->mq_init (ch->mq_init_cls, &cq->receiver, addr))
551   {
552     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
553                 "Address `%s' invalid for this communicator\n",
554                 addr);
555     env = GNUNET_MQ_msg (cqr, GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_FAIL);
556   }
557   else
558   {
559     env = GNUNET_MQ_msg (cqr, GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_OK);
560   }
561   cqr->request_id = cq->request_id;
562   GNUNET_MQ_send (ch->mq, env);
563 }
564
565
566 /**
567  * Transport service wants us to send a message. Check if @a smt
568  * is well-formed.
569  *
570  * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
571  * @param smt the transmission request
572  * @return #GNUNET_OK if @a smt is well-formed
573  */
574 static int
575 check_send_msg (void *cls, const struct GNUNET_TRANSPORT_SendMessageTo *smt)
576 {
577   (void) cls;
578   GNUNET_MQ_check_boxed_message (smt);
579   return GNUNET_OK;
580 }
581
582
583 /**
584  * Notify transport service about @a status of a message with
585  * @a mid sent to @a receiver.
586  *
587  * @param ch handle
588  * @param status #GNUNET_OK on success, #GNUNET_SYSERR on failure
589  * @param receiver which peer was the receiver
590  * @param mid message that the ack is about
591  */
592 static void
593 send_ack (struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
594           int status,
595           const struct GNUNET_PeerIdentity *receiver,
596           uint64_t mid)
597 {
598   struct GNUNET_MQ_Envelope *env;
599   struct GNUNET_TRANSPORT_SendMessageToAck *ack;
600
601   env = GNUNET_MQ_msg (ack, GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK);
602   ack->status = htonl (status);
603   ack->mid = mid;
604   ack->receiver = *receiver;
605   GNUNET_MQ_send (ch->mq, env);
606 }
607
608
609 /**
610  * Message queue transmission by communicator was successful,
611  * notify transport service.
612  *
613  * @param cls an `struct AckPending *`
614  */
615 static void
616 send_ack_cb (void *cls)
617 {
618   struct AckPending *ap = cls;
619   struct GNUNET_TRANSPORT_CommunicatorHandle *ch = ap->ch;
620
621   GNUNET_CONTAINER_DLL_remove (ch->ap_head, ch->ap_tail, ap);
622   send_ack (ch, GNUNET_OK, &ap->receiver, ap->mid);
623   GNUNET_free (ap);
624 }
625
626
627 /**
628  * Transport service wants us to send a message. Tell the communicator.
629  *
630  * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
631  * @param smt the transmission request
632  */
633 static void
634 handle_send_msg (void *cls, const struct GNUNET_TRANSPORT_SendMessageTo *smt)
635 {
636   struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
637   const struct GNUNET_MessageHeader *mh;
638   struct GNUNET_MQ_Envelope *env;
639   struct AckPending *ap;
640   struct GNUNET_TRANSPORT_QueueHandle *qh;
641
642   for (qh = ch->queue_head; NULL != qh; qh = qh->next)
643     if ((qh->queue_id == smt->qid) &&
644         (0 == memcmp (&qh->peer,
645                       &smt->receiver,
646                       sizeof (struct GNUNET_PeerIdentity))))
647       break;
648   if (NULL == qh)
649   {
650     /* queue is already gone, tell transport this one failed */
651     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
652                 "Transmission failed, queue no longer exists.\n");
653     send_ack (ch, GNUNET_NO, &smt->receiver, smt->mid);
654     return;
655   }
656   ap = GNUNET_new (struct AckPending);
657   ap->ch = ch;
658   ap->receiver = smt->receiver;
659   ap->mid = smt->mid;
660   GNUNET_CONTAINER_DLL_insert (ch->ap_head, ch->ap_tail, ap);
661   mh = (const struct GNUNET_MessageHeader *) &smt[1];
662   env = GNUNET_MQ_msg_copy (mh);
663   GNUNET_MQ_notify_sent (env, &send_ack_cb, ap);
664   GNUNET_MQ_send (qh->mq, env);
665 }
666
667
668 /**
669  * Transport service gives us backchannel message. Check if @a bi
670  * is well-formed.
671  *
672  * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
673  * @param bi the backchannel message
674  * @return #GNUNET_OK if @a smt is well-formed
675  */
676 static int
677 check_backchannel_incoming (
678   void *cls,
679   const struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming *bi)
680 {
681   (void) cls;
682   GNUNET_MQ_check_boxed_message (bi);
683   return GNUNET_OK;
684 }
685
686
687 /**
688  * Transport service gives us backchannel message. Handle it.
689  *
690  * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
691  * @param bi the backchannel message
692  */
693 static void
694 handle_backchannel_incoming (
695   void *cls,
696   const struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming *bi)
697 {
698   struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
699
700   if (NULL != ch->notify_cb)
701     ch->notify_cb (ch->notify_cb_cls,
702                    &bi->pid,
703                    (const struct GNUNET_MessageHeader *) &bi[1]);
704   else
705     GNUNET_log (
706       GNUNET_ERROR_TYPE_INFO,
707       _ ("Dropped backchanel message: handler not provided by communicator\n"));
708 }
709
710
711 /**
712  * (re)connect our communicator to the transport service
713  *
714  * @param ch handle to reconnect
715  */
716 static void
717 reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
718 {
719   struct GNUNET_MQ_MessageHandler handlers[] =
720     {GNUNET_MQ_hd_fixed_size (incoming_ack,
721                               GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK,
722                               struct GNUNET_TRANSPORT_IncomingMessageAck,
723                               ch),
724      GNUNET_MQ_hd_var_size (create_queue,
725                             GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE,
726                             struct GNUNET_TRANSPORT_CreateQueue,
727                             ch),
728      GNUNET_MQ_hd_var_size (send_msg,
729                             GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG,
730                             struct GNUNET_TRANSPORT_SendMessageTo,
731                             ch),
732      GNUNET_MQ_hd_var_size (
733        backchannel_incoming,
734        GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL_INCOMING,
735        struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming,
736        ch),
737      GNUNET_MQ_handler_end ()};
738   struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam;
739   struct GNUNET_MQ_Envelope *env;
740
741   ch->mq =
742     GNUNET_CLIENT_connect (ch->cfg, "transport", handlers, &error_handler, ch);
743   if (NULL == ch->mq)
744     return;
745   env = GNUNET_MQ_msg_extra (cam,
746                              strlen (ch->addr_prefix) + 1,
747                              GNUNET_MESSAGE_TYPE_TRANSPORT_NEW_COMMUNICATOR);
748   cam->cc = htonl ((uint32_t) ch->cc);
749   memcpy (&cam[1], ch->addr_prefix, strlen (ch->addr_prefix) + 1);
750   GNUNET_MQ_send (ch->mq, env);
751   for (struct GNUNET_TRANSPORT_AddressIdentifier *ai = ch->ai_head; NULL != ai;
752        ai = ai->next)
753     send_add_address (ai);
754   for (struct GNUNET_TRANSPORT_QueueHandle *qh = ch->queue_head; NULL != qh;
755        qh = qh->next)
756     send_add_queue (qh);
757 }
758
759
760 /**
761  * Connect to the transport service.
762  *
763  * @param cfg configuration to use
764  * @param config_section section of the configuration to use for options
765  * @param addr_prefix address prefix for addresses supported by this
766  *        communicator, could be NULL for incoming-only communicators
767  * @param cc what characteristics does the communicator have?
768  * @param mtu maximum message size supported by communicator, 0 if
769  *            sending is not supported, SIZE_MAX for no MTU
770  * @param mq_init function to call to initialize a message queue given
771  *                the address of another peer, can be NULL if the
772  *                communicator only supports receiving messages
773  * @param mq_init_cls closure for @a mq_init
774  * @param notify_cb function to pass backchannel messages to communicator
775  * @param notify_cb_cls closure for @a notify_cb
776  * @return NULL on error
777  */
778 struct GNUNET_TRANSPORT_CommunicatorHandle *
779 GNUNET_TRANSPORT_communicator_connect (
780   const struct GNUNET_CONFIGURATION_Handle *cfg,
781   const char *config_section,
782   const char *addr_prefix,
783   enum GNUNET_TRANSPORT_CommunicatorCharacteristics cc,
784   GNUNET_TRANSPORT_CommunicatorMqInit mq_init,
785   void *mq_init_cls,
786   GNUNET_TRANSPORT_CommunicatorNotify notify_cb,
787   void *notify_cb_cls)
788 {
789   struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
790
791   ch = GNUNET_new (struct GNUNET_TRANSPORT_CommunicatorHandle);
792   ch->cfg = cfg;
793   ch->config_section = config_section;
794   ch->addr_prefix = addr_prefix;
795   ch->mq_init = mq_init;
796   ch->mq_init_cls = mq_init_cls;
797   ch->notify_cb = notify_cb;
798   ch->notify_cb_cls = notify_cb_cls;
799   ch->cc = cc;
800   reconnect (ch);
801   if (GNUNET_OK !=
802       GNUNET_CONFIGURATION_get_value_number (cfg,
803                                              config_section,
804                                              "MAX_QUEUE_LENGTH",
805                                              &ch->max_queue_length))
806     ch->max_queue_length = DEFAULT_MAX_QUEUE_LENGTH;
807   if (NULL == ch->mq)
808   {
809     GNUNET_free (ch);
810     return NULL;
811   }
812   return ch;
813 }
814
815
816 /**
817  * Disconnect from the transport service.
818  *
819  * @param ch handle returned from connect
820  */
821 void
822 GNUNET_TRANSPORT_communicator_disconnect (
823   struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
824 {
825   disconnect (ch);
826   while (NULL != ch->ai_head)
827   {
828     GNUNET_break (0); /* communicator forgot to remove address, warn! */
829     GNUNET_TRANSPORT_communicator_address_remove (ch->ai_head);
830   }
831   GNUNET_free (ch);
832 }
833
834
835 /* ************************* Receiving *************************** */
836
837
838 /**
839  * Notify transport service that the communicator has received
840  * a message.
841  *
842  * @param ch connection to transport service
843  * @param sender presumed sender of the message (details to be checked
844  *        by higher layers)
845  * @param msg the message
846  * @param expected_addr_validity how long does the communicator believe it
847  *        will continue to be able to receive messages from the same address
848  *        on which it received this message?
849  * @param cb function to call once handling the message is done, NULL if
850  *         flow control is not supported by this communicator
851  * @param cb_cls closure for @a cb
852  * @return #GNUNET_OK if all is well, #GNUNET_NO if the message was
853  *         immediately dropped due to memory limitations (communicator
854  *         should try to apply back pressure),
855  *         #GNUNET_SYSERR if the message could not be delivered because
856  *         the tranport service is not yet up
857  */
858 int
859 GNUNET_TRANSPORT_communicator_receive (
860   struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
861   const struct GNUNET_PeerIdentity *sender,
862   const struct GNUNET_MessageHeader *msg,
863   struct GNUNET_TIME_Relative expected_addr_validity,
864   GNUNET_TRANSPORT_MessageCompletedCallback cb,
865   void *cb_cls)
866 {
867   struct GNUNET_MQ_Envelope *env;
868   struct GNUNET_TRANSPORT_IncomingMessage *im;
869   uint16_t msize;
870
871   if (NULL == ch->mq)
872     return GNUNET_SYSERR;
873   if ((NULL == cb) && (GNUNET_MQ_get_length (ch->mq) >= ch->max_queue_length))
874   {
875     GNUNET_log (
876       GNUNET_ERROR_TYPE_WARNING,
877       "Dropping message: transprot is too slow, queue length %llu exceeded\n",
878       ch->max_queue_length);
879     return GNUNET_NO;
880   }
881
882   msize = ntohs (msg->size);
883   env =
884     GNUNET_MQ_msg_extra (im, msize, GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG);
885   if (NULL == env)
886   {
887     GNUNET_break (0);
888     return GNUNET_SYSERR;
889   }
890   im->expected_address_validity =
891     GNUNET_TIME_relative_hton (expected_addr_validity);
892   im->sender = *sender;
893   memcpy (&im[1], msg, msize);
894   if (NULL != cb)
895   {
896     struct FlowControl *fc;
897
898     im->fc_on = htonl (GNUNET_YES);
899     im->fc_id = ch->fc_gen++;
900     fc = GNUNET_new (struct FlowControl);
901     fc->sender = *sender;
902     fc->id = im->fc_id;
903     fc->cb = cb;
904     fc->cb_cls = cb_cls;
905     GNUNET_CONTAINER_DLL_insert (ch->fc_head, ch->fc_tail, fc);
906   }
907   GNUNET_MQ_send (ch->mq, env);
908   return GNUNET_OK;
909 }
910
911
912 /* ************************* Discovery *************************** */
913
914
915 /**
916  * Notify transport service that an MQ became available due to an
917  * "inbound" connection or because the communicator discovered the
918  * presence of another peer.
919  *
920  * @param ch connection to transport service
921  * @param peer peer with which we can now communicate
922  * @param address address in human-readable format, 0-terminated, UTF-8
923  * @param mtu maximum message size supported by queue, 0 if
924  *            sending is not supported, SIZE_MAX for no MTU
925  * @param nt which network type does the @a address belong to?
926  * @param cc what characteristics does the communicator have?
927  * @param cs what is the connection status of the queue?
928  * @param mq message queue of the @a peer
929  * @return API handle identifying the new MQ
930  */
931 struct GNUNET_TRANSPORT_QueueHandle *
932 GNUNET_TRANSPORT_communicator_mq_add (
933   struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
934   const struct GNUNET_PeerIdentity *peer,
935   const char *address,
936   uint32_t mtu,
937   enum GNUNET_NetworkType nt,
938   enum GNUNET_TRANSPORT_ConnectionStatus cs,
939   struct GNUNET_MQ_Handle *mq)
940 {
941   struct GNUNET_TRANSPORT_QueueHandle *qh;
942
943   qh = GNUNET_new (struct GNUNET_TRANSPORT_QueueHandle);
944   qh->ch = ch;
945   qh->peer = *peer;
946   qh->address = GNUNET_strdup (address);
947   qh->nt = nt;
948   qh->mtu = mtu;
949   qh->cs = cs;
950   qh->mq = mq;
951   qh->queue_id = ch->queue_gen++;
952   GNUNET_CONTAINER_DLL_insert (ch->queue_head, ch->queue_tail, qh);
953   send_add_queue (qh);
954   return qh;
955 }
956
957
958 /**
959  * Notify transport service that an MQ became unavailable due to a
960  * disconnect or timeout.
961  *
962  * @param qh handle for the queue that must be invalidated
963  */
964 void
965 GNUNET_TRANSPORT_communicator_mq_del (struct GNUNET_TRANSPORT_QueueHandle *qh)
966 {
967   struct GNUNET_TRANSPORT_CommunicatorHandle *ch = qh->ch;
968
969   send_del_queue (qh);
970   GNUNET_CONTAINER_DLL_remove (ch->queue_head, ch->queue_tail, qh);
971   GNUNET_MQ_destroy (qh->mq);
972   GNUNET_free (qh->address);
973   GNUNET_free (qh);
974 }
975
976
977 /**
978  * Notify transport service about an address that this communicator
979  * provides for this peer.
980  *
981  * @param ch connection to transport service
982  * @param address our address in human-readable format, 0-terminated, UTF-8
983  * @param nt which network type does the address belong to?
984  * @param expiration when does the communicator forsee this address expiring?
985  */
986 struct GNUNET_TRANSPORT_AddressIdentifier *
987 GNUNET_TRANSPORT_communicator_address_add (
988   struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
989   const char *address,
990   enum GNUNET_NetworkType nt,
991   struct GNUNET_TIME_Relative expiration)
992 {
993   struct GNUNET_TRANSPORT_AddressIdentifier *ai;
994
995   ai = GNUNET_new (struct GNUNET_TRANSPORT_AddressIdentifier);
996   ai->ch = ch;
997   ai->address = GNUNET_strdup (address);
998   ai->nt = nt;
999   ai->expiration = expiration;
1000   ai->aid = ch->aid_gen++;
1001   GNUNET_CONTAINER_DLL_insert (ch->ai_head, ch->ai_tail, ai);
1002   send_add_address (ai);
1003   return ai;
1004 }
1005
1006
1007 /**
1008  * Notify transport service about an address that this communicator no
1009  * longer provides for this peer.
1010  *
1011  * @param ai address that is no longer provided
1012  */
1013 void
1014 GNUNET_TRANSPORT_communicator_address_remove (
1015   struct GNUNET_TRANSPORT_AddressIdentifier *ai)
1016 {
1017   struct GNUNET_TRANSPORT_CommunicatorHandle *ch = ai->ch;
1018
1019   send_del_address (ai);
1020   GNUNET_CONTAINER_DLL_remove (ch->ai_head, ch->ai_tail, ai);
1021   GNUNET_free (ai->address);
1022   GNUNET_free (ai);
1023 }
1024
1025
1026 /* ************************* Backchannel *************************** */
1027
1028
1029 /**
1030  * The communicator asks the transport service to route a message via
1031  * a different path to another communicator service at another peer.
1032  * This must only be done for special control traffic (as there is no
1033  * flow control for this API), such as acknowledgements, and generally
1034  * only be done if the communicator is uni-directional (i.e. cannot
1035  * send the message back itself).
1036  *
1037  * @param ch handle of this communicator
1038  * @param pid peer to send the message to
1039  * @param comm name of the communicator to send the message to
1040  * @param header header of the message to transmit and pass via the
1041  *        notify-API to @a pid's communicator @a comm
1042  */
1043 void
1044 GNUNET_TRANSPORT_communicator_notify (
1045   struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
1046   const struct GNUNET_PeerIdentity *pid,
1047   const char *comm,
1048   const struct GNUNET_MessageHeader *header)
1049 {
1050   struct GNUNET_MQ_Envelope *env;
1051   struct GNUNET_TRANSPORT_CommunicatorBackchannel *cb;
1052   size_t slen = strlen (comm) + 1;
1053   uint16_t mlen = ntohs (header->size);
1054
1055   GNUNET_assert (mlen + slen + sizeof (*cb) < UINT16_MAX);
1056   env =
1057     GNUNET_MQ_msg_extra (cb,
1058                          slen + mlen,
1059                          GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL);
1060   cb->pid = *pid;
1061   memcpy (&cb[1], header, mlen);
1062   memcpy (((char *) &cb[1]) + mlen, comm, slen);
1063   GNUNET_MQ_send (ch->mq, env);
1064 }
1065
1066
1067 /* end of transport_api2_communication.c */