src: for every AGPL3.0 file, add SPDX identifier.
[oweals/gnunet.git] / src / transport / transport_api2_communication.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2018 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21 /**
22  * @file transport/transport_api2_communication.c
23  * @brief implementation of the gnunet_transport_communication_service.h API
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_protocols.h"
29 #include "gnunet_transport_communication_service.h"
30 #include "gnunet_ats_transport_service.h"
31 #include "transport.h"
32
33
34 /**
35  * How many messages do we keep at most in the queue to the
36  * transport service before we start to drop (default,
37  * can be changed via the configuration file).
38  */
39 #define DEFAULT_MAX_QUEUE_LENGTH 16
40
41
42 /**
43  * Information we track per packet to enable flow control.
44  */
45 struct FlowControl
46 {
47   /**
48    * Kept in a DLL.
49    */
50   struct FlowControl *next;
51
52   /**
53    * Kept in a DLL.
54    */
55   struct FlowControl *prev;
56
57   /**
58    * Function to call once the message was processed.
59    */
60   GNUNET_TRANSPORT_MessageCompletedCallback cb;
61
62   /**
63    * Closure for @e cb
64    */
65   void *cb_cls;
66
67   /**
68    * Which peer is this about?
69    */
70   struct GNUNET_PeerIdentity sender;
71
72   /**
73    * More-or-less unique ID for the message.
74    */
75   uint64_t id;
76 };
77
78
79 /**
80  * Information we track per message to tell the transport about
81  * success or failures.
82  */
83 struct AckPending
84 {
85   /**
86    * Kept in a DLL.
87    */
88   struct AckPending *next;
89
90   /**
91    * Kept in a DLL.
92    */
93   struct AckPending *prev;
94
95   /**
96    * Communicator this entry belongs to.
97    */
98   struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
99
100   /**
101    * Which peer is this about?
102    */
103   struct GNUNET_PeerIdentity receiver;
104
105   /**
106    * More-or-less unique ID for the message.
107    */
108   uint64_t mid;
109 };
110
111
112 /**
113  * Opaque handle to the transport service for communicators.
114  */
115 struct GNUNET_TRANSPORT_CommunicatorHandle
116 {
117   /**
118    * Head of DLL of addresses this communicator offers to the transport service.
119    */
120   struct GNUNET_TRANSPORT_AddressIdentifier *ai_head;
121
122   /**
123    * Tail of DLL of addresses this communicator offers to the transport service.
124    */
125   struct GNUNET_TRANSPORT_AddressIdentifier *ai_tail;
126
127   /**
128    * DLL of messages awaiting flow control confirmation (ack).
129    */
130   struct FlowControl *fc_head;
131
132   /**
133    * DLL of messages awaiting flow control confirmation (ack).
134    */
135   struct FlowControl *fc_tail;
136
137   /**
138    * DLL of messages awaiting transmission confirmation (ack).
139    */
140   struct AckPending *ap_head;
141
142   /**
143    * DLL of messages awaiting transmission confirmation (ack).
144    */
145   struct AckPending *ap_tail;
146
147   /**
148    * DLL of queues we offer.
149    */
150   struct GNUNET_TRANSPORT_QueueHandle *queue_head;
151
152   /**
153    * DLL of queues we offer.
154    */
155   struct GNUNET_TRANSPORT_QueueHandle *queue_tail;
156
157   /**
158    * Our configuration.
159    */
160   const struct GNUNET_CONFIGURATION_Handle *cfg;
161
162   /**
163    * Config section to use.
164    */
165   const char *config_section;
166
167   /**
168    * Address prefix to use.
169    */
170   const char *addr_prefix;
171
172   /**
173    * Function to call when the transport service wants us to initiate
174    * a communication channel with another peer.
175    */
176   GNUNET_TRANSPORT_CommunicatorMqInit mq_init;
177
178   /**
179    * Closure for @e mq_init.
180    */
181   void *mq_init_cls;
182
183   /**
184    * Function to call when the transport service receives messages
185    * for a communicator (i.e. for NAT traversal or for non-bidirectional
186    * communicators).
187    */
188   GNUNET_TRANSPORT_CommunicatorNotify notify_cb;
189
190   /**
191    * Closure for @e notify_Cb.
192    */
193   void *notify_cb_cls;
194
195   /**
196    * Queue to talk to the transport service.
197    */
198   struct GNUNET_MQ_Handle *mq;
199
200   /**
201    * Maximum permissable queue length.
202    */
203   unsigned long long max_queue_length;
204
205   /**
206    * Flow-control identifier generator.
207    */
208   uint64_t fc_gen;
209
210   /**
211    * Internal UUID for the address used in communication with the
212    * transport service.
213    */
214   uint32_t aid_gen;
215
216   /**
217    * Queue identifier generator.
218    */
219   uint32_t queue_gen;
220
221   /**
222    * Characteristics of the communicator.
223    */
224   enum GNUNET_TRANSPORT_CommunicatorCharacteristics cc;
225
226 };
227
228
229 /**
230  * Handle returned to identify the internal data structure the transport
231  * API has created to manage a message queue to a particular peer.
232  */
233 struct GNUNET_TRANSPORT_QueueHandle
234 {
235
236   /**
237    * Kept in a DLL.
238    */
239   struct GNUNET_TRANSPORT_QueueHandle *next;
240
241   /**
242    * Kept in a DLL.
243    */
244   struct GNUNET_TRANSPORT_QueueHandle *prev;
245
246   /**
247    * Handle this queue belongs to.
248    */
249   struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
250
251   /**
252    * Address used by the communication queue.
253    */
254   char *address;
255
256   /**
257    * The queue itself.
258    */
259   struct GNUNET_MQ_Handle *mq;
260
261   /**
262    * Which peer we can communciate with.
263    */
264   struct GNUNET_PeerIdentity peer;
265
266   /**
267    * Network type of the communciation queue.
268    */
269   enum GNUNET_NetworkType nt;
270
271   /**
272    * Communication status of the queue.
273    */
274   enum GNUNET_TRANSPORT_ConnectionStatus cs;
275
276   /**
277    * How many hops is the target away (DV-only)
278    */
279   uint32_t distance;
280
281   /**
282    * ID for this queue when talking to the transport service.
283    */
284   uint32_t queue_id;
285
286   /**
287    * Maximum transmission unit for the queue.
288    */
289   uint32_t mtu;
290
291 };
292
293
294 /**
295  * Internal representation of an address a communicator is
296  * currently providing for the transport service.
297  */
298 struct GNUNET_TRANSPORT_AddressIdentifier
299 {
300
301   /**
302    * Kept in a DLL.
303    */
304   struct GNUNET_TRANSPORT_AddressIdentifier *next;
305
306   /**
307    * Kept in a DLL.
308    */
309   struct GNUNET_TRANSPORT_AddressIdentifier *prev;
310
311   /**
312    * Transport handle where the address was added.
313    */
314   struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
315
316   /**
317    * The actual address.
318    */
319   char *address;
320
321   /**
322    * When does the address expire? (Expected lifetime of the
323    * address.)
324    */
325   struct GNUNET_TIME_Relative expiration;
326
327   /**
328    * Internal UUID for the address used in communication with the
329    * transport service.
330    */
331   uint32_t aid;
332
333   /**
334    * Network type for the address.
335    */
336   enum GNUNET_NetworkType nt;
337
338 };
339
340
341 /**
342  * (re)connect our communicator to the transport service
343  *
344  * @param ch handle to reconnect
345  */
346 static void
347 reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch);
348
349
350 /**
351  * Send message to the transport service about address @a ai
352  * being now available.
353  *
354  * @param ai address to add
355  */
356 static void
357 send_add_address (struct GNUNET_TRANSPORT_AddressIdentifier *ai)
358 {
359   struct GNUNET_MQ_Envelope *env;
360   struct GNUNET_TRANSPORT_AddAddressMessage *aam;
361
362   if (NULL == ai->ch->mq)
363     return;
364   env = GNUNET_MQ_msg_extra (aam,
365                              strlen (ai->address) + 1,
366                              GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS);
367   aam->expiration = GNUNET_TIME_relative_hton (ai->expiration);
368   aam->nt = htonl ((uint32_t) ai->nt);
369   memcpy (&aam[1],
370           ai->address,
371           strlen (ai->address) + 1);
372   GNUNET_MQ_send (ai->ch->mq,
373                   env);
374 }
375
376
377 /**
378  * Send message to the transport service about address @a ai
379  * being no longer available.
380  *
381  * @param ai address to delete
382  */
383 static void
384 send_del_address (struct GNUNET_TRANSPORT_AddressIdentifier *ai)
385 {
386   struct GNUNET_MQ_Envelope *env;
387   struct GNUNET_TRANSPORT_DelAddressMessage *dam;
388
389   if (NULL == ai->ch->mq)
390     return;
391   env = GNUNET_MQ_msg (dam,
392                        GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_ADDRESS);
393   dam->aid = htonl (ai->aid);
394   GNUNET_MQ_send (ai->ch->mq,
395                   env);
396 }
397
398
399 /**
400  * Send message to the transport service about queue @a qh
401  * being now available.
402  *
403  * @param qh queue to add
404  */
405 static void
406 send_add_queue (struct GNUNET_TRANSPORT_QueueHandle *qh)
407 {
408   struct GNUNET_MQ_Envelope *env;
409   struct GNUNET_TRANSPORT_AddQueueMessage *aqm;
410
411   if (NULL == qh->ch->mq)
412     return;
413   env = GNUNET_MQ_msg_extra (aqm,
414                              strlen (qh->address) + 1,
415                              GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP);
416   aqm->qid = htonl (qh->queue_id);
417   aqm->receiver = qh->peer;
418   aqm->nt = htonl ((uint32_t) qh->nt);
419   aqm->mtu = htonl (qh->mtu);
420   aqm->cs = htonl ((uint32_t) qh->cs);
421   aqm->distance = htonl (qh->distance);
422   memcpy (&aqm[1],
423           qh->address,
424           strlen (qh->address) + 1);
425   GNUNET_MQ_send (qh->ch->mq,
426                   env);
427 }
428
429
430 /**
431  * Send message to the transport service about queue @a qh
432  * being no longer available.
433  *
434  * @param qh queue to delete
435  */
436 static void
437 send_del_queue (struct GNUNET_TRANSPORT_QueueHandle *qh)
438 {
439   struct GNUNET_MQ_Envelope *env;
440   struct GNUNET_TRANSPORT_DelQueueMessage *dqm;
441
442   if (NULL == qh->ch->mq)
443     return;
444   env = GNUNET_MQ_msg (dqm,
445                        GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN);
446   dqm->qid = htonl (qh->queue_id);
447   dqm->receiver = qh->peer;
448   GNUNET_MQ_send (qh->ch->mq,
449                   env);
450 }
451
452
453 /**
454  * Disconnect from the transport service.  Purges
455  * all flow control entries as we will no longer receive
456  * the ACKs.  Purges the ack pending entries as the
457  * transport will no longer expect the confirmations.
458  *
459  * @param ch service to disconnect from
460  */
461 static void
462 disconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
463 {
464   struct FlowControl *fcn;
465   struct AckPending *apn;
466
467   for (struct FlowControl *fc = ch->fc_head;
468        NULL != fc;
469        fc = fcn)
470   {
471     fcn = fc->next;
472     GNUNET_CONTAINER_DLL_remove (ch->fc_head,
473                                  ch->fc_tail,
474                                  fc);
475     fc->cb (fc->cb_cls,
476             GNUNET_SYSERR);
477     GNUNET_free (fc);
478   }
479   for (struct AckPending *ap = ch->ap_head;
480        NULL != ap;
481        ap = apn)
482   {
483     apn = ap->next;
484     GNUNET_CONTAINER_DLL_remove (ch->ap_head,
485                                  ch->ap_tail,
486                                  ap);
487     GNUNET_free (ap);
488   }
489   if (NULL == ch->mq)
490     return;
491   GNUNET_MQ_destroy (ch->mq);
492   ch->mq = NULL;
493 }
494
495
496 /**
497  * Function called on MQ errors.
498  */
499 static void
500 error_handler (void *cls,
501                enum GNUNET_MQ_Error error)
502 {
503   struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
504
505   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
506               "MQ failure %d, reconnecting to transport service.\n",
507               error);
508   disconnect (ch);
509   /* TODO: maybe do this with exponential backoff/delay */
510   reconnect (ch);
511 }
512
513
514 /**
515  * Transport service acknowledged a message we gave it
516  * (with flow control enabled). Tell the communicator.
517  *
518  * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
519  * @param incoming_ack the ack
520  */
521 static void
522 handle_incoming_ack (void *cls,
523                      const struct GNUNET_TRANSPORT_IncomingMessageAck *incoming_ack)
524 {
525   struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
526
527   for (struct FlowControl *fc = ch->fc_head;
528        NULL != fc;
529        fc = fc->next)
530   {
531     if ( (fc->id == incoming_ack->fc_id) &&
532          (0 == memcmp (&fc->sender,
533                        &incoming_ack->sender,
534                        sizeof (struct GNUNET_PeerIdentity))) )
535     {
536       GNUNET_CONTAINER_DLL_remove (ch->fc_head,
537                                    ch->fc_tail,
538                                    fc);
539       fc->cb (fc->cb_cls,
540               GNUNET_OK);
541       GNUNET_free (fc);
542       return;
543     }
544   }
545   GNUNET_break (0);
546   disconnect (ch);
547   /* TODO: maybe do this with exponential backoff/delay */
548   reconnect (ch);
549 }
550
551
552 /**
553  * Transport service wants us to create a queue. Check if @a cq
554  * is well-formed.
555  *
556  * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
557  * @param cq the queue creation request
558  * @return #GNUNET_OK if @a smt is well-formed
559  */
560 static int
561 check_create_queue (void *cls,
562                     const struct GNUNET_TRANSPORT_CreateQueue *cq)
563 {
564   uint16_t len = ntohs (cq->header.size) - sizeof (*cq);
565   const char *addr = (const char *) &cq[1];
566
567   (void) cls;
568   if ( (0 == len) ||
569        ('\0' != addr[len-1]) )
570   {
571     GNUNET_break (0);
572     return GNUNET_SYSERR;
573   }
574   return GNUNET_OK;
575 }
576
577
578 /**
579  * Transport service wants us to create a queue. Tell the communicator.
580  *
581  * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
582  * @param cq the queue creation request
583  */
584 static void
585 handle_create_queue (void *cls,
586                      const struct GNUNET_TRANSPORT_CreateQueue *cq)
587 {
588   struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
589   const char *addr = (const char *) &cq[1];
590   struct GNUNET_TRANSPORT_CreateQueueResponse *cqr;
591   struct GNUNET_MQ_Envelope *env;
592
593   if (GNUNET_OK !=
594       ch->mq_init (ch->mq_init_cls,
595                    &cq->receiver,
596                    addr))
597   {
598     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
599                 "Address `%s' invalid for this communicator\n",
600                 addr);
601     env = GNUNET_MQ_msg (cqr,
602                          GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_FAIL);
603   }
604   else
605   {
606     env = GNUNET_MQ_msg (cqr,
607                          GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_OK);
608   }
609   cqr->request_id = cq->request_id;
610   GNUNET_MQ_send (ch->mq,
611                   env);
612 }
613
614
615 /**
616  * Transport service wants us to send a message. Check if @a smt
617  * is well-formed.
618  *
619  * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
620  * @param smt the transmission request
621  * @return #GNUNET_OK if @a smt is well-formed
622  */
623 static int
624 check_send_msg (void *cls,
625                 const struct GNUNET_TRANSPORT_SendMessageTo *smt)
626 {
627   uint16_t len = ntohs (smt->header.size) - sizeof (*smt);
628   const struct GNUNET_MessageHeader *mh = (const struct GNUNET_MessageHeader *) &smt[1];
629
630   (void) cls;
631   if (ntohs (mh->size) != len)
632   {
633     GNUNET_break (0);
634     return GNUNET_SYSERR;
635   }
636   return GNUNET_OK;
637 }
638
639
640 /**
641  * Notify transport service about @a status of a message with
642  * @a mid sent to @a receiver.
643  *
644  * @param ch handle
645  * @param status #GNUNET_OK on success, #GNUNET_SYSERR on failure
646  * @param receiver which peer was the receiver
647  * @param mid message that the ack is about
648  */
649 static void
650 send_ack (struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
651           int status,
652           const struct GNUNET_PeerIdentity *receiver,
653           uint64_t mid)
654 {
655   struct GNUNET_MQ_Envelope *env;
656   struct GNUNET_TRANSPORT_SendMessageToAck *ack;
657
658   env = GNUNET_MQ_msg (ack,
659                        GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK);
660   ack->status = htonl (status);
661   ack->mid = mid;
662   ack->receiver = *receiver;
663   GNUNET_MQ_send (ch->mq,
664                   env);
665 }
666
667
668 /**
669  * Message queue transmission by communicator was successful,
670  * notify transport service.
671  *
672  * @param cls an `struct AckPending *`
673  */
674 static void
675 send_ack_cb (void *cls)
676 {
677   struct AckPending *ap = cls;
678   struct GNUNET_TRANSPORT_CommunicatorHandle *ch = ap->ch;
679
680   GNUNET_CONTAINER_DLL_remove (ch->ap_head,
681                                ch->ap_tail,
682                                ap);
683   send_ack (ch,
684             GNUNET_OK,
685             &ap->receiver,
686             ap->mid);
687   GNUNET_free (ap);
688 }
689
690
691 /**
692  * Transport service wants us to send a message. Tell the communicator.
693  *
694  * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
695  * @param smt the transmission request
696  */
697 static void
698 handle_send_msg (void *cls,
699                  const struct GNUNET_TRANSPORT_SendMessageTo *smt)
700 {
701   struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
702   const struct GNUNET_MessageHeader *mh;
703   struct GNUNET_MQ_Envelope *env;
704   struct AckPending *ap;
705   struct GNUNET_TRANSPORT_QueueHandle *qh;
706
707   for (qh = ch->queue_head;NULL != qh; qh = qh->next)
708     if ( (qh->queue_id == smt->qid) &&
709          (0 == memcmp (&qh->peer,
710                        &smt->receiver,
711                        sizeof (struct GNUNET_PeerIdentity))) )
712       break;
713   if (NULL == qh)
714   {
715     /* queue is already gone, tell transport this one failed */
716     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
717                 "Transmission failed, queue no longer exists.\n");
718     send_ack (ch,
719               GNUNET_NO,
720               &smt->receiver,
721               smt->mid);
722     return;
723   }
724   ap = GNUNET_new (struct AckPending);
725   ap->ch = ch;
726   ap->receiver = smt->receiver;
727   ap->mid = smt->mid;
728   GNUNET_CONTAINER_DLL_insert (ch->ap_head,
729                                ch->ap_tail,
730                                ap);
731   mh = (const struct GNUNET_MessageHeader *) &smt[1];
732   env = GNUNET_MQ_msg_copy (mh);
733   GNUNET_MQ_notify_sent (env,
734                          &send_ack_cb,
735                          ap);
736   GNUNET_MQ_send (qh->mq,
737                   env);
738 }
739
740
741 /**
742  * (re)connect our communicator to the transport service
743  *
744  * @param ch handle to reconnect
745  */
746 static void
747 reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
748 {
749   struct GNUNET_MQ_MessageHandler handlers[] = {
750     GNUNET_MQ_hd_fixed_size (incoming_ack,
751                              GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK,
752                              struct GNUNET_TRANSPORT_IncomingMessageAck,
753                              ch),
754     GNUNET_MQ_hd_var_size (create_queue,
755                            GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE,
756                            struct GNUNET_TRANSPORT_CreateQueue,
757                            ch),
758     GNUNET_MQ_hd_var_size (send_msg,
759                            GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG,
760                            struct GNUNET_TRANSPORT_SendMessageTo,
761                            ch),
762     // FIXME: handle backchannel notifications!
763     GNUNET_MQ_handler_end()
764   };
765   struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam;
766   struct GNUNET_MQ_Envelope *env;
767
768   ch->mq = GNUNET_CLIENT_connect (ch->cfg,
769                                   "transport",
770                                   handlers,
771                                   &error_handler,
772                                   ch);
773   if (NULL == ch->mq)
774     return;
775   env = GNUNET_MQ_msg_extra (cam,
776                              strlen (ch->addr_prefix) + 1,
777                              GNUNET_MESSAGE_TYPE_TRANSPORT_NEW_COMMUNICATOR);
778   cam->cc = htonl ((uint32_t) ch->cc);
779   memcpy (&cam[1],
780           ch->addr_prefix,
781           strlen (ch->addr_prefix) + 1);
782   GNUNET_MQ_send (ch->mq,
783                   env);
784   for (struct GNUNET_TRANSPORT_AddressIdentifier *ai = ch->ai_head;
785        NULL != ai;
786        ai = ai->next)
787     send_add_address (ai);
788   for (struct GNUNET_TRANSPORT_QueueHandle *qh = ch->queue_head;
789        NULL != qh;
790        qh = qh->next)
791     send_add_queue (qh);
792 }
793
794
795 /**
796  * Connect to the transport service.
797  *
798  * @param cfg configuration to use
799  * @param config_section section of the configuration to use for options
800  * @param addr_prefix address prefix for addresses supported by this
801  *        communicator, could be NULL for incoming-only communicators
802  * @param cc what characteristics does the communicator have?
803  * @param mtu maximum message size supported by communicator, 0 if
804  *            sending is not supported, SIZE_MAX for no MTU
805  * @param mq_init function to call to initialize a message queue given
806  *                the address of another peer, can be NULL if the
807  *                communicator only supports receiving messages
808  * @param mq_init_cls closure for @a mq_init
809  * @param notify_cb function to pass backchannel messages to communicator
810  * @param notify_cb_cls closure for @a notify_cb
811  * @return NULL on error
812  */
813 struct GNUNET_TRANSPORT_CommunicatorHandle *
814 GNUNET_TRANSPORT_communicator_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
815                                        const char *config_section,
816                                        const char *addr_prefix,
817                                        enum GNUNET_TRANSPORT_CommunicatorCharacteristics cc,
818                                        GNUNET_TRANSPORT_CommunicatorMqInit mq_init,
819                                        void *mq_init_cls,
820                                        GNUNET_TRANSPORT_CommunicatorNotify notify_cb,
821                                        void *notify_cb_cls)
822 {
823   struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
824
825   ch = GNUNET_new (struct GNUNET_TRANSPORT_CommunicatorHandle);
826   ch->cfg = cfg;
827   ch->config_section = config_section;
828   ch->addr_prefix = addr_prefix;
829   ch->mq_init = mq_init;
830   ch->mq_init_cls = mq_init_cls;
831   ch->notify_cb = notify_cb;
832   ch->notify_cb_cls = notify_cb_cls;
833   ch->cc = cc;
834   reconnect (ch);
835   if (GNUNET_OK !=
836       GNUNET_CONFIGURATION_get_value_number (cfg,
837                                              config_section,
838                                              "MAX_QUEUE_LENGTH",
839                                              &ch->max_queue_length))
840     ch->max_queue_length = DEFAULT_MAX_QUEUE_LENGTH;
841   if (NULL == ch->mq)
842   {
843     GNUNET_free (ch);
844     return NULL;
845   }
846   return ch;
847 }
848
849
850 /**
851  * Disconnect from the transport service.
852  *
853  * @param ch handle returned from connect
854  */
855 void
856 GNUNET_TRANSPORT_communicator_disconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
857 {
858   disconnect (ch);
859   while (NULL != ch->ai_head)
860   {
861     GNUNET_break (0); /* communicator forgot to remove address, warn! */
862     GNUNET_TRANSPORT_communicator_address_remove (ch->ai_head);
863   }
864   GNUNET_free (ch);
865 }
866
867
868 /* ************************* Receiving *************************** */
869
870
871 /**
872  * Notify transport service that the communicator has received
873  * a message.
874  *
875  * @param ch connection to transport service
876  * @param sender presumed sender of the message (details to be checked
877  *        by higher layers)
878  * @param msg the message
879  * @param cb function to call once handling the message is done, NULL if
880  *         flow control is not supported by this communicator
881  * @param cb_cls closure for @a cb
882  * @return #GNUNET_OK if all is well, #GNUNET_NO if the message was
883  *         immediately dropped due to memory limitations (communicator
884  *         should try to apply back pressure),
885  *         #GNUNET_SYSERR if the message could not be delivered because
886  *         the tranport service is not yet up
887  */
888 int
889 GNUNET_TRANSPORT_communicator_receive (struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
890                                        const struct GNUNET_PeerIdentity *sender,
891                                        const struct GNUNET_MessageHeader *msg,
892                                        GNUNET_TRANSPORT_MessageCompletedCallback cb,
893                                        void *cb_cls)
894 {
895   struct GNUNET_MQ_Envelope *env;
896   struct GNUNET_TRANSPORT_IncomingMessage *im;
897   uint16_t msize;
898
899   if (NULL == ch->mq)
900     return GNUNET_SYSERR;
901   if ( (NULL == cb) &&
902        (GNUNET_MQ_get_length (ch->mq) >= ch->max_queue_length) )
903   {
904     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
905                 "Dropping message: transprot is too slow, queue length %llu exceeded\n",
906                 ch->max_queue_length);
907     return GNUNET_NO;
908   }
909
910   msize = ntohs (msg->size);
911   env = GNUNET_MQ_msg_extra (im,
912                              msize,
913                              GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG);
914   if (NULL == env)
915   {
916     GNUNET_break (0);
917     return GNUNET_SYSERR;
918   }
919   im->sender = *sender;
920   memcpy (&im[1],
921           msg,
922           msize);
923   if (NULL != cb)
924   {
925     struct FlowControl *fc;
926
927     im->fc_on = htonl (GNUNET_YES);
928     im->fc_id = ch->fc_gen++;
929     fc = GNUNET_new (struct FlowControl);
930     fc->sender = *sender;
931     fc->id = im->fc_id;
932     fc->cb = cb;
933     fc->cb_cls = cb_cls;
934     GNUNET_CONTAINER_DLL_insert (ch->fc_head,
935                                  ch->fc_tail,
936                                  fc);
937   }
938   GNUNET_MQ_send (ch->mq,
939                   env);
940   return GNUNET_OK;
941 }
942
943
944 /* ************************* Discovery *************************** */
945
946
947 /**
948  * Notify transport service that an MQ became available due to an
949  * "inbound" connection or because the communicator discovered the
950  * presence of another peer.
951  *
952  * @param ch connection to transport service
953  * @param peer peer with which we can now communicate
954  * @param address address in human-readable format, 0-terminated, UTF-8
955  * @param mtu maximum message size supported by queue, 0 if
956  *            sending is not supported, SIZE_MAX for no MTU
957  * @param nt which network type does the @a address belong to?
958  * @param cc what characteristics does the communicator have?
959  * @param distance how many hops does this queue use (DV-only)?
960  * @param cs what is the connection status of the queue?
961  * @param mq message queue of the @a peer
962  * @return API handle identifying the new MQ
963  */
964 struct GNUNET_TRANSPORT_QueueHandle *
965 GNUNET_TRANSPORT_communicator_mq_add (struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
966                                       const struct GNUNET_PeerIdentity *peer,
967                                       const char *address,
968                                       uint32_t mtu,
969                                       enum GNUNET_NetworkType nt,
970                                       uint32_t distance,
971                                       enum GNUNET_TRANSPORT_ConnectionStatus cs,
972                                       struct GNUNET_MQ_Handle *mq)
973 {
974   struct GNUNET_TRANSPORT_QueueHandle *qh;
975
976   qh = GNUNET_new (struct GNUNET_TRANSPORT_QueueHandle);
977   qh->ch = ch;
978   qh->peer = *peer;
979   qh->address = GNUNET_strdup (address);
980   qh->nt = nt;
981   qh->mtu = mtu;
982   qh->distance = distance;
983   qh->cs = cs;
984   qh->mq = mq;
985   qh->queue_id = ch->queue_gen++;
986   GNUNET_CONTAINER_DLL_insert (ch->queue_head,
987                                ch->queue_tail,
988                                qh);
989   send_add_queue (qh);
990   return qh;
991 }
992
993
994 /**
995  * Notify transport service that an MQ became unavailable due to a
996  * disconnect or timeout.
997  *
998  * @param qh handle for the queue that must be invalidated
999  */
1000 void
1001 GNUNET_TRANSPORT_communicator_mq_del (struct GNUNET_TRANSPORT_QueueHandle *qh)
1002 {
1003   struct GNUNET_TRANSPORT_CommunicatorHandle *ch = qh->ch;
1004
1005   send_del_queue (qh);
1006   GNUNET_CONTAINER_DLL_remove (ch->queue_head,
1007                                ch->queue_tail,
1008                                qh);
1009   GNUNET_MQ_destroy (qh->mq);
1010   GNUNET_free (qh->address);
1011   GNUNET_free (qh);
1012 }
1013
1014
1015 /**
1016  * Notify transport service about an address that this communicator
1017  * provides for this peer.
1018  *
1019  * @param ch connection to transport service
1020  * @param address our address in human-readable format, 0-terminated, UTF-8
1021  * @param nt which network type does the address belong to?
1022  * @param expiration when does the communicator forsee this address expiring?
1023  */
1024 struct GNUNET_TRANSPORT_AddressIdentifier *
1025 GNUNET_TRANSPORT_communicator_address_add (struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
1026                                            const char *address,
1027                                            enum GNUNET_NetworkType nt,
1028                                            struct GNUNET_TIME_Relative expiration)
1029 {
1030   struct GNUNET_TRANSPORT_AddressIdentifier *ai;
1031
1032   ai = GNUNET_new (struct GNUNET_TRANSPORT_AddressIdentifier);
1033   ai->ch = ch;
1034   ai->address = GNUNET_strdup (address);
1035   ai->nt = nt;
1036   ai->expiration = expiration;
1037   ai->aid = ch->aid_gen++;
1038   GNUNET_CONTAINER_DLL_insert (ch->ai_head,
1039                                ch->ai_tail,
1040                                ai);
1041   send_add_address (ai);
1042   return ai;
1043 }
1044
1045
1046 /**
1047  * Notify transport service about an address that this communicator no
1048  * longer provides for this peer.
1049  *
1050  * @param ai address that is no longer provided
1051  */
1052 void
1053 GNUNET_TRANSPORT_communicator_address_remove (struct GNUNET_TRANSPORT_AddressIdentifier *ai)
1054 {
1055   struct GNUNET_TRANSPORT_CommunicatorHandle *ch = ai->ch;
1056
1057   send_del_address (ai);
1058   GNUNET_CONTAINER_DLL_remove (ch->ai_head,
1059                                ch->ai_tail,
1060                                ai);
1061   GNUNET_free (ai->address);
1062   GNUNET_free (ai);
1063 }
1064
1065
1066 /* ************************* Backchannel *************************** */
1067
1068
1069 /**
1070  * The communicator asks the transport service to route a message via
1071  * a different path to another communicator service at another peer.
1072  * This must only be done for special control traffic (as there is no
1073  * flow control for this API), such as acknowledgements, and generally
1074  * only be done if the communicator is uni-directional (i.e. cannot
1075  * send the message back itself).
1076  *
1077  * @param ch handle of this communicator
1078  * @param pid peer to send the message to
1079  * @param comm name of the communicator to send the message to
1080  * @param header header of the message to transmit and pass via the
1081  *        notify-API to @a pid's communicator @a comm
1082  */
1083 void
1084 GNUNET_TRANSPORT_communicator_notify (struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
1085                                       const struct GNUNET_PeerIdentity *pid,
1086                                       const char *comm,
1087                                       const struct GNUNET_MessageHeader *header)
1088 {
1089   struct GNUNET_MQ_Envelope *env;
1090   struct GNUNET_TRANSPORT_CommunicatorBackchannel *cb;
1091   size_t slen = strlen (comm) + 1;
1092   uint16_t mlen = ntohs (header->size);
1093
1094   GNUNET_assert (mlen + slen + sizeof (*cb) < UINT16_MAX);
1095   env = GNUNET_MQ_msg_extra (cb,
1096                              slen + mlen,
1097                              GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL);
1098   cb->pid = *pid;
1099   memcpy (&cb[1],
1100           header,
1101           mlen);
1102   memcpy (((char *)&cb[1]) + mlen,
1103           comm,
1104           slen);
1105   GNUNET_MQ_send (ch->mq,
1106                   env);
1107 }
1108
1109
1110 /* end of transport_api2_communication.c */