2 This file is part of GNUnet.
3 Copyright (C) 2018 GNUnet e.V.
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
20 * @file transport/transport_api2_communication.c
21 * @brief implementation of the gnunet_transport_communication_service.h API
22 * @author Christian Grothoff
25 #include "gnunet_util_lib.h"
26 #include "gnunet_protocols.h"
27 #include "gnunet_transport_communication_service.h"
28 #include "transport.h"
32 * How many messages do we keep at most in the queue to the
33 * transport service before we start to drop (default,
34 * can be changed via the configuration file).
/* Assigned to ch->max_queue_length in GNUNET_TRANSPORT_communicator_connect()
   on the branch following the configuration lookup -- presumably the
   "value not configured" case; confirm. */
36 #define DEFAULT_MAX_QUEUE_LENGTH 16
40 * Information we track per packet to enable flow control.
47 struct FlowControl *next;
52 struct FlowControl *prev;
55 * Function to call once the message was processed.
57 GNUNET_TRANSPORT_MessageCompletedCallback cb;
65 * Which peer is this about?
67 struct GNUNET_PeerIdentity sender;
70 * More-or-less unique ID for the message.
77 * Information we track per message to tell the transport about
78 * success or failures.
85 struct AckPending *next;
90 struct AckPending *prev;
93 * Communicator this entry belongs to.
95 struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
98 * Which peer is this about?
100 struct GNUNET_PeerIdentity receiver;
103 * More-or-less unique ID for the message.
110 * Opaque handle to the transport service for communicators.
112 struct GNUNET_TRANSPORT_CommunicatorHandle
115 * Head of DLL of addresses this communicator offers to the transport service.
117 struct GNUNET_TRANSPORT_AddressIdentifier *ai_head;
120 * Tail of DLL of addresses this communicator offers to the transport service.
122 struct GNUNET_TRANSPORT_AddressIdentifier *ai_tail;
125 * Head of DLL of messages awaiting flow control confirmation (ack).
127 struct FlowControl *fc_head;
130 * Tail of DLL of messages awaiting flow control confirmation (ack).
132 struct FlowControl *fc_tail;
135 * Head of DLL of messages awaiting transmission confirmation (ack).
137 struct AckPending *ap_head;
140 * Tail of DLL of messages awaiting transmission confirmation (ack).
142 struct AckPending *ap_tail;
145 * Head of DLL of queues we offer.
147 struct GNUNET_TRANSPORT_QueueHandle *queue_head;
150 * Tail of DLL of queues we offer.
152 struct GNUNET_TRANSPORT_QueueHandle *queue_tail;
157 const struct GNUNET_CONFIGURATION_Handle *cfg;
160 * Config section to use.
162 const char *config_section;
165 * Address prefix to use.
167 const char *addr_prefix;
170 * Function to call when the transport service wants us to initiate
171 * a communication channel with another peer.
173 GNUNET_TRANSPORT_CommunicatorMqInit mq_init;
176 * Closure for @e mq_init.
181 * Queue to talk to the transport service.
183 struct GNUNET_MQ_Handle *mq;
186 * Maximum permissible queue length.
188 unsigned long long max_queue_length;
191 * Flow-control identifier generator.
196 * MTU of the communicator
201 * Internal UUID for the address used in communication with the
207 * Queue identifier generator.
215 * Handle returned to identify the internal data structure the transport
216 * API has created to manage a message queue to a particular peer.
218 struct GNUNET_TRANSPORT_QueueHandle
224 struct GNUNET_TRANSPORT_QueueHandle *next;
229 struct GNUNET_TRANSPORT_QueueHandle *prev;
232 * Handle this queue belongs to.
234 struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
237 * Which peer we can communicate with.
239 struct GNUNET_PeerIdentity peer;
242 * Address used by the communication queue.
247 * Network type of the communication queue.
249 enum GNUNET_ATS_Network_Type nt;
254 struct GNUNET_MQ_Handle *mq;
257 * ID for this queue when talking to the transport service.
265 * Internal representation of an address a communicator is
266 * currently providing for the transport service.
268 struct GNUNET_TRANSPORT_AddressIdentifier
274 struct GNUNET_TRANSPORT_AddressIdentifier *next;
279 struct GNUNET_TRANSPORT_AddressIdentifier *prev;
282 * Transport handle where the address was added.
284 struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
287 * The actual address.
292 * When does the address expire? (Expected lifetime of the
295 struct GNUNET_TIME_Relative expiration;
298 * Internal UUID for the address used in communication with the
304 * Network type for the address.
306 enum GNUNET_ATS_Network_Type nt;
312 * (re)connect our communicator to the transport service
314 * @param ch handle to reconnect
/* Forward declaration: reconnect() is defined further down in this file,
   after the message handlers it registers. */
317 reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch);
321 * Send message to the transport service about address @a ai
322 * being now available.
324 * @param ai address to add
/**
 * Build a #GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS message describing
 * @a ai and queue it on the MQ of the communicator handle owning @a ai.
 *
 * @param ai address entry to announce; reads ai->ch, ai->address,
 *        ai->expiration and ai->nt
 */
327 send_add_address (struct GNUNET_TRANSPORT_AddressIdentifier *ai)
329 struct GNUNET_MQ_Envelope *env;
330 struct GNUNET_TRANSPORT_AddAddressMessage *aam;
/* No MQ on the handle right now.  NOTE(review): presumably nothing is
   sent in this case and reconnect(), which calls this function for every
   entry of ch->ai_head, announces the address later -- confirm. */
332 if (NULL == ai->ch->mq)
/* Variable-size message: the extra bytes are sized for the address
   string including its terminating NUL (hence strlen + 1). */
334 env = GNUNET_MQ_msg_extra (aam,
335 strlen (ai->address) + 1,
336 GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS);
/* Header fields are converted to network byte order. */
337 aam->expiration = GNUNET_TIME_relative_hton (ai->expiration);
338 aam->nt = htonl ((uint32_t) ai->nt);
/* Same length expression as the extra space reserved above. */
341 strlen (ai->address) + 1);
342 GNUNET_MQ_send (ai->ch->mq,
348 * Send message to the transport service about address @a ai
349 * being no longer available.
351 * @param ai address to delete
/**
 * Build a #GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_ADDRESS message for @a ai and
 * queue it on the MQ of the owning communicator handle.
 *
 * @param ai address entry being withdrawn; reads ai->ch and ai->aid
 */
354 send_del_address (struct GNUNET_TRANSPORT_AddressIdentifier *ai)
356 struct GNUNET_MQ_Envelope *env;
357 struct GNUNET_TRANSPORT_DelAddressMessage *dam;
/* No MQ on the handle right now.  NOTE(review): presumably nothing is
   sent in that case -- confirm. */
359 if (NULL == ai->ch->mq)
/* Fixed-size message: only the address ID is set here, in network
   byte order. */
361 env = GNUNET_MQ_msg (dam,
362 GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_ADDRESS);
363 dam->aid = htonl (ai->aid);
364 GNUNET_MQ_send (ai->ch->mq,
370 * Send message to the transport service about queue @a qh
371 * being now available.
373 * @param qh queue to add
/**
 * Build a #GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP message describing
 * @a qh and queue it on the MQ of the owning communicator handle.
 *
 * @param qh queue to announce; reads qh->ch, qh->address, qh->peer,
 *        qh->nt and qh->queue_id
 */
376 send_add_queue (struct GNUNET_TRANSPORT_QueueHandle *qh)
378 struct GNUNET_MQ_Envelope *env;
379 struct GNUNET_TRANSPORT_AddQueueMessage *aqm;
/* No MQ on the handle right now.  NOTE(review): presumably nothing is
   sent in that case -- confirm. */
381 if (NULL == qh->ch->mq)
/* Variable-size message: the extra bytes are sized for the address
   string including its terminating NUL (hence strlen + 1). */
383 env = GNUNET_MQ_msg_extra (aqm,
384 strlen (qh->address) + 1,
385 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP);
/* The peer identity is copied as-is; the integer fields go out in
   network byte order. */
386 aqm->receiver = qh->peer;
387 aqm->nt = htonl ((uint32_t) qh->nt);
388 aqm->qid = htonl (qh->queue_id);
/* Same length expression as the extra space reserved above. */
391 strlen (qh->address) + 1);
392 GNUNET_MQ_send (qh->ch->mq,
398 * Send message to the transport service about queue @a qh
399 * being no longer available.
401 * @param qh queue to delete
/**
 * Build a #GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN message for @a qh
 * and queue it on the MQ of the owning communicator handle.
 *
 * @param qh queue being withdrawn; reads qh->ch, qh->queue_id and qh->peer
 */
404 send_del_queue (struct GNUNET_TRANSPORT_QueueHandle *qh)
406 struct GNUNET_MQ_Envelope *env;
407 struct GNUNET_TRANSPORT_DelQueueMessage *dqm;
/* No MQ on the handle right now.  NOTE(review): presumably nothing is
   sent in that case -- confirm. */
409 if (NULL == qh->ch->mq)
411 env = GNUNET_MQ_msg (dqm,
412 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN);
/* The queue is identified by its ID (network byte order) plus the peer
   it leads to. */
413 dqm->qid = htonl (qh->queue_id);
414 dqm->receiver = qh->peer;
415 GNUNET_MQ_send (qh->ch->mq,
421 * Disconnect from the transport service. Purges
422 * all flow control entries as we will no longer receive
423 * the ACKs. Purges the ack pending entries as the
424 * transport will no longer expect the confirmations.
426 * @param ch service to disconnect from
/**
 * Drop the connection state of @a ch: entries are unlinked from the
 * flow-control list and from the ack-pending list, and the MQ is destroyed.
 *
 * @param ch communicator handle to disconnect
 */
429 disconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
/* NOTE(review): fcn / apn presumably hold the successor of the entry
   currently being removed, so the lists can be walked while they are
   being emptied -- confirm. */
431 struct FlowControl *fcn;
432 struct AckPending *apn;
/* Walk the flow-control list starting at its head ... */
434 for (struct FlowControl *fc = ch->fc_head;
/* ... unlinking entries from it. */
439 GNUNET_CONTAINER_DLL_remove (ch->fc_head,
/* Same treatment for the ack-pending list. */
446 for (struct AckPending *ap = ch->ap_head;
451 GNUNET_CONTAINER_DLL_remove (ch->ap_head,
/* Tear down the message queue stored in the handle. */
458 GNUNET_MQ_destroy (ch->mq);
464 * Function called on MQ errors.
/**
 * MQ error callback.  Logs the failure at INFO level; per the log text
 * the connection to the transport service is then re-established.
 *
 * @param cls the `struct GNUNET_TRANSPORT_CommunicatorHandle *`
 * @param error MQ error code
 */
467 error_handler (void *cls,
468 enum GNUNET_MQ_Error error)
/* The closure is the communicator handle. */
470 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
472 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
473 "MQ failure %d, reconnecting to transport service.\n",
476 /* TODO: maybe do this with exponential backoff/delay */
482 * Transport service acknowledged a message we gave it
483 * (with flow control enabled). Tell the communicator.
485 * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
486 * @param incoming_ack the ack
/**
 * Handler for #GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK: looks up
 * the flow-control entry the ack refers to and unlinks it from the
 * handle's flow-control list.
 *
 * @param cls the `struct GNUNET_TRANSPORT_CommunicatorHandle *`
 * @param incoming_ack ack received from the transport service
 */
489 handle_incoming_ack (void *cls,
490 const struct GNUNET_TRANSPORT_IncomingMessageAck *incoming_ack)
492 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
/* Linear search through the pending flow-control entries. */
494 for (struct FlowControl *fc = ch->fc_head;
/* An entry matches only if both the ID and the sender identity agree.
   NOTE(review): fc_id is compared without byte-order conversion;
   GNUNET_TRANSPORT_communicator_receive() also stores the generator
   value into im->fc_id unconverted, so this is consistent as long as
   the service echoes the field untouched -- confirm. */
498 if ( (fc->id == incoming_ack->fc_id) &&
499 (0 == memcmp (&fc->sender,
500 &incoming_ack->sender,
501 sizeof (struct GNUNET_PeerIdentity))) )
503 GNUNET_CONTAINER_DLL_remove (ch->fc_head,
514 /* TODO: maybe do this with exponential backoff/delay */
520 * Transport service wants us to create a queue. Check if @a cq
523 * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
524 * @param cq the queue creation request
525 * @return #GNUNET_OK if @a cq is well-formed
/**
 * Validation callback for #GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE:
 * the bytes following the fixed-size header must end in a NUL byte.
 *
 * @param cls closure
 * @param cq message to validate
 * @return #GNUNET_SYSERR if the message is rejected
 */
528 check_create_queue (void *cls,
529 const struct GNUNET_TRANSPORT_CreateQueue *cq)
/* Number of payload bytes: total message size (converted from network
   byte order) minus the fixed-size header. */
531 uint16_t len = ntohs (cq->header.size) - sizeof (*cq);
/* The payload starts directly behind the fixed-size header. */
532 const char *addr = (const char *) &cq[1];
/* The last payload byte must be the string terminator.
   NOTE(review): indexing addr[len-1] requires len > 0 -- confirm the
   preceding half of this condition rejects an empty payload. */
536 ('\0' != addr[len-1]) )
539 return GNUNET_SYSERR;
546 * Transport service wants us to create a queue. Tell the communicator.
548 * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
549 * @param cq the queue creation request
/**
 * Handler for #GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE: passes the
 * request to the communicator's mq_init callback and answers with either
 * a QUEUE_CREATE_OK or a QUEUE_CREATE_FAIL message carrying the request ID.
 *
 * @param cls the `struct GNUNET_TRANSPORT_CommunicatorHandle *`
 * @param cq queue creation request; the address string follows the header
 */
552 handle_create_queue (void *cls,
553 const struct GNUNET_TRANSPORT_CreateQueue *cq)
555 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
/* The address string starts directly behind the fixed-size header. */
556 const char *addr = (const char *) &cq[1];
557 struct GNUNET_TRANSPORT_CreateQueueResponse *cqr;
558 struct GNUNET_MQ_Envelope *env;
/* NOTE(review): GNUNET_TRANSPORT_communicator_connect() documents that
   mq_init "can be NULL" for receive-only communicators, yet it is
   called here -- confirm a NULL callback cannot reach this point. */
561 ch->mq_init (ch->mq_init_cls,
/* Failure branch: log at ERROR level and prepare the FAIL response. */
565 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
566 "Address `%s' invalid for this communicator\n",
568 env = GNUNET_MQ_msg (cqr,
569 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_FAIL);
/* Success branch: prepare the OK response. */
573 env = GNUNET_MQ_msg (cqr,
574 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_OK);
/* The request ID is echoed exactly as received (no byte-order
   conversion). */
576 cqr->request_id = cq->request_id;
577 GNUNET_MQ_send (ch->mq,
583 * Transport service wants us to send a message. Check if @a smt
586 * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
587 * @param smt the transmission request
588 * @return #GNUNET_OK if @a smt is well-formed
/**
 * Validation callback for #GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG: the
 * size field of the embedded message must equal the number of bytes that
 * follow the fixed-size header.
 *
 * @param cls closure
 * @param smt message to validate
 * @return #GNUNET_SYSERR if the sizes disagree
 */
591 check_send_msg (void *cls,
592 const struct GNUNET_TRANSPORT_SendMessageTo *smt)
/* Number of payload bytes: total message size (converted from network
   byte order) minus the fixed-size header. */
594 uint16_t len = ntohs (smt->header.size) - sizeof (*smt);
/* The embedded message starts directly behind the fixed-size header. */
595 const struct GNUNET_MessageHeader *mh = (const struct GNUNET_MessageHeader *) &smt[1];
/* NOTE(review): reading mh->size is only within the received bytes when
   len >= sizeof (struct GNUNET_MessageHeader) -- confirm that a shorter
   payload cannot reach this comparison. */
598 if (ntohs (mh->size) != len)
601 return GNUNET_SYSERR;
608 * Notify transport service about @a status of a message with
609 * @a mid sent to @a receiver.
612 * @param status #GNUNET_OK on success, #GNUNET_SYSERR on failure
613 * @param receiver which peer was the receiver
614 * @param mid message that the ack is about
/**
 * Build a #GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK message and queue it
 * on the MQ of @a ch.
 *
 * @param ch communicator handle whose MQ is used
 * @param receiver peer the acknowledged message was addressed to; copied
 *        into the ack
 */
617 send_ack (struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
619 const struct GNUNET_PeerIdentity *receiver,
622 struct GNUNET_MQ_Envelope *env;
623 struct GNUNET_TRANSPORT_SendMessageToAck *ack;
625 env = GNUNET_MQ_msg (ack,
626 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK);
/* The status code goes out in network byte order. */
627 ack->status = htonl (status);
629 ack->receiver = *receiver;
/* NOTE(review): unlike the send_add_* / send_del_* helpers, ch->mq does
   not appear to be NULL-checked before this send -- confirm callers can
   only get here while connected. */
630 GNUNET_MQ_send (ch->mq,
636 * Message queue transmission by communicator was successful,
637 * notify transport service.
639 * @param cls an `struct AckPending *`
/**
 * Callback taking an ack-pending entry as closure; unlinks the entry from
 * the ack-pending list of its communicator handle.
 *
 * @param cls the `struct AckPending *`
 */
642 send_ack_cb (void *cls)
644 struct AckPending *ap = cls;
/* The entry records which communicator handle it belongs to. */
645 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = ap->ch;
647 GNUNET_CONTAINER_DLL_remove (ch->ap_head,
659 * Transport service wants us to send a message. Tell the communicator.
661 * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
662 * @param smt the transmission request
/**
 * Handler for #GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG: finds the queue the
 * request refers to, records an ack-pending entry, and passes a copy of
 * the embedded message to that queue's MQ.
 *
 * @param cls the `struct GNUNET_TRANSPORT_CommunicatorHandle *`
 * @param smt transmission request; the message to send follows the header
 */
665 handle_send_msg (void *cls,
666 const struct GNUNET_TRANSPORT_SendMessageTo *smt)
668 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
669 const struct GNUNET_MessageHeader *mh;
670 struct GNUNET_MQ_Envelope *env;
671 struct AckPending *ap;
672 struct GNUNET_TRANSPORT_QueueHandle *qh;
/* Linear search over the queues this communicator offers; a queue
   matches on its ID together with a peer identity comparison.
   NOTE(review): send_add_queue() transmits the ID as
   htonl (qh->queue_id), but smt->qid is compared here without ntohl().
   If the service echoes the ID in network byte order this lookup fails
   on little-endian hosts -- confirm the byte order of smt->qid. */
674 for (qh = ch->queue_head;NULL != qh; qh = qh->next)
675 if ( (qh->queue_id == smt->qid) &&
676 (0 == memcmp (&qh->peer,
678 sizeof (struct GNUNET_PeerIdentity))) )
682 /* queue is already gone, tell transport this one failed */
683 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
684 "Transmission failed, queue no longer exists.\n");
/* Track the transmission in the handle's ack-pending list. */
691 ap = GNUNET_new (struct AckPending);
693 ap->receiver = smt->receiver;
695 GNUNET_CONTAINER_DLL_insert (ch->ap_head,
/* The message to transmit starts directly behind the fixed-size header;
   it is copied into a fresh envelope. */
698 mh = (const struct GNUNET_MessageHeader *) &smt[1];
699 env = GNUNET_MQ_msg_copy (mh);
/* Register a sent-notification on the envelope before handing it to the
   queue's MQ. */
700 GNUNET_MQ_notify_sent (env,
703 GNUNET_MQ_send (qh->mq,
709 * (re)connect our communicator to the transport service
711 * @param ch handle to reconnect
/**
 * Establish the client connection to the transport service, announce the
 * communicator with its address prefix, and re-announce the entries of
 * the handle's address and queue lists.
 *
 * @param ch communicator handle to (re)connect
 */
714 reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
/* Handlers for the three message types accepted from the service; the
   two variable-size ones are paired with their check_* validators by
   the GNUNET_MQ_hd_var_size naming convention. */
716 struct GNUNET_MQ_MessageHandler handlers[] = {
717 GNUNET_MQ_hd_fixed_size (incoming_ack,
718 GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK,
719 struct GNUNET_TRANSPORT_IncomingMessageAck,
721 GNUNET_MQ_hd_var_size (create_queue,
722 GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE,
723 struct GNUNET_TRANSPORT_CreateQueue,
725 GNUNET_MQ_hd_var_size (send_msg,
726 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG,
727 struct GNUNET_TRANSPORT_SendMessageTo,
729 GNUNET_MQ_handler_end()
731 struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam;
732 struct GNUNET_MQ_Envelope *env;
734 ch->mq = GNUNET_CLIENT_connect (ch->cfg,
/* Announce this communicator; the extra bytes are sized for the address
   prefix including its terminating NUL.
   NOTE(review): GNUNET_TRANSPORT_communicator_connect() documents that
   addr_prefix "could be NULL" for incoming-only communicators, but
   strlen (NULL) is undefined behavior -- confirm a NULL prefix is
   handled before this point. */
741 env = GNUNET_MQ_msg_extra (cam,
742 strlen (ch->addr_prefix) + 1,
743 GNUNET_MESSAGE_TYPE_TRANSPORT_NEW_COMMUNICATOR);
746 strlen (ch->addr_prefix) + 1);
747 GNUNET_MQ_send (ch->mq,
/* Re-announce every address currently registered on the handle. */
749 for (struct GNUNET_TRANSPORT_AddressIdentifier *ai = ch->ai_head;
752 send_add_address (ai);
/* Walk the list of queues registered on the handle. */
753 for (struct GNUNET_TRANSPORT_QueueHandle *qh = ch->queue_head;
761 * Connect to the transport service.
763 * @param cfg configuration to use
764 * @param config_section section of the configuration to use for options
765 * @param addr_prefix address prefix for addresses supported by this
766 * communicator, could be NULL for incoming-only communicators
767 * @param mtu maximum message size supported by communicator, 0 if
768 * sending is not supported, SIZE_MAX for no MTU
769 * @param mq_init function to call to initialize a message queue given
770 * the address of another peer, can be NULL if the
771 * communicator only supports receiving messages
772 * @param mq_init_cls closure for @a mq_init
773 * @return NULL on error
/**
 * Allocate a communicator handle and initialise it from the arguments and
 * from the configuration.
 *
 * @param cfg configuration; also queried for the maximum queue length
 * @param config_section stored in the handle by pointer (not copied)
 * @param addr_prefix stored in the handle by pointer (not copied)
 * @param mq_init callback stored in the handle
 * @return the newly allocated handle
 */
775 struct GNUNET_TRANSPORT_CommunicatorHandle *
776 GNUNET_TRANSPORT_communicator_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
777 const char *config_section,
778 const char *addr_prefix,
780 GNUNET_TRANSPORT_CommunicatorMqInit mq_init,
783 struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
785 ch = GNUNET_new (struct GNUNET_TRANSPORT_CommunicatorHandle);
/* Only the pointers are kept, so both strings must outlive the handle. */
787 ch->config_section = config_section;
788 ch->addr_prefix = addr_prefix;
790 ch->mq_init = mq_init;
791 ch->mq_init_cls = mq_init_cls;
/* Read the queue length limit from the configuration.  NOTE(review):
   the assignment below presumably runs only when the lookup does not
   succeed, making DEFAULT_MAX_QUEUE_LENGTH the fallback -- confirm. */
794 GNUNET_CONFIGURATION_get_value_number (cfg,
797 &ch->max_queue_length))
798 ch->max_queue_length = DEFAULT_MAX_QUEUE_LENGTH;
809 * Disconnect from the transport service.
811 * @param ch handle returned from connect
/**
 * Shut down the communicator handle.  Addresses still registered on the
 * handle are removed here, each one flagged via GNUNET_break().
 *
 * @param ch handle to shut down
 */
814 GNUNET_TRANSPORT_communicator_disconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
/* The loop makes progress because
   GNUNET_TRANSPORT_communicator_address_remove() unlinks the entry it
   is given from ch->ai_head. */
817 while (NULL != ch->ai_head)
819 GNUNET_break (0); /* communicator forgot to remove address, warn! */
820 GNUNET_TRANSPORT_communicator_address_remove (ch->ai_head);
826 /* ************************* Receiving *************************** */
830 * Notify transport service that the communicator has received
833 * @param ch connection to transport service
834 * @param sender presumed sender of the message (details to be checked
836 * @param msg the message
837 * @param cb function to call once handling the message is done, NULL if
838 * flow control is not supported by this communicator
839 * @param cb_cls closure for @a cb
840 * @return #GNUNET_OK if all is well, #GNUNET_NO if the message was
841 * immediately dropped due to memory limitations (communicator
842 * should try to apply back pressure),
843 * #GNUNET_SYSERR if the message could not be delivered because
844 * the transport service is not yet up
/**
 * Wrap a message received by the communicator into a
 * #GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG and queue it on the MQ to
 * the transport service; with flow control, a FlowControl entry is added
 * to the handle's list.
 *
 * @param ch communicator handle
 * @param sender peer the message is attributed to; copied into the
 *        outgoing message and into the flow-control entry
 * @param msg received message; its size field is read with ntohs()
 * @param cb completion callback
 * @return #GNUNET_SYSERR on the two early-exit error paths below
 */
847 GNUNET_TRANSPORT_communicator_receive (struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
848 const struct GNUNET_PeerIdentity *sender,
849 const struct GNUNET_MessageHeader *msg,
850 GNUNET_TRANSPORT_MessageCompletedCallback cb,
853 struct GNUNET_MQ_Envelope *env;
854 struct GNUNET_TRANSPORT_IncomingMessage *im;
858 return GNUNET_SYSERR;
/* Back-pressure check: current MQ length against the configured limit. */
860 (GNUNET_MQ_get_length (ch->mq) >= ch->max_queue_length) )
/* %llu matches the unsigned long long type of max_queue_length.
   NOTE(review): the log text misspells "transport" as "transprot"; it is
   runtime output and is left unchanged here. */
862 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
863 "Dropping message: transprot is too slow, queue length %llu exceeded\n",
864 ch->max_queue_length);
/* Size of the received message in host byte order. */
868 msize = ntohs (msg->size);
869 env = GNUNET_MQ_msg_extra (im,
871 GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG);
875 return GNUNET_SYSERR;
877 im->sender = *sender;
/* Flow-control branch: mark the message and remember it until acked. */
883 struct FlowControl *fc;
/* NOTE(review): fc_on is sent in network byte order while fc_id is
   stored as-is; handle_incoming_ack() compares the echoed fc_id without
   conversion as well, which is consistent only if the service returns
   the field untouched -- confirm. */
885 im->fc_on = htonl (GNUNET_YES);
886 im->fc_id = ch->fc_gen++;
887 fc = GNUNET_new (struct FlowControl);
888 fc->sender = *sender;
892 GNUNET_CONTAINER_DLL_insert (ch->fc_head,
896 GNUNET_MQ_send (ch->mq,
902 /* ************************* Discovery *************************** */
906 * Notify transport service that an MQ became available due to an
907 * "inbound" connection or because the communicator discovered the
908 * presence of another peer.
910 * @param ch connection to transport service
911 * @param peer peer with which we can now communicate
912 * @param address address in human-readable format, 0-terminated, UTF-8
913 * @param nt which network type does the @a address belong to?
914 * @param mq message queue of the @a peer
915 * @return API handle identifying the new MQ
/**
 * Allocate a queue handle, give it a fresh queue ID and link it into the
 * queue list of @a ch.
 *
 * @param ch communicator handle the queue is registered with
 * @param peer peer reachable via the queue
 * @param nt network type of the address
 * @param mq message queue leading to @a peer
 * @return the new queue handle
 */
917 struct GNUNET_TRANSPORT_QueueHandle *
918 GNUNET_TRANSPORT_communicator_mq_add (struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
919 const struct GNUNET_PeerIdentity *peer,
921 enum GNUNET_ATS_Network_Type nt,
922 struct GNUNET_MQ_Handle *mq)
924 struct GNUNET_TRANSPORT_QueueHandle *qh;
926 qh = GNUNET_new (struct GNUNET_TRANSPORT_QueueHandle);
/* The handle keeps its own copy of the address string; it is released
   in GNUNET_TRANSPORT_communicator_mq_del(). */
929 qh->address = GNUNET_strdup (address);
/* IDs come from a per-handle counter that is incremented on each use. */
932 qh->queue_id = ch->queue_gen++;
933 GNUNET_CONTAINER_DLL_insert (ch->queue_head,
942 * Notify transport service that an MQ became unavailable due to a
943 * disconnect or timeout.
945 * @param qh handle for the queue that must be invalidated
/**
 * Unlink @a qh from its communicator's queue list and release the
 * resources it holds.
 *
 * @param qh queue handle to invalidate
 */
948 GNUNET_TRANSPORT_communicator_mq_del (struct GNUNET_TRANSPORT_QueueHandle *qh)
950 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = qh->ch;
953 GNUNET_CONTAINER_DLL_remove (ch->queue_head,
/* NOTE(review): qh->mq is presumably the queue passed to
   GNUNET_TRANSPORT_communicator_mq_add(), which would mean ownership of
   that MQ moves to this API -- confirm. */
956 GNUNET_MQ_destroy (qh->mq);
/* Releases the copy made with GNUNET_strdup() in mq_add. */
957 GNUNET_free (qh->address);
963 * Notify transport service about an address that this communicator
964 * provides for this peer.
966 * @param ch connection to transport service
967 * @param address our address in human-readable format, 0-terminated, UTF-8
968 * @param nt which network type does the address belong to?
969 * @param expiration when does the communicator foresee this address expiring?
/**
 * Allocate an address entry, give it a fresh address ID, link it into the
 * address list of @a ch and announce it via send_add_address().
 *
 * @param ch communicator handle the address is registered with
 * @param nt network type of the address
 * @param expiration expected lifetime, stored in the entry
 * @return the new address entry
 */
971 struct GNUNET_TRANSPORT_AddressIdentifier *
972 GNUNET_TRANSPORT_communicator_address_add (struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
974 enum GNUNET_ATS_Network_Type nt,
975 struct GNUNET_TIME_Relative expiration)
977 struct GNUNET_TRANSPORT_AddressIdentifier *ai;
979 ai = GNUNET_new (struct GNUNET_TRANSPORT_AddressIdentifier);
/* The entry keeps its own copy of the address string; it is released in
   GNUNET_TRANSPORT_communicator_address_remove(). */
981 ai->address = GNUNET_strdup (address);
983 ai->expiration = expiration;
/* IDs come from a per-handle counter that is incremented on each use. */
984 ai->aid = ch->aid_gen++;
985 GNUNET_CONTAINER_DLL_insert (ch->ai_head,
988 send_add_address (ai);
994 * Notify transport service about an address that this communicator no
995 * longer provides for this peer.
997 * @param ai address that is no longer provided
/**
 * Withdraw @a ai: announce the removal via send_del_address(), unlink the
 * entry from its communicator's address list and release its address
 * string.
 *
 * @param ai address entry to remove
 */
1000 GNUNET_TRANSPORT_communicator_address_remove (struct GNUNET_TRANSPORT_AddressIdentifier *ai)
1002 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = ai->ch;
/* Must happen before the entry is torn down: send_del_address() reads
   ai->ch and ai->aid. */
1004 send_del_address (ai);
1005 GNUNET_CONTAINER_DLL_remove (ch->ai_head,
/* Releases the copy made with GNUNET_strdup() in address_add. */
1008 GNUNET_free (ai->address);
1013 /* end of transport_api2_communication.c */