2 This file is part of GNUnet.
3 Copyright (C) 2018 GNUnet e.V.
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
20 * @file transport/transport_api2_communication.c
21 * @brief implementation of the gnunet_transport_communication_service.h API
22 * @author Christian Grothoff
25 #include "gnunet_util_lib.h"
26 #include "gnunet_protocols.h"
27 #include "gnunet_transport_communication_service.h"
28 #include "transport.h"
/**
 * How many messages do we keep at most in the queue to the
 * transport service before we start to drop (default,
 * can be changed via the configuration file).
 */
#define DEFAULT_MAX_QUEUE_LENGTH 16
40 * Information we track per packet to enable flow control.
47 struct FlowControl *next;
52 struct FlowControl *prev;
55 * Function to call once the message was processed.
57 GNUNET_TRANSPORT_MessageCompletedCallback cb;
65 * Which peer is this about?
67 struct GNUNET_PeerIdentity sender;
70 * More-or-less unique ID for the message.
77 * Information we track per message to tell the transport about
78 * success or failures.
85 struct AckPending *next;
90 struct AckPending *prev;
93 * Which peer is this about?
95 struct GNUNET_PeerIdentity receiver;
98 * More-or-less unique ID for the message.
105 * Opaque handle to the transport service for communicators.
107 struct GNUNET_TRANSPORT_CommunicatorHandle
110 * Head of DLL of addresses this communicator offers to the transport service.
112 struct GNUNET_TRANSPORT_AddressIdentifier *ai_head;
115 * Tail of DLL of addresses this communicator offers to the transport service.
117 struct GNUNET_TRANSPORT_AddressIdentifier *ai_tail;
120 * DLL of messages awaiting flow control confirmation (ack).
122 struct FlowControl *fc_head;
125 * DLL of messages awaiting flow control confirmation (ack).
127 struct FlowControl *fc_tail;
130 * DLL of messages awaiting transmission confirmation (ack).
132 struct AckPending *ap_head;
135 * DLL of messages awaiting transmission confirmation (ack).
137 struct AckPending *ac_tail;
140 * DLL of queues we offer.
142 struct QueueHandle *queue_head;
145 * DLL of queues we offer.
147 struct QueueHandle *queue_tail;
152 const struct GNUNET_CONFIGURATION_Handle *cfg;
155 * Name of the communicator.
160 * Function to call when the transport service wants us to initiate
161 * a communication channel with another peer.
163 GNUNET_TRANSPORT_CommunicatorMqInit mq_init;
166 * Closure for @e mq_init.
171 * Maximum permissable queue length.
173 unsigned long long max_queue_length;
176 * Flow-control identifier generator.
181 * MTU of the communicator
186 * Internal UUID for the address used in communication with the
192 * Queue identifier generator.
200 * Handle returned to identify the internal data structure the transport
201 * API has created to manage a message queue to a particular peer.
203 struct GNUNET_TRANSPORT_QueueHandle
206 * Handle this queue belongs to.
208 struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
211 * Which peer we can communciate with.
213 struct GNUNET_PeerIdentity peer;
216 * Address used by the communication queue.
221 * Network type of the communciation queue.
223 enum GNUNET_ATS_Network_Type nt;
228 struct GNUNET_MQ_Handle *mq;
231 * ID for this queue when talking to the transport service.
239 * Internal representation of an address a communicator is
240 * currently providing for the transport service.
242 struct GNUNET_TRANSPORT_AddressIdentifier
248 struct GNUNET_TRANSPORT_AddressIdentifier *next;
253 struct GNUNET_TRANSPORT_AddressIdentifier *prev;
256 * Transport handle where the address was added.
258 struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
261 * The actual address.
266 * When does the address expire? (Expected lifetime of the
269 struct GNUNET_TIME_Relative expiration;
272 * Internal UUID for the address used in communication with the
278 * Network type for the address.
280 enum GNUNET_ATS_Network_Type nt;
/**
 * (re)connect our communicator to the transport service
 *
 * @param ch handle to reconnect
 */
static void
reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch);
295 * Send message to the transport service about address @a ai
296 * being now available.
298 * @param ai address to add
301 send_add_address (struct GNUNET_TRANSPORT_AddressIdentifier *ai)
303 struct GNUNET_MQ_Envelope *env;
304 struct GNUNET_TRANSPORT_AddAddressMessage *aam;
306 if (NULL == ai->ch->mq)
308 env = GNUNET_MQ_msg_extra (aam,
309 strlen (ai->address) + 1,
310 GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS);
311 aam->expiration = GNUNET_TIME_relative_to_nbo (ai->expiration);
312 aam->nt = htonl ((uint32_t) ai->nt);
315 strlen (ai->address) + 1);
316 GNUNET_MQ_send (ai->ch->mq,
322 * Send message to the transport service about address @a ai
323 * being no longer available.
325 * @param ai address to delete
328 send_del_address (struct GNUNET_TRANSPORT_AddressIdentifier *ai)
330 struct GNUNET_MQ_Envelope *env;
331 struct GNUNET_TRANSPORT_DelAddressMessage *dam;
333 if (NULL == ai->ch->mq)
335 env = GNUNET_MQ_msg (dam,
336 GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_ADDRESS);
337 dam.aid = htonl (ai->aid);
338 GNUNET_MQ_send (ai->ch->mq,
344 * Send message to the transport service about queue @a qh
345 * being now available.
347 * @param qh queue to add
350 send_add_queue (struct GNUNET_TRANSPORT_QueueHandle *qh)
352 struct GNUNET_MQ_Envelope *env;
353 struct GNUNET_TRANSPORT_AddQueueMessage *aqm;
355 if (NULL == ai->ch->mq)
357 env = GNUNET_MQ_msg_extra (aqm,
358 strlen (ai->address) + 1,
359 GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_QUEUE);
360 aqm.receiver = qh->peer;
361 aqm.nt = htonl ((uint32_t) qh->nt);
362 aqm.qid = htonl (qh->qid);
365 strlen (ai->address) + 1);
366 GNUNET_MQ_send (ai->ch->mq,
372 * Send message to the transport service about queue @a qh
373 * being no longer available.
375 * @param qh queue to delete
378 send_del_queue (struct GNUNET_TRANSPORT_QueueHandle *qh)
380 struct GNUNET_MQ_Envelope *env;
381 struct GNUNET_TRANSPORT_DelQueueMessage *dqm;
383 if (NULL == ai->ch->mq)
385 env = GNUNET_MQ_msg (dqm,
386 GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_QUEUE);
387 dqm.qid = htonl (qh->qid);
388 dqm.receiver = qh->peer;
389 GNUNET_MQ_send (ai->ch->mq,
395 * Disconnect from the transport service. Purges
396 * all flow control entries as we will no longer receive
397 * the ACKs. Purges the ack pending entries as the
398 * transport will no longer expect the confirmations.
400 * @param ch service to disconnect from
403 disconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
405 struct FlowControl *fcn;
406 struct AckPending *apn;
408 for (struct FlowControl *fc = ch->fc_head;
413 GNUNET_CONTAINER_DLL_remove (ch->fc_head,
420 for (struct AckPending *ap = ch->ap_head;
425 GNUNET_CONTAINER_DLL_remove (ch->ap_head,
432 GNUNET_MQ_destroy (ch->mq);
438 * Function called on MQ errors.
441 error_handler (void *cls,
442 enum GNUNET_MQ_Error error)
444 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
446 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
447 "MQ failure, reconnecting to transport service.\n");
449 /* TODO: maybe do this with exponential backoff/delay */
455 * Transport service acknowledged a message we gave it
456 * (with flow control enabled). Tell the communicator.
458 * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
459 * @param incoming_ack the ack
462 handle_incoming_ack (void *cls,
463 struct GNUNET_TRANSPORT_IncomingMessageAck *incoming_ack)
465 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
467 for (struct FlowControl *fc = ch->fc_head;
471 if ( (fc->id == incoming_ack->fc_id) &&
472 (0 == memcmp (&fc->sender,
473 incoming_ack->sender,
474 sizeof (struct GNUNET_PeerIdentity))) )
476 GNUNET_CONTAINER_DLL_remove (ch->fc_head,
487 /* TODO: maybe do this with exponential backoff/delay */
493 * Transport service wants us to create a queue. Check if @a cq
496 * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
497 * @param cq the queue creation request
498 * @return #GNUNET_OK if @a smt is well-formed
501 check_create_queue (void *cls,
502 struct GNUNET_TRANSPORT_CreateQueue *cq)
504 uint16_t len = ntohs (cq->header.size) - sizeof (*cq);
505 const char *addr = (const char *) &cq[1];
508 ('\0' != addr[len-1]) )
511 return GNUNET_SYSERR;
518 * Transport service wants us to create a queue. Tell the communicator.
520 * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
521 * @param cq the queue creation request
524 handle_create_queue (void *cls,
525 struct GNUNET_TRANSPORT_CreateQueue *cq)
527 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
528 const char *addr = (const char *) &cq[1];
531 ch->mq_init (ch->mq_init_cls,
535 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
536 "Address `%s' invalid for this communicator\n",
538 // TODO: do we notify the transport!?
544 * Transport service wants us to send a message. Check if @a smt
547 * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
548 * @param smt the transmission request
549 * @return #GNUNET_OK if @a smt is well-formed
552 check_send_msg (void *cls,
553 struct GNUNET_TRANSPORT_SendMessageTo *smt)
555 uint16_t len = ntohs (smt->header.size) - sizeof (*smt);
556 const struct GNUNET_MessageHeader *mh = (const struct GNUNET_MessageHeader *) &smt[1];
558 if (ntohs (mh->size) != len)
561 return GNUNET_SYSERR;
568 * Notify transport service about @a status of a message with
569 * @a mid sent to @a receiver.
572 * @param status #GNUNET_OK on success, #GNUNET_SYSERR on failure
573 * @param receiver which peer was the receiver
574 * @param mid message that the ack is about
577 send_ack (struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
579 const struct GNUNET_PeerIdentity *receiver,
582 struct GNUNET_MQ_Envelope *env;
583 struct GNUNET_TRANSPORT_SendMessageToAck *ack;
585 env = GNUNET_MQ_msg (ack,
586 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK);
587 ack->status = htonl (GNUNET_OK);
589 ack->receiver = ap->receiver;
590 GNUNET_MQ_send (ch->mq,
596 * Message queue transmission by communicator was successful,
597 * notify transport service.
599 * @param cls an `struct AckPending *`
602 send_ack_cb (void *cls)
604 struct AckPending *ap = cls;
605 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = ap->ch;
607 GNUNET_CONTAINER_DLL_remove (ch->ap_head,
619 * Transport service wants us to send a message. Tell the communicator.
621 * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
622 * @param smt the transmission request
625 handle_send_msg (void *cls,
626 struct GNUNET_TRANSPORT_SendMessageTo *smt)
628 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
629 const struct GNUNET_MessageHeader *mh;
630 struct GNUNET_MQ_Envelope *env;
631 struct AckPending *ap;
632 struct QueueHandle *qh;
634 for (qh = ch->queue_head;NULL != qh; qh = qh->next)
635 if ( (qh->queue_id == smt->qid) &&
636 (0 == memcmp (&qh->peer,
638 sizeof (struct GNUNET_PeerIdentity))) )
642 /* queue is already gone, tell transport this one failed */
643 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
644 "Transmission failed, queue no longer exists.\n");
651 ap = GNUNET_new (struct AckPending);
653 ap->receiver = smt->receiver;
655 GNUNET_CONTAINER_DLL_insert (ch->ap_head,
658 mh = (const struct GNUNET_MessageHeader *) &smt[1];
659 env = GNUNET_MQ_msg_copy (mh);
660 GNUNET_MQ_notify_sent (env,
663 GNUNET_MQ_send (qh->mq,
669 * (re)connect our communicator to the transport service
671 * @param ch handle to reconnect
674 reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
676 struct GNUNET_MQ_MessageHandler handlers[] = {
677 GNUNET_MQ_hd_fixed_size (incoming_ack,
678 GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK,
679 struct GNUNET_TRANSPORT_IncomingMessageAck,
681 GNUNET_MQ_hd_var_size (create_queue,
682 GNUNET_MESSAGE_TYPE_TRANSPORT_CREATE_QUEUE,
683 struct GNUNET_TRANSPORT_CreateQueue,
685 GNUNET_MQ_hd_var_size (send_msg,
686 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG,
687 struct GNUNET_TRANSPORT_SendMessageTo,
689 GNUNET_MQ_handler_end()
692 ch->mq = GNUNET_CLIENT_connect (cfg,
697 for (struct GNUNET_TRANSPORT_AddressIdentifier *ai = ch->ai_head;
700 send_add_address (ai);
701 for (struct GNUNET_TRANSPORT_QueueHandle *qh = ch->queue_head;
709 * Connect to the transport service.
711 * @param cfg configuration to use
712 * @param name name of the communicator that is connecting
713 * @param mtu maximum message size supported by communicator, 0 if
714 * sending is not supported, SIZE_MAX for no MTU
715 * @param mq_init function to call to initialize a message queue given
716 * the address of another peer, can be NULL if the
717 * communicator only supports receiving messages
718 * @param mq_init_cls closure for @a mq_init
719 * @return NULL on error
721 struct GNUNET_TRANSPORT_CommunicatorHandle *
722 GNUNET_TRANSPORT_communicator_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
725 GNUNET_TRANSPORT_CommunicatorMqInit mq_init,
728 struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
730 ch = GNUNET_new (struct GNUNET_TRANSPORT_CommunicatorHandle);
734 ch->mq_init = mq_init;
735 ch->mq_init_cls = mq_init_cls;
738 GNUNET_CONFIGURATION_get_value_number (cfg,
741 &ch->max_queue_length))
742 ch->max_queue_length = DEFAULT_MAX_QUEUE_LENGTH;
753 * Disconnect from the transport service.
755 * @param ch handle returned from connect
758 GNUNET_TRANSPORT_communicator_disconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
761 while (NULL != ch->ai_head)
763 GNUNET_break (0); /* communicator forgot to remove address, warn! */
764 GNUNET_TRANSPORT_communicator_address_remove (ch->ai_head);
770 /* ************************* Receiving *************************** */
774 * Notify transport service that the communicator has received
777 * @param ch connection to transport service
778 * @param sender presumed sender of the message (details to be checked
780 * @param msg the message
781 * @param cb function to call once handling the message is done, NULL if
782 * flow control is not supported by this communicator
783 * @param cb_cls closure for @a cb
784 * @return #GNUNET_OK if all is well, #GNUNET_NO if the message was
785 * immediately dropped due to memory limitations (communicator
786 * should try to apply back pressure),
787 * #GNUNET_SYSERR if the message could not be delivered because
788 * the tranport service is not yet up
791 GNUNET_TRANSPORT_communicator_receive (struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
792 const struct GNUNET_PeerIdentity *sender,
793 const struct GNUNET_MessageHeader *msg,
794 GNUNET_TRANSPORT_MessageCompletedCallback cb,
797 struct GNUNET_MQ_Envelope *env;
798 struct GNUNET_TRANSPORT_IncomingMessage *im;
801 if (NULL == ai->ch->mq)
802 return GNUNET_SYSERR;
805 struct FlowControl *fc;
807 im->fc_on = htonl (GNUNET_YES);
808 im->fc_id = ai->ch->fc_gen++;
809 fc = GNUNET_new (struct FlowControl);
810 fc->sender = *sender;
814 GNUNET_CONTAINER_DLL_insert (ch->fc_head,
820 if (GNUNET_MQ_get_length (ch->mq) >= ch->max_queue_length)
822 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
823 "Dropping message: transprot is too slow, queue length %u exceeded\n",
824 ch->max_queue_length);
829 msize = ntohs (msg->size);
830 env = GNUNET_MQ_msg_extra (im,
832 GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG);
836 return GNUNET_SYSERR;
838 im->sender = *sender;
842 GNUNET_MQ_send (ai->ch->mq,
848 /* ************************* Discovery *************************** */
852 * Notify transport service that an MQ became available due to an
853 * "inbound" connection or because the communicator discovered the
854 * presence of another peer.
856 * @param ch connection to transport service
857 * @param peer peer with which we can now communicate
858 * @param address address in human-readable format, 0-terminated, UTF-8
859 * @param nt which network type does the @a address belong to?
860 * @param mq message queue of the @a peer
861 * @return API handle identifying the new MQ
863 struct GNUNET_TRANSPORT_QueueHandle *
864 GNUNET_TRANSPORT_communicator_mq_add (struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
865 const struct GNUNET_PeerIdentity *peer,
867 enum GNUNET_ATS_Network_Type nt,
868 struct GNUNET_MQ_Handle *mq)
870 struct GNUNET_TRANSPORT_QueueHandle *qh;
872 qh = GNUNET_new (struct GNUNET_TRANSPORT_QueueHandle);
875 qh->address = GNUNET_strdup (address);
878 qh->queue_id = ch->queue_gen++;
879 GNUNET_CONTAINER_DLL_insert (ch->queue_head,
888 * Notify transport service that an MQ became unavailable due to a
889 * disconnect or timeout.
891 * @param qh handle for the queue that must be invalidated
894 GNUNET_TRANSPORT_communicator_mq_del (struct GNUNET_TRANSPORT_QueueHandle *qh)
896 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = qh->ch;
899 GNUNET_CONTAINER_DLL_remove (ch->queue_head,
902 GNUNET_MQ_destroy (qh->mq);
903 GNUNET_free (qh->address);
909 * Notify transport service about an address that this communicator
910 * provides for this peer.
912 * @param ch connection to transport service
913 * @param address our address in human-readable format, 0-terminated, UTF-8
914 * @param nt which network type does the address belong to?
915 * @param expiration when does the communicator forsee this address expiring?
917 struct GNUNET_TRANSPORT_AddressIdentifier *
918 GNUNET_TRANSPORT_communicator_address_add (struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
920 enum GNUNET_ATS_Network_Type nt,
921 struct GNUNET_TIME_Relative expiration)
923 struct GNUNET_TRANSPORT_AddressIdentifier *ai;
925 ai = GNUNET_new (struct GNUNET_TRANSPORT_AddressIdentifier);
927 ai->address = GNUNET_strdup (address);
929 ai->expiration = expiration;
930 ai->aid = handle->aid_gen++;
931 GNUNET_CONTAINER_DLL_insert (handle->ai_head,
934 send_add_address (ai);
940 * Notify transport service about an address that this communicator no
941 * longer provides for this peer.
943 * @param ai address that is no longer provided
946 GNUNET_TRANSPORT_communicator_address_remove (struct GNUNET_TRANSPORT_AddressIdentifier *ai)
948 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = ai->ch;
950 send_del_address (ai);
951 GNUNET_CONTAINER_DLL_remove (ch->ai_head,
954 GNUNET_free (ai->address);
959 /* end of transport_api2_communication.c */