3 This file is part of GNUnet.
4 Copyright (C) 2001-2017 GNUnet e.V.
6 GNUnet is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published
8 by the Free Software Foundation; either version 3, or (at your
9 option) any later version.
11 GNUnet is distributed in the hope that it will be useful, but
12 WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with GNUnet; see the file COPYING. If not, write to the
18 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
19 Boston, MA 02110-1301, USA.
22 * @file cadet/gnunet-service-cadet-new_channel.c
23 * @brief logical links between CADET clients
24 * @author Bartlomiej Polot
25 * @author Christian Grothoff
28 * - FIXME: send ACKs back to loopback clients!
30 * - introduce shutdown so we can have half-closed channels, modify
31 * destroy to include MID to have FIN-ACK equivalents, etc.
32 * - estimate max bandwidth using bursts and use it for CONGESTION CONTROL!
33 * - check that '0xFFULL' really is sufficient for flow control!
34 * - revisit handling of 'unreliable' traffic!
35 * - revisit handling of 'out-of-order' option, especially in combination with/without 'reliable'.
36 * - figure out flow control without ACKs (unreliable traffic!)
39 #include "gnunet_util_lib.h"
41 #include "gnunet_statistics_service.h"
42 #include "gnunet-service-cadet-new.h"
43 #include "gnunet-service-cadet-new_channel.h"
44 #include "gnunet-service-cadet-new_connection.h"
45 #include "gnunet-service-cadet-new_tunnels.h"
46 #include "gnunet-service-cadet-new_peer.h"
47 #include "gnunet-service-cadet-new_paths.h"
49 #define LOG(level,...) GNUNET_log_from (level,"cadet-chn",__VA_ARGS__)
52 * How long do we initially wait before retransmitting?
54 #define CADET_INITIAL_RETRANSMIT_TIME GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 250)
57 * How long do we wait before dropping state about incoming
58 * connection to closed port?
60 #define TIMEOUT_CLOSED_PORT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30)
64 * All the states a channel can be in.
66 enum CadetChannelState
69 * Uninitialized status, should never appear in operation.
74 * Connection create message sent, waiting for ACK.
76 CADET_CHANNEL_OPEN_SENT,
79 * Connection confirmed, ready to carry traffic.
86 * Info needed to retry a message in case it gets lost.
87 * Note that we DO use this structure also for unreliable
90 struct CadetReliableMessage
93 * Double linked list, FIFO style
95 struct CadetReliableMessage *next;
98 * Double linked list, FIFO style
100 struct CadetReliableMessage *prev;
103 * Which channel is this message in?
105 struct CadetChannel *ch;
108 * Entry in the tunnels queue for this message, NULL if it has left
109 * the tunnel. Used to cancel transmission in case we receive an
112 struct CadetTunnelQueueEntry *qe;
115 * How soon should we retry if we fail to get an ACK?
116 * Messages in the queue are sorted by this value.
118 struct GNUNET_TIME_Absolute next_retry;
121 * How long do we wait for an ACK after transmission?
122 * Use for the back-off calculation.
124 struct GNUNET_TIME_Relative retry_delay;
127 * Data message we are trying to send.
129 struct GNUNET_CADET_ChannelAppDataMessage data_message;
131 /* followed by variable-size payload */
136 * List of received out-of-order data messages.
138 struct CadetOutOfOrderMessage
141 * Double linked list, FIFO style
143 struct CadetOutOfOrderMessage *next;
146 * Double linked list, FIFO style
148 struct CadetOutOfOrderMessage *prev;
151 * ID of the message (messages up to this point needed
152 * before we give this one to the client).
154 struct ChannelMessageIdentifier mid;
157 * The envelope with the payload of the out-of-order message
159 struct GNUNET_MQ_Envelope *env;
165 * Client endpoint of a `struct CadetChannel`. A channel may be a
166 * loopback channel, in which case it has two of these endpoints.
167 * Note that flow control also is required in both directions.
169 struct CadetChannelClient
172 * Client handle. Not by itself sufficient to designate
173 * the client endpoint, as the same client handle may
174 * be used for both the owner and the destination, and
175 * we thus also need the channel ID to identify the client.
177 struct CadetClient *c;
180 * Head of DLL of messages received out of order or while client was unready.
182 struct CadetOutOfOrderMessage *head_recv;
185 * Tail DLL of messages received out of order or while client was unready.
187 struct CadetOutOfOrderMessage *tail_recv;
190 * Local tunnel number for this client.
191 * (if owner >= #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI,
192 * otherwise < #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI)
194 struct GNUNET_CADET_ClientChannelNumber ccn;
197 * Can we send data to the client?
205 * Struct containing all information regarding a channel to a remote client.
210 * Tunnel this channel is in.
212 struct CadetTunnel *t;
215 * Client owner of the tunnel, if any.
216 * (Used if this channel represents the initiating end of the tunnel.)
218 struct CadetChannelClient *owner;
221 * Client destination of the tunnel, if any.
222 * (Used if this channel represents the listening end of the tunnel.)
224 struct CadetChannelClient *dest;
227 * Last entry in the tunnel's queue relating to control messages
228 * (#GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN or
229 * #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK). Used to cancel
230 * transmission in case we receive updated information.
232 struct CadetTunnelQueueEntry *last_control_qe;
235 * Head of DLL of messages sent and not yet ACK'd.
237 struct CadetReliableMessage *head_sent;
240 * Tail of DLL of messages sent and not yet ACK'd.
242 struct CadetReliableMessage *tail_sent;
245 * Task to (re)send channel control messages (OPEN, OPEN_ACK), or to time out a channel whose port stays closed.
247 struct GNUNET_SCHEDULER_Task *retry_control_task;
250 * Task to retransmit data messages in case no ACK is received.
252 struct GNUNET_SCHEDULER_Task *retry_data_task;
255 * Last time the channel was used
257 struct GNUNET_TIME_Absolute timestamp;
260 * Destination port of the channel.
262 struct GNUNET_HashCode port;
265 * Counter for exponential backoff.
267 struct GNUNET_TIME_Relative retry_time;
270 * How long does it usually take to get an ACK.
272 struct GNUNET_TIME_Relative expected_delay;
275 * Bitfield of already-received messages past @e mid_recv.
277 uint64_t mid_futures;
280 * Next MID expected for incoming traffic.
282 struct ChannelMessageIdentifier mid_recv;
285 * Next MID to use for outgoing traffic.
287 struct ChannelMessageIdentifier mid_send;
290 * Total (reliable) messages pending ACK for this channel.
292 unsigned int pending_messages;
295 * Maximum (reliable) messages pending ACK for this channel
296 * before we throttle the client.
298 unsigned int max_pending_messages;
301 * Number identifying this channel in its tunnel.
303 struct GNUNET_CADET_ChannelTunnelNumber ctn;
308 enum CadetChannelState state;
311 * Is the tunnel bufferless (minimum latency)?
316 * Is the tunnel reliable?
321 * Is the tunnel out-of-order?
326 * Is this channel a loopback channel, where the destination is us again?
331 * Flag to signal the destruction of the channel. If this is set to
332 * #GNUNET_YES the channel will be destroyed once the queue is
341 * Get the static string for identification of the channel.
345 * @return Static string with the channel IDs.
348 GCCH_2s (const struct CadetChannel *ch)
350 static char buf[128];
352 GNUNET_snprintf (buf,
354 "Channel %s:%s ctn:%X(%X/%X)",
355 (GNUNET_YES == ch->is_loopback)
357 : GNUNET_i2s (GCP_get_id (GCT_get_destination (ch->t))),
358 GNUNET_h2s (&ch->port),
360 (NULL == ch->owner) ? 0 : ntohl (ch->owner->ccn.channel_of_client),
361 (NULL == ch->dest) ? 0 : ntohl (ch->dest->ccn.channel_of_client));
367 * Get the channel's public ID.
371 * @return ID used to identify the channel with the remote peer.
373 struct GNUNET_CADET_ChannelTunnelNumber
374 GCCH_get_id (const struct CadetChannel *ch)
381 * Release memory associated with @a ccc
383 * @param ccc data structure to clean up
386 free_channel_client (struct CadetChannelClient *ccc)
388 struct CadetOutOfOrderMessage *com;
390 while (NULL != (com = ccc->head_recv))
392 GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
395 GNUNET_MQ_discard (com->env);
403 * Destroy the given channel.
405 * @param ch channel to destroy
408 channel_destroy (struct CadetChannel *ch)
410 struct CadetReliableMessage *crm;
412 while (NULL != (crm = ch->head_sent))
414 GNUNET_assert (ch == crm->ch);
417 GCT_send_cancel (crm->qe);
420 GNUNET_CONTAINER_DLL_remove (ch->head_sent,
425 if (NULL != ch->owner)
427 free_channel_client (ch->owner);
430 if (NULL != ch->dest)
432 free_channel_client (ch->dest);
435 if (NULL != ch->last_control_qe)
437 GCT_send_cancel (ch->last_control_qe);
438 ch->last_control_qe = NULL;
440 if (NULL != ch->retry_data_task)
442 GNUNET_SCHEDULER_cancel (ch->retry_data_task);
443 ch->retry_data_task = NULL;
445 if (NULL != ch->retry_control_task)
447 GNUNET_SCHEDULER_cancel (ch->retry_control_task);
448 ch->retry_control_task = NULL;
450 if (GNUNET_NO == ch->is_loopback)
452 GCT_remove_channel (ch->t,
462 * Send a channel create message.
464 * @param cls Channel for which to send.
467 send_channel_open (void *cls);
471 * Function called once the tunnel confirms that we sent the
472 * create message. Delays for a bit until we retry.
474 * @param cls our `struct CadetChannel`.
477 channel_open_sent_cb (void *cls)
479 struct CadetChannel *ch = cls;
481 GNUNET_assert (NULL != ch->last_control_qe);
482 ch->last_control_qe = NULL;
483 ch->retry_time = GNUNET_TIME_STD_BACKOFF (ch->retry_time);
484 LOG (GNUNET_ERROR_TYPE_DEBUG,
485 "Sent CADET_CHANNEL_OPEN on %s, retrying in %s\n",
487 GNUNET_STRINGS_relative_time_to_string (ch->retry_time,
489 ch->retry_control_task
490 = GNUNET_SCHEDULER_add_delayed (ch->retry_time,
497 * Send a channel open message.
499 * @param cls Channel for which to send.
502 send_channel_open (void *cls)
504 struct CadetChannel *ch = cls;
505 struct GNUNET_CADET_ChannelOpenMessage msgcc;
508 ch->retry_control_task = NULL;
509 LOG (GNUNET_ERROR_TYPE_DEBUG,
510 "Sending CHANNEL_OPEN message for %s\n",
514 options |= GNUNET_CADET_OPTION_NOBUFFER;
516 options |= GNUNET_CADET_OPTION_RELIABLE;
517 if (ch->out_of_order)
518 options |= GNUNET_CADET_OPTION_OUT_OF_ORDER;
519 msgcc.header.size = htons (sizeof (msgcc));
520 msgcc.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN);
521 msgcc.opt = htonl (options);
522 msgcc.port = ch->port;
524 ch->state = CADET_CHANNEL_OPEN_SENT;
525 ch->last_control_qe = GCT_send (ch->t,
527 &channel_open_sent_cb,
533 * Function called once and only once after a channel was bound
534 * to its tunnel via #GCT_add_channel() is ready for transmission.
535 * Note that this is only the case for channels that this peer
536 * initiates, as for incoming channels we assume that they are
537 * ready for transmission immediately upon receiving the open
538 * message. Used to bootstrap the #GCT_send() process.
540 * @param ch the channel for which the tunnel is now ready
543 GCCH_tunnel_up (struct CadetChannel *ch)
545 GNUNET_assert (NULL == ch->retry_control_task);
546 LOG (GNUNET_ERROR_TYPE_DEBUG,
547 "Tunnel up, sending CHANNEL_OPEN on %s now\n",
549 ch->retry_control_task
550 = GNUNET_SCHEDULER_add_now (&send_channel_open,
556 * Create a new channel.
558 * @param owner local client owning the channel
559 * @param ccn local number of this channel at the @a owner
560 * @param destination peer to which we should build the channel
561 * @param port desired port at @a destination
562 * @param options options for the channel
563 * @return handle to the new channel
565 struct CadetChannel *
566 GCCH_channel_local_new (struct CadetClient *owner,
567 struct GNUNET_CADET_ClientChannelNumber ccn,
568 struct CadetPeer *destination,
569 const struct GNUNET_HashCode *port,
572 struct CadetChannel *ch;
573 struct CadetChannelClient *ccco;
575 ccco = GNUNET_new (struct CadetChannelClient);
578 ccco->client_ready = GNUNET_YES;
580 ch = GNUNET_new (struct CadetChannel);
581 ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
582 ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
583 ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
584 ch->max_pending_messages = (ch->nobuffer) ? 1 : 4; /* FIXME: 4!? Do not hardcode! */
587 if (0 == memcmp (&my_full_id,
588 GCP_get_id (destination),
589 sizeof (struct GNUNET_PeerIdentity)))
591 struct CadetClient *c;
593 ch->is_loopback = GNUNET_YES;
594 c = GNUNET_CONTAINER_multihashmap_get (open_ports,
598 /* port closed, wait for it to possibly open */
599 (void) GNUNET_CONTAINER_multihashmap_put (loose_channels,
602 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
603 LOG (GNUNET_ERROR_TYPE_DEBUG,
604 "Created loose incoming loopback channel to port %s\n",
605 GNUNET_h2s (&ch->port));
609 ch->dest = GNUNET_new (struct CadetChannelClient);
611 ch->dest->client_ready = GNUNET_YES;
618 ch->t = GCP_get_tunnel (destination,
620 ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME;
621 ch->ctn = GCT_add_channel (ch->t,
624 GNUNET_STATISTICS_update (stats,
628 LOG (GNUNET_ERROR_TYPE_DEBUG,
629 "Created channel to port %s at peer %s for %s using %s\n",
631 GCP_2s (destination),
633 (GNUNET_YES == ch->is_loopback) ? "loopback" : GCT_2s (ch->t));
639 * We had an incoming channel to a port that is closed.
640 * It has not been opened for a while, drop it.
642 * @param cls the channel to drop
645 timeout_closed_cb (void *cls)
647 struct CadetChannel *ch = cls;
649 ch->retry_control_task = NULL;
650 LOG (GNUNET_ERROR_TYPE_DEBUG,
651 "Closing incoming channel to port %s from peer %s due to timeout\n",
652 GNUNET_h2s (&ch->port),
653 GCP_2s (GCT_get_destination (ch->t)));
654 channel_destroy (ch);
659 * Create a new channel based on a request coming in over the network.
661 * @param t tunnel to the remote peer
662 * @param ctn identifier of this channel in the tunnel
663 * @param port desired local port
664 * @param options options for the channel
665 * @return handle to the new channel
667 struct CadetChannel *
668 GCCH_channel_incoming_new (struct CadetTunnel *t,
669 struct GNUNET_CADET_ChannelTunnelNumber ctn,
670 const struct GNUNET_HashCode *port,
673 struct CadetChannel *ch;
674 struct CadetClient *c;
676 ch = GNUNET_new (struct CadetChannel);
680 ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME;
681 ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
682 ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
683 ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
684 ch->max_pending_messages = (ch->nobuffer) ? 1 : 4; /* FIXME: 4!? Do not hardcode! */
685 GNUNET_STATISTICS_update (stats,
690 c = GNUNET_CONTAINER_multihashmap_get (open_ports,
694 /* port closed, wait for it to possibly open */
695 (void) GNUNET_CONTAINER_multihashmap_put (loose_channels,
698 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
699 ch->retry_control_task
700 = GNUNET_SCHEDULER_add_delayed (TIMEOUT_CLOSED_PORT,
703 LOG (GNUNET_ERROR_TYPE_DEBUG,
704 "Created loose incoming channel to port %s from peer %s\n",
705 GNUNET_h2s (&ch->port),
706 GCP_2s (GCT_get_destination (ch->t)));
713 GNUNET_STATISTICS_update (stats,
722 * Function called once the tunnel confirms that we sent the
723 * ACK message. Just remembers it was sent, we do not expect
726 * @param cls our `struct CadetChannel`.
729 send_ack_cb (void *cls)
731 struct CadetChannel *ch = cls;
733 GNUNET_assert (NULL != ch->last_control_qe);
734 ch->last_control_qe = NULL;
739 * Compute and send the current #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK to the other peer.
741 * @param ch channel to send the #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK for
744 send_channel_data_ack (struct CadetChannel *ch)
746 struct GNUNET_CADET_ChannelDataAckMessage msg;
748 msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK);
749 msg.header.size = htons (sizeof (msg));
751 msg.mid.mid = htonl (ntohl (ch->mid_recv.mid) - 1);
752 msg.futures = GNUNET_htonll (ch->mid_futures);
753 if (NULL != ch->last_control_qe)
754 GCT_send_cancel (ch->last_control_qe);
755 ch->last_control_qe = GCT_send (ch->t,
763 * Send our initial #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK to the client confirming that the
766 * @param cls the `struct CadetChannel`
769 send_open_ack (void *cls)
771 struct CadetChannel *ch = cls;
772 struct GNUNET_CADET_ChannelManageMessage msg;
774 LOG (GNUNET_ERROR_TYPE_DEBUG,
775 "Sending CHANNEL_OPEN_ACK on %s\n",
777 ch->retry_control_task = NULL;
778 msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK);
779 msg.header.size = htons (sizeof (msg));
780 msg.reserved = htonl (0);
782 if (NULL != ch->last_control_qe)
783 GCT_send_cancel (ch->last_control_qe);
784 ch->last_control_qe = GCT_send (ch->t,
792 * We got a #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN message again for
793 * this channel. If the binding was successful, (re)transmit the
794 * #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK.
796 * @param ch channel that got the duplicate open
799 GCCH_handle_duplicate_open (struct CadetChannel *ch)
801 if (NULL == ch->dest)
803 LOG (GNUNET_ERROR_TYPE_DEBUG,
804 "Ignoring duplicate channel OPEN on %s: port is closed\n",
808 if (NULL != ch->retry_control_task)
810 LOG (GNUNET_ERROR_TYPE_DEBUG,
811 "Ignoring duplicate channel OPEN on %s: control message is pending\n",
815 LOG (GNUNET_ERROR_TYPE_DEBUG,
816 "Retransmitting OPEN_ACK on %s\n",
818 ch->retry_control_task
819 = GNUNET_SCHEDULER_add_now (&send_open_ack,
825 * Send a #GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK to the client to solicit more messages.
827 * @param ch channel the ack is for
828 * @param to_owner #GNUNET_YES to send to owner,
829 * #GNUNET_NO to send to dest
832 send_ack_to_client (struct CadetChannel *ch,
835 struct GNUNET_MQ_Envelope *env;
836 struct GNUNET_CADET_LocalAck *ack;
837 struct CadetChannelClient *ccc;
839 env = GNUNET_MQ_msg (ack,
840 GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK);
841 ccc = (GNUNET_YES == to_owner) ? ch->owner : ch->dest;
843 LOG (GNUNET_ERROR_TYPE_DEBUG,
844 "Sending CADET_LOCAL_ACK to %s (%s) at ccn %X\n",
846 (GNUNET_YES == to_owner) ? "owner" : "dest",
847 ntohl (ack->ccn.channel_of_client));
848 GSC_send_to_client (ccc->c,
854 * A client is bound to the port that we have a channel
855 * open to. Send the acknowledgement for the connection
856 * request and establish the link with the client.
858 * @param ch open incoming channel
859 * @param c client listening on the respective port
862 GCCH_bind (struct CadetChannel *ch,
863 struct CadetClient *c)
866 struct CadetChannelClient *cccd;
868 LOG (GNUNET_ERROR_TYPE_DEBUG,
869 "Binding %s from %s to port %s of %s\n",
872 GNUNET_h2s (&ch->port),
874 if (NULL != ch->retry_control_task)
876 /* there might be a timeout task here */
877 GNUNET_SCHEDULER_cancel (ch->retry_control_task);
878 ch->retry_control_task = NULL;
882 options |= GNUNET_CADET_OPTION_NOBUFFER;
884 options |= GNUNET_CADET_OPTION_RELIABLE;
885 if (ch->out_of_order)
886 options |= GNUNET_CADET_OPTION_OUT_OF_ORDER;
887 cccd = GNUNET_new (struct CadetChannelClient);
890 cccd->client_ready = GNUNET_YES;
891 cccd->ccn = GSC_bind (c,
893 (GNUNET_YES == ch->is_loopback)
894 ? GCP_get (&my_full_id,
896 : GCT_get_destination (ch->t),
899 GNUNET_assert (ntohl (cccd->ccn.channel_of_client) <
900 GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
901 ch->mid_recv.mid = htonl (1); /* The CONNECT counts as message 0! */
902 if (GNUNET_YES == ch->is_loopback)
904 ch->state = CADET_CHANNEL_OPEN_SENT;
905 GCCH_handle_channel_open_ack (ch);
909 /* notify other peer that we accepted the connection */
910 ch->retry_control_task
911 = GNUNET_SCHEDULER_add_now (&send_open_ack,
914 /* give client its initial supply of ACKs */
915 GNUNET_assert (ntohl (cccd->ccn.channel_of_client) <
916 GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
917 for (unsigned int i=0;i<ch->max_pending_messages;i++)
918 send_ack_to_client (ch,
924 * Destroy locally created channel. Called by the local client, so no
925 * need to tell the client.
927 * @param ch channel to destroy
928 * @param c client that caused the destruction
929 * @param ccn client number of the client @a c
932 GCCH_channel_local_destroy (struct CadetChannel *ch,
933 struct CadetClient *c,
934 struct GNUNET_CADET_ClientChannelNumber ccn)
936 LOG (GNUNET_ERROR_TYPE_DEBUG,
937 "%s asks for destruction of %s\n",
940 GNUNET_assert (NULL != c);
941 if ( (NULL != ch->owner) &&
942 (c == ch->owner->c) &&
943 (ccn.channel_of_client == ch->owner->ccn.channel_of_client) )
945 free_channel_client (ch->owner);
948 else if ( (NULL != ch->dest) &&
949 (c == ch->dest->c) &&
950 (ccn.channel_of_client == ch->dest->ccn.channel_of_client) )
952 free_channel_client (ch->dest);
960 if (GNUNET_YES == ch->destroy)
962 /* other end already destroyed, with the local client gone, no need
963 to finish transmissions, just destroy immediately. */
964 channel_destroy (ch);
967 if ( (NULL != ch->head_sent) ||
968 (NULL != ch->owner) ||
971 /* Wait for other end to destroy us as well,
972 and otherwise allow send queue to be transmitted first */
973 ch->destroy = GNUNET_YES;
976 /* If we ever sent the CHANNEL_CREATE, we need to send a destroy message. */
977 if (CADET_CHANNEL_NEW != ch->state)
978 GCT_send_channel_destroy (ch->t,
980 /* Nothing left to do, just finish destruction */
981 channel_destroy (ch);
986 * We got an acknowledgement for the creation of the channel
987 * (the port is open on the other side). Begin transmissions.
989 * @param ch channel for which the OPEN_ACK arrived
992 GCCH_handle_channel_open_ack (struct CadetChannel *ch)
996 case CADET_CHANNEL_NEW:
997 /* this should be impossible */
1000 case CADET_CHANNEL_OPEN_SENT:
1001 if (NULL == ch->owner)
1003 /* We're not the owner, wrong direction! */
1004 GNUNET_break_op (0);
1007 LOG (GNUNET_ERROR_TYPE_DEBUG,
1008 "Received CHANNEL_OPEN_ACK for waiting %s, entering READY state\n",
1010 if (NULL != ch->retry_control_task) /* can be NULL if ch->is_loopback */
1012 GNUNET_SCHEDULER_cancel (ch->retry_control_task);
1013 ch->retry_control_task = NULL;
1015 ch->state = CADET_CHANNEL_READY;
1016 /* On first connect, send client as many ACKs as we allow messages
1018 for (unsigned int i=0;i<ch->max_pending_messages;i++)
1019 send_ack_to_client (ch,
1022 case CADET_CHANNEL_READY:
1023 /* duplicate ACK, maybe we retried the CREATE. Ignore. */
1024 LOG (GNUNET_ERROR_TYPE_DEBUG,
1025 "Received duplicate channel OPEN_ACK for %s\n",
1027 GNUNET_STATISTICS_update (stats,
1028 "# duplicate CREATE_ACKs",
1037 * Test if element @a e1 comes before element @a e2.
1039 * TODO: use opportunity to create generic list insertion sort
1040 * logic in container!
1042 * @param cls closure, our `struct CadetChannel`
1043 * @param e1 an element to sort
1044 * @param e2 another element to sort
1045 * @return #GNUNET_YES if @e1 < @e2, otherwise #GNUNET_NO
1048 is_before (void *cls,
1052 struct CadetOutOfOrderMessage *m1 = e1;
1053 struct CadetOutOfOrderMessage *m2 = e2;
1054 uint32_t v1 = ntohl (m1->mid.mid);
1055 uint32_t v2 = ntohl (m2->mid.mid);
1059 if (delta > (uint32_t) INT_MAX)
1061 /* in overflow range, we can safely assume we wrapped around */
1072 * We got payload data for a channel. Pass it on to the client
1073 * and send an ACK to the other end (once flow control allows it!)
1075 * @param ch channel that got data
1076 * @param msg message that was received
1079 GCCH_handle_channel_plaintext_data (struct CadetChannel *ch,
1080 const struct GNUNET_CADET_ChannelAppDataMessage *msg)
1082 struct GNUNET_MQ_Envelope *env;
1083 struct GNUNET_CADET_LocalData *ld;
1084 struct CadetChannelClient *ccc;
1085 struct CadetOutOfOrderMessage *com;
1086 size_t payload_size;
1088 GNUNET_assert (GNUNET_NO == ch->is_loopback);
1089 payload_size = ntohs (msg->header.size) - sizeof (*msg);
1090 env = GNUNET_MQ_msg_extra (ld,
1092 GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
1093 ld->ccn = (NULL == ch->dest) ? ch->owner->ccn : ch->dest->ccn;
1094 GNUNET_memcpy (&ld[1],
1097 ccc = (NULL != ch->owner) ? ch->owner : ch->dest;
1098 if ( (GNUNET_YES == ccc->client_ready) &&
1099 ( (GNUNET_YES == ch->out_of_order) ||
1100 (msg->mid.mid == ch->mid_recv.mid) ) )
1102 LOG (GNUNET_ERROR_TYPE_DEBUG,
1103 "Giving %u bytes of payload from %s to client %s\n",
1104 (unsigned int) payload_size,
1107 ccc->client_ready = GNUNET_NO;
1108 GSC_send_to_client (ccc->c,
1110 ch->mid_recv.mid = htonl (1 + ntohl (ch->mid_recv.mid));
1111 ch->mid_futures >>= 1;
1115 /* FIXME-SECURITY: if the element is WAY too far ahead,
1116 drop it (can't buffer too much!) */
1117 LOG (GNUNET_ERROR_TYPE_DEBUG,
1118 "Queuing %s payload of %u bytes on %s (mid %u, need %u first)\n",
1119 (GNUNET_YES == ccc->client_ready)
1121 : "client-not-ready",
1122 (unsigned int) payload_size,
1124 ntohl (msg->mid.mid),
1125 ntohl (ch->mid_recv.mid));
1127 com = GNUNET_new (struct CadetOutOfOrderMessage);
1128 com->mid = msg->mid;
1130 /* sort into list ordered by "is_before" */
1131 if ( (NULL == ccc->head_recv) ||
1132 (GNUNET_YES == is_before (ch,
1136 GNUNET_CONTAINER_DLL_insert (ccc->head_recv,
1142 struct CadetOutOfOrderMessage *pos;
1144 for (pos = ccc->head_recv;
1155 GNUNET_CONTAINER_DLL_insert_tail (ccc->head_recv,
1159 GNUNET_CONTAINER_DLL_insert_after (ccc->head_recv,
1169 * We got an acknowledgement for payload data for a channel.
1170 * Possibly resume transmissions.
1172 * @param ch channel that got the ack
1173 * @param ack details about what was received
1176 GCCH_handle_channel_plaintext_data_ack (struct CadetChannel *ch,
1177 const struct GNUNET_CADET_ChannelDataAckMessage *ack)
1179 struct CadetReliableMessage *crm;
1181 GNUNET_break (GNUNET_NO == ch->is_loopback);
1182 if (GNUNET_NO == ch->reliable)
1184 /* not expecting ACKs on unreliable channel, odd */
1185 GNUNET_break_op (0);
1188 for (crm = ch->head_sent;
1191 if (ack->mid.mid == crm->data_message.mid.mid)
1195 /* ACK for message we already dropped, might have been a
1196 duplicate ACK? Ignore. */
1197 LOG (GNUNET_ERROR_TYPE_DEBUG,
1198 "Duplicate DATA_ACK on %s, ignoring\n",
1200 GNUNET_STATISTICS_update (stats,
1201 "# duplicate DATA_ACKs",
1206 GNUNET_CONTAINER_DLL_remove (ch->head_sent,
1209 ch->pending_messages--;
1210 send_ack_to_client (ch,
1215 GNUNET_assert (ch->pending_messages < ch->max_pending_messages);
1216 LOG (GNUNET_ERROR_TYPE_DEBUG,
1217 "Received DATA_ACK on %s for message %u (%u ACKs pending)\n",
1219 (unsigned int) ntohl (ack->mid.mid),
1220 ch->pending_messages);
1221 send_ack_to_client (ch,
1229 * Destroy channel, based on the other peer closing the
1230 * connection. Also needs to remove this channel from
1233 * @param ch channel to destroy
1236 GCCH_handle_remote_destroy (struct CadetChannel *ch)
1238 struct CadetChannelClient *ccc;
1240 GNUNET_assert (GNUNET_NO == ch->is_loopback);
1241 LOG (GNUNET_ERROR_TYPE_DEBUG,
1242 "Received remote channel DESTROY for %s\n",
1244 if (GNUNET_YES == ch->destroy)
1246 /* Local client already gone, this is instant-death. */
1247 channel_destroy (ch);
1250 ccc = (NULL != ch->owner) ? ch->owner : ch->dest;
1251 if (NULL != ccc->head_recv)
1253 LOG (GNUNET_ERROR_TYPE_WARNING,
1254 "Lost end of transmission due to remote shutdown on %s\n",
1256 /* FIXME: change API to notify client about truncated transmission! */
1258 ch->destroy = GNUNET_YES;
1259 GSC_handle_remote_channel_destroy (ccc->c,
1262 channel_destroy (ch);
1267 * Function called once the tunnel has sent one of our messages.
1268 * If the message is unreliable, simply frees the `crm`. If the
1269 * message was reliable, calculate retransmission time and
1270 * wait for ACK (or retransmit).
1272 * @param cls the `struct CadetReliableMessage` that was sent
1275 data_sent_cb (void *cls);
1279 * We need to retry a transmission, the last one took too long to
1282 * @param cls the `struct CadetChannel` where we need to retransmit
1285 retry_transmission (void *cls)
1287 struct CadetChannel *ch = cls;
1288 struct CadetReliableMessage *crm = ch->head_sent;
1290 ch->retry_data_task = NULL;
1291 GNUNET_assert (NULL == crm->qe);
1292 crm->qe = GCT_send (ch->t,
1293 &crm->data_message.header,
1300 * Function called once the tunnel has sent one of our messages.
1301 * If the message is unreliable, simply frees the `crm`. If the
1302 * message was reliable, calculate retransmission time and
1303 * wait for ACK (or retransmit).
1305 * @param cls the `struct CadetReliableMessage` that was sent
1308 data_sent_cb (void *cls)
1310 struct CadetReliableMessage *crm = cls;
1311 struct CadetChannel *ch = crm->ch;
1312 struct CadetReliableMessage *off;
1314 GNUNET_assert (GNUNET_NO == ch->is_loopback);
1316 GNUNET_CONTAINER_DLL_remove (ch->head_sent,
1319 if (GNUNET_NO == ch->reliable)
1322 ch->pending_messages--;
1323 send_ack_to_client (ch,
1329 if (0 == crm->retry_delay.rel_value_us)
1330 crm->retry_delay = ch->expected_delay;
1331 crm->next_retry = GNUNET_TIME_relative_to_absolute (crm->retry_delay);
1333 /* find position for re-insertion into the DLL */
1334 if ( (NULL == ch->head_sent) ||
1335 (crm->next_retry.abs_value_us < ch->head_sent->next_retry.abs_value_us) )
1337 /* insert at HEAD, also (re)schedule retry task! */
1338 GNUNET_CONTAINER_DLL_insert (ch->head_sent,
1341 if (NULL != ch->retry_data_task)
1342 GNUNET_SCHEDULER_cancel (ch->retry_data_task);
1344 = GNUNET_SCHEDULER_add_delayed (crm->retry_delay,
1345 &retry_transmission,
1349 for (off = ch->head_sent; NULL != off; off = off->next)
1350 if (crm->next_retry.abs_value_us < off->next_retry.abs_value_us)
1354 /* insert at tail */
1355 GNUNET_CONTAINER_DLL_insert_tail (ch->head_sent,
1361 /* insert before off */
1362 GNUNET_CONTAINER_DLL_insert_after (ch->head_sent,
1371 * Handle data given by a client.
1373 * Check whether the client is allowed to send in this tunnel, save if
1374 * channel is reliable and send an ACK to the client if there is still
1375 * buffer space in the tunnel.
1377 * @param ch Channel.
1378 * @param sender_ccn ccn of the sender
1379 * @param buf payload to transmit.
1380 * @param buf_len number of bytes in @a buf
1381 * @return #GNUNET_OK if everything goes well,
1382 * #GNUNET_SYSERR in case of an error.
1385 GCCH_handle_local_data (struct CadetChannel *ch,
1386 struct GNUNET_CADET_ClientChannelNumber sender_ccn,
1390 struct CadetReliableMessage *crm;
1392 if (ch->pending_messages > ch->max_pending_messages)
1395 return GNUNET_SYSERR;
1397 ch->pending_messages++;
1399 if (GNUNET_YES == ch->is_loopback)
1401 struct CadetChannelClient *receiver;
1402 struct GNUNET_MQ_Envelope *env;
1403 struct GNUNET_CADET_LocalData *ld;
1406 env = GNUNET_MQ_msg_extra (ld,
1408 GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
1409 if (sender_ccn.channel_of_client ==
1410 ch->owner->ccn.channel_of_client)
1412 receiver = ch->dest;
1413 to_owner = GNUNET_NO;
1417 GNUNET_assert (sender_ccn.channel_of_client ==
1418 ch->dest->ccn.channel_of_client);
1419 receiver = ch->owner;
1420 to_owner = GNUNET_YES;
1422 ld->ccn = receiver->ccn;
1423 GNUNET_memcpy (&ld[1],
1426 /* FIXME: this does not provide for flow control! */
1427 GSC_send_to_client (receiver->c,
1429 send_ack_to_client (ch,
1434 /* Everything is correct, send the message. */
1435 crm = GNUNET_malloc (sizeof (*crm) + buf_len);
1437 crm->data_message.header.size = htons (sizeof (struct GNUNET_CADET_ChannelAppDataMessage) + buf_len);
1438 crm->data_message.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA);
1439 ch->mid_send.mid = htonl (ntohl (ch->mid_send.mid) + 1);
1440 crm->data_message.mid = ch->mid_send;
1441 crm->data_message.ctn = ch->ctn;
1442 GNUNET_memcpy (&crm[1],
1445 GNUNET_CONTAINER_DLL_insert (ch->head_sent,
1448 LOG (GNUNET_ERROR_TYPE_DEBUG,
1449 "Sending %u bytes from local client to %s\n",
1452 crm->qe = GCT_send (ch->t,
1453 &crm->data_message.header,
1461 * Handle ACK from client on local channel. Means the client is ready
1462 * for more data, see if we have any for it.
1464 * @param ch channel the client's ACK refers to
1465 * @param client_ccn ccn of the client sending the ack
1468 GCCH_handle_local_ack (struct CadetChannel *ch,
1469 struct GNUNET_CADET_ClientChannelNumber client_ccn)
1471 struct CadetChannelClient *ccc;
1472 struct CadetOutOfOrderMessage *com;
1474 if ( (NULL != ch->owner) &&
1475 (ch->owner->ccn.channel_of_client == client_ccn.channel_of_client) )
1477 else if ( (NULL != ch->dest) &&
1478 (ch->dest->ccn.channel_of_client == client_ccn.channel_of_client) )
1482 ccc->client_ready = GNUNET_YES;
1483 LOG (GNUNET_ERROR_TYPE_DEBUG,
1484 "Got LOCAL_ACK, client ready to receive more data!\n");
1485 com = ccc->head_recv;
1487 return; /* none pending */
1488 if ( (com->mid.mid != ch->mid_recv.mid) &&
1489 (GNUNET_NO == ch->out_of_order) )
1490 return; /* missing next one in-order */
1492 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1493 "Passing payload message to client on %s\n",
1496 /* all good, pass next message to client */
1497 GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
1500 /* FIXME: if unreliable, this is not aggressive
1501 enough, as it would be OK to have lost some! */
1502 ch->mid_recv.mid = htonl (1 + ntohl (com->mid.mid));
1503 ch->mid_futures >>= 1; /* equivalent to division by 2 */
1504 ccc->client_ready = GNUNET_NO;
1505 GSC_send_to_client (ccc->c,
1508 if ( (0xFFULL == (ch->mid_futures & 0xFFULL)) &&
1509 (GNUNET_YES == ch->reliable) )
1511 /* The next 8 messages were also already received (0xFF), this
1512 suggests that the sender may be blocked on flow control
1513 urgently waiting for an ACK from us. (As we have an inherent
1514 maximum of 64 bits, and 8 is getting too close for comfort.)
1515 So we should send one now. */
1516 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1517 "Sender on %s likely blocked on flow-control, sending ACK now.\n",
1519 if (GNUNET_YES == ch->reliable)
1520 send_channel_data_ack (ch);
1523 if (NULL != ccc->head_recv)
1525 if (GNUNET_NO == ch->destroy)
1527 GCT_send_channel_destroy (ch->t,
1529 channel_destroy (ch);
1533 #define LOG2(level, ...) GNUNET_log_from_nocheck(level,"cadet-chn",__VA_ARGS__)
1539 * @param ch Channel.
1540 * @param level Debug level to use.
1543 GCCH_debug (struct CadetChannel *ch,
1544 enum GNUNET_ErrorType level)
1548 do_log = GNUNET_get_log_call_status (level & (~GNUNET_ERROR_TYPE_BULK),
1550 __FILE__, __FUNCTION__, __LINE__);
1556 LOG2 (level, "CHN *** DEBUG NULL CHANNEL ***\n");
1564 if (NULL != ch->owner)
1567 "CHN origin %s ready %s local-id: %u\n",
1568 GSC_2s (ch->owner->c),
1569 ch->owner->client_ready ? "YES" : "NO",
1570 ntohl (ch->owner->ccn.channel_of_client));
1572 if (NULL != ch->dest)
1575 "CHN destination %s ready %s local-id: %u\n",
1576 GSC_2s (ch->dest->c),
1577 ch->dest->client_ready ? "YES" : "NO",
1578 ntohl (ch->dest->ccn.channel_of_client));
1581 "CHN Message IDs recv: %d (%LLX), send: %d\n",
1582 ntohl (ch->mid_recv.mid),
1583 (unsigned long long) ch->mid_futures,
1584 ntohl (ch->mid_send.mid));
1589 /* end of gnunet-service-cadet-new_channel.c */