3 This file is part of GNUnet.
4 Copyright (C) 2001-2017 GNUnet e.V.
6 GNUnet is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published
8 by the Free Software Foundation; either version 3, or (at your
9 option) any later version.
11 GNUnet is distributed in the hope that it will be useful, but
12 WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with GNUnet; see the file COPYING. If not, write to the
18 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
19 Boston, MA 02110-1301, USA.
22 * @file cadet/gnunet-service-cadet-new_channel.c
23 * @brief logical links between CADET clients
24 * @author Bartlomiej Polot
25 * @author Christian Grothoff
28 * - FIXME: send ACKs back to loopback clients!
30 * - introduce shutdown so we can have half-closed channels, modify
31 * destroy to include MID to have FIN-ACK equivalents, etc.
32 * - estimate max bandwidth using bursts and use to for CONGESTION CONTROL!
33 * - check that '0xFFULL' really is sufficient for flow control!
34 * - revisit handling of 'unreliable' traffic!
35 * - revisit handling of 'out-of-order' option, especially in combination with/without 'reliable'.
36 * - figure out flow control without ACKs (unreliable traffic!)
39 #include "gnunet_util_lib.h"
41 #include "gnunet_statistics_service.h"
42 #include "gnunet-service-cadet-new.h"
43 #include "gnunet-service-cadet-new_channel.h"
44 #include "gnunet-service-cadet-new_connection.h"
45 #include "gnunet-service-cadet-new_tunnels.h"
46 #include "gnunet-service-cadet-new_peer.h"
47 #include "gnunet-service-cadet-new_paths.h"
49 #define LOG(level,...) GNUNET_log_from (level,"cadet-chn",__VA_ARGS__)
52 * How long do we initially wait before retransmitting?
54 #define CADET_INITIAL_RETRANSMIT_TIME GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 250)
57 * How long do we wait before dropping state about incoming
58 * connection to closed port?
60 #define TIMEOUT_CLOSED_PORT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30)
64 * All the states a connection can be in.
66 enum CadetChannelState
69 * Uninitialized status, should never appear in operation.
74 * Connection create message sent, waiting for ACK.
76 CADET_CHANNEL_OPEN_SENT,
79 * Connection confirmed, ready to carry traffic.
86 * Info needed to retry a message in case it gets lost.
87 * Note that we DO use this structure also for unreliable
90 struct CadetReliableMessage
93 * Double linked list, FIFO style
95 struct CadetReliableMessage *next;
98 * Double linked list, FIFO style
100 struct CadetReliableMessage *prev;
103 * Which channel is this message in?
105 struct CadetChannel *ch;
108 * Entry in the tunnels queue for this message, NULL if it has left
109 * the tunnel. Used to cancel transmission in case we receive an
112 struct CadetTunnelQueueEntry *qe;
115 * How soon should we retry if we fail to get an ACK?
116 * Messages in the queue are sorted by this value.
118 struct GNUNET_TIME_Absolute next_retry;
121 * How long do we wait for an ACK after transmission?
122 * Use for the back-off calculation.
124 struct GNUNET_TIME_Relative retry_delay;
127 * Data message we are trying to send.
129 struct GNUNET_CADET_ChannelAppDataMessage *data_message;
135 * List of received out-of-order data messages.
137 struct CadetOutOfOrderMessage
140 * Double linked list, FIFO style
142 struct CadetOutOfOrderMessage *next;
145 * Double linked list, FIFO style
147 struct CadetOutOfOrderMessage *prev;
150 * ID of the message (messages up to this point needed
151 * before we give this one to the client).
153 struct ChannelMessageIdentifier mid;
156 * The envelope with the payload of the out-of-order message
158 struct GNUNET_MQ_Envelope *env;
164 * Client endpoint of a `struct CadetChannel`. A channel may be a
165 * loopback channel, in which case it has two of these endpoints.
166 * Note that flow control also is required in both directions.
168 struct CadetChannelClient
171 * Client handle. Not by itself sufficient to designate
172 * the client endpoint, as the same client handle may
173 * be used for both the owner and the destination, and
174 * we thus also need the channel ID to identify the client.
176 struct CadetClient *c;
179 * Head of DLL of messages received out of order or while client was unready.
181 struct CadetOutOfOrderMessage *head_recv;
184 * Tail DLL of messages received out of order or while client was unready.
186 struct CadetOutOfOrderMessage *tail_recv;
189 * Local tunnel number for this client.
190 * (if owner >= #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI,
191 * otherwise < #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI)
193 struct GNUNET_CADET_ClientChannelNumber ccn;
196 * Can we send data to the client?
204 * Struct containing all information regarding a channel to a remote client.
209 * Tunnel this channel is in.
211 struct CadetTunnel *t;
214 * Client owner of the tunnel, if any.
215 * (Used if this channel represends the initiating end of the tunnel.)
217 struct CadetChannelClient *owner;
220 * Client destination of the tunnel, if any.
221 * (Used if this channel represents the listening end of the tunnel.)
223 struct CadetChannelClient *dest;
226 * Last entry in the tunnel's queue relating to control messages
227 * (#GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN or
228 * #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK). Used to cancel
229 * transmission in case we receive updated information.
231 struct CadetTunnelQueueEntry *last_control_qe;
234 * Head of DLL of messages sent and not yet ACK'd.
236 struct CadetReliableMessage *head_sent;
239 * Tail of DLL of messages sent and not yet ACK'd.
241 struct CadetReliableMessage *tail_sent;
244 * Task to resend/poll in case no ACK is received.
246 struct GNUNET_SCHEDULER_Task *retry_control_task;
249 * Task to resend/poll in case no ACK is received.
251 struct GNUNET_SCHEDULER_Task *retry_data_task;
254 * Last time the channel was used
256 struct GNUNET_TIME_Absolute timestamp;
259 * Destination port of the channel.
261 struct GNUNET_HashCode port;
264 * Counter for exponential backoff.
266 struct GNUNET_TIME_Relative retry_time;
269 * How long does it usually take to get an ACK.
271 struct GNUNET_TIME_Relative expected_delay;
274 * Bitfield of already-received messages past @e mid_recv.
276 uint64_t mid_futures;
279 * Next MID expected for incoming traffic.
281 struct ChannelMessageIdentifier mid_recv;
284 * Next MID to use for outgoing traffic.
286 struct ChannelMessageIdentifier mid_send;
289 * Total (reliable) messages pending ACK for this channel.
291 unsigned int pending_messages;
294 * Maximum (reliable) messages pending ACK for this channel
295 * before we throttle the client.
297 unsigned int max_pending_messages;
300 * Number identifying this channel in its tunnel.
302 struct GNUNET_CADET_ChannelTunnelNumber ctn;
307 enum CadetChannelState state;
310 * Is the tunnel bufferless (minimum latency)?
315 * Is the tunnel reliable?
320 * Is the tunnel out-of-order?
325 * Is this channel a loopback channel, where the destination is us again?
330 * Flag to signal the destruction of the channel. If this is set to
331 * #GNUNET_YES the channel will be destroyed once the queue is
340 * Get the static string for identification of the channel.
344 * @return Static string with the channel IDs.
347 GCCH_2s (const struct CadetChannel *ch)
349 static char buf[128];
351 GNUNET_snprintf (buf,
353 "Channel %s:%s ctn:%X(%X/%X)",
354 (GNUNET_YES == ch->is_loopback)
356 : GNUNET_i2s (GCP_get_id (GCT_get_destination (ch->t))),
357 GNUNET_h2s (&ch->port),
359 (NULL == ch->owner) ? 0 : ntohl (ch->owner->ccn.channel_of_client),
360 (NULL == ch->dest) ? 0 : ntohl (ch->dest->ccn.channel_of_client));
366 * Get the channel's public ID.
370 * @return ID used to identify the channel with the remote peer.
372 struct GNUNET_CADET_ChannelTunnelNumber
373 GCCH_get_id (const struct CadetChannel *ch)
380 * Release memory associated with @a ccc
382 * @param ccc data structure to clean up
385 free_channel_client (struct CadetChannelClient *ccc)
387 struct CadetOutOfOrderMessage *com;
389 while (NULL != (com = ccc->head_recv))
391 GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
394 GNUNET_MQ_discard (com->env);
402 * Destroy the given channel.
404 * @param ch channel to destroy
407 channel_destroy (struct CadetChannel *ch)
409 struct CadetReliableMessage *crm;
411 while (NULL != (crm = ch->head_sent))
413 GNUNET_assert (ch == crm->ch);
416 GCT_send_cancel (crm->qe);
419 GNUNET_CONTAINER_DLL_remove (ch->head_sent,
422 GNUNET_free (crm->data_message);
425 if (NULL != ch->owner)
427 free_channel_client (ch->owner);
430 if (NULL != ch->dest)
432 free_channel_client (ch->dest);
435 if (NULL != ch->last_control_qe)
437 GCT_send_cancel (ch->last_control_qe);
438 ch->last_control_qe = NULL;
440 if (NULL != ch->retry_data_task)
442 GNUNET_SCHEDULER_cancel (ch->retry_data_task);
443 ch->retry_data_task = NULL;
445 if (NULL != ch->retry_control_task)
447 GNUNET_SCHEDULER_cancel (ch->retry_control_task);
448 ch->retry_control_task = NULL;
450 if (GNUNET_NO == ch->is_loopback)
452 GCT_remove_channel (ch->t,
462 * Send a channel create message.
464 * @param cls Channel for which to send.
467 send_channel_open (void *cls);
471 * Function called once the tunnel confirms that we sent the
472 * create message. Delays for a bit until we retry.
474 * @param cls our `struct CadetChannel`.
477 channel_open_sent_cb (void *cls)
479 struct CadetChannel *ch = cls;
481 GNUNET_assert (NULL != ch->last_control_qe);
482 ch->last_control_qe = NULL;
483 ch->retry_time = GNUNET_TIME_STD_BACKOFF (ch->retry_time);
484 LOG (GNUNET_ERROR_TYPE_DEBUG,
485 "Sent CADET_CHANNEL_OPEN on %s, retrying in %s\n",
487 GNUNET_STRINGS_relative_time_to_string (ch->retry_time,
489 ch->retry_control_task
490 = GNUNET_SCHEDULER_add_delayed (ch->retry_time,
497 * Send a channel open message.
499 * @param cls Channel for which to send.
502 send_channel_open (void *cls)
504 struct CadetChannel *ch = cls;
505 struct GNUNET_CADET_ChannelOpenMessage msgcc;
508 ch->retry_control_task = NULL;
509 LOG (GNUNET_ERROR_TYPE_DEBUG,
510 "Sending CHANNEL_OPEN message for %s\n",
514 options |= GNUNET_CADET_OPTION_NOBUFFER;
516 options |= GNUNET_CADET_OPTION_RELIABLE;
517 if (ch->out_of_order)
518 options |= GNUNET_CADET_OPTION_OUT_OF_ORDER;
519 msgcc.header.size = htons (sizeof (msgcc));
520 msgcc.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN);
521 msgcc.opt = htonl (options);
522 msgcc.port = ch->port;
524 ch->state = CADET_CHANNEL_OPEN_SENT;
525 ch->last_control_qe = GCT_send (ch->t,
527 &channel_open_sent_cb,
533 * Function called once and only once after a channel was bound
534 * to its tunnel via #GCT_add_channel() is ready for transmission.
535 * Note that this is only the case for channels that this peer
536 * initiates, as for incoming channels we assume that they are
537 * ready for transmission immediately upon receiving the open
538 * message. Used to bootstrap the #GCT_send() process.
540 * @param ch the channel for which the tunnel is now ready
543 GCCH_tunnel_up (struct CadetChannel *ch)
545 GNUNET_assert (NULL == ch->retry_control_task);
546 LOG (GNUNET_ERROR_TYPE_DEBUG,
547 "Tunnel up, sending CHANNEL_OPEN on %s now\n",
549 ch->retry_control_task
550 = GNUNET_SCHEDULER_add_now (&send_channel_open,
556 * Create a new channel.
558 * @param owner local client owning the channel
559 * @param ccn local number of this channel at the @a owner
560 * @param destination peer to which we should build the channel
561 * @param port desired port at @a destination
562 * @param options options for the channel
563 * @return handle to the new channel
565 struct CadetChannel *
566 GCCH_channel_local_new (struct CadetClient *owner,
567 struct GNUNET_CADET_ClientChannelNumber ccn,
568 struct CadetPeer *destination,
569 const struct GNUNET_HashCode *port,
572 struct CadetChannel *ch;
573 struct CadetChannelClient *ccco;
575 ccco = GNUNET_new (struct CadetChannelClient);
578 ccco->client_ready = GNUNET_YES;
580 ch = GNUNET_new (struct CadetChannel);
581 ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
582 ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
583 ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
584 ch->max_pending_messages = (ch->nobuffer) ? 1 : 4; /* FIXME: 4!? Do not hardcode! */
587 if (0 == memcmp (&my_full_id,
588 GCP_get_id (destination),
589 sizeof (struct GNUNET_PeerIdentity)))
591 struct CadetClient *c;
593 ch->is_loopback = GNUNET_YES;
594 c = GNUNET_CONTAINER_multihashmap_get (open_ports,
598 /* port closed, wait for it to possibly open */
599 (void) GNUNET_CONTAINER_multihashmap_put (loose_channels,
602 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
603 LOG (GNUNET_ERROR_TYPE_DEBUG,
604 "Created loose incoming loopback channel to port %s\n",
605 GNUNET_h2s (&ch->port));
609 ch->dest = GNUNET_new (struct CadetChannelClient);
611 ch->dest->client_ready = GNUNET_YES;
618 ch->t = GCP_get_tunnel (destination,
620 ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME;
621 ch->ctn = GCT_add_channel (ch->t,
624 GNUNET_STATISTICS_update (stats,
628 LOG (GNUNET_ERROR_TYPE_DEBUG,
629 "Created channel to port %s at peer %s for %s using %s\n",
631 GCP_2s (destination),
633 (GNUNET_YES == ch->is_loopback) ? "loopback" : GCT_2s (ch->t));
639 * We had an incoming channel to a port that is closed.
640 * It has not been opened for a while, drop it.
642 * @param cls the channel to drop
645 timeout_closed_cb (void *cls)
647 struct CadetChannel *ch = cls;
649 ch->retry_control_task = NULL;
650 LOG (GNUNET_ERROR_TYPE_DEBUG,
651 "Closing incoming channel to port %s from peer %s due to timeout\n",
652 GNUNET_h2s (&ch->port),
653 GCP_2s (GCT_get_destination (ch->t)));
654 channel_destroy (ch);
659 * Create a new channel based on a request coming in over the network.
661 * @param t tunnel to the remote peer
662 * @param ctn identifier of this channel in the tunnel
663 * @param port desired local port
664 * @param options options for the channel
665 * @return handle to the new channel
667 struct CadetChannel *
668 GCCH_channel_incoming_new (struct CadetTunnel *t,
669 struct GNUNET_CADET_ChannelTunnelNumber ctn,
670 const struct GNUNET_HashCode *port,
673 struct CadetChannel *ch;
674 struct CadetClient *c;
676 ch = GNUNET_new (struct CadetChannel);
680 ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME;
681 ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
682 ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
683 ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
684 ch->max_pending_messages = (ch->nobuffer) ? 1 : 4; /* FIXME: 4!? Do not hardcode! */
685 GNUNET_STATISTICS_update (stats,
690 c = GNUNET_CONTAINER_multihashmap_get (open_ports,
694 /* port closed, wait for it to possibly open */
695 (void) GNUNET_CONTAINER_multihashmap_put (loose_channels,
698 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
699 ch->retry_control_task
700 = GNUNET_SCHEDULER_add_delayed (TIMEOUT_CLOSED_PORT,
703 LOG (GNUNET_ERROR_TYPE_DEBUG,
704 "Created loose incoming channel to port %s from peer %s\n",
705 GNUNET_h2s (&ch->port),
706 GCP_2s (GCT_get_destination (ch->t)));
713 GNUNET_STATISTICS_update (stats,
722 * Function called once the tunnel confirms that we sent the
723 * ACK message. Just remembers it was sent, we do not expect
726 * @param cls our `struct CadetChannel`.
729 send_ack_cb (void *cls)
731 struct CadetChannel *ch = cls;
733 GNUNET_assert (NULL != ch->last_control_qe);
734 ch->last_control_qe = NULL;
739 * Compute and send the current #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK to the other peer.
741 * @param ch channel to send the #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK for
744 send_channel_data_ack (struct CadetChannel *ch)
746 struct GNUNET_CADET_ChannelDataAckMessage msg;
748 msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK);
749 msg.header.size = htons (sizeof (msg));
751 msg.mid.mid = htonl (ntohl (ch->mid_recv.mid) - 1);
752 msg.futures = GNUNET_htonll (ch->mid_futures);
753 if (NULL != ch->last_control_qe)
754 GCT_send_cancel (ch->last_control_qe);
755 ch->last_control_qe = GCT_send (ch->t,
763 * Send our initial #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK to the client confirming that the
766 * @param cls the `struct CadetChannel`
769 send_open_ack (void *cls)
771 struct CadetChannel *ch = cls;
772 struct GNUNET_CADET_ChannelManageMessage msg;
774 LOG (GNUNET_ERROR_TYPE_DEBUG,
775 "Sending CHANNEL_OPEN_ACK on %s\n",
777 ch->retry_control_task = NULL;
778 msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK);
779 msg.header.size = htons (sizeof (msg));
780 msg.reserved = htonl (0);
782 if (NULL != ch->last_control_qe)
783 GCT_send_cancel (ch->last_control_qe);
784 ch->last_control_qe = GCT_send (ch->t,
792 * We got a #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN message again for
793 * this channel. If the binding was successful, (re)transmit the
794 * #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK.
796 * @param ch channel that got the duplicate open
799 GCCH_handle_duplicate_open (struct CadetChannel *ch)
801 if (NULL == ch->dest)
803 LOG (GNUNET_ERROR_TYPE_DEBUG,
804 "Ignoring duplicate channel OPEN on %s: port is closed\n",
808 if (NULL != ch->retry_control_task)
810 LOG (GNUNET_ERROR_TYPE_DEBUG,
811 "Ignoring duplicate channel OPEN on %s: control message is pending\n",
815 LOG (GNUNET_ERROR_TYPE_DEBUG,
816 "Retransmitting OPEN_ACK on %s\n",
818 ch->retry_control_task
819 = GNUNET_SCHEDULER_add_now (&send_open_ack,
825 * Send a #GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK to the client to solicit more messages.
827 * @param ch channel the ack is for
828 * @param to_owner #GNUNET_YES to send to owner,
829 * #GNUNET_NO to send to dest
832 send_ack_to_client (struct CadetChannel *ch,
835 struct GNUNET_MQ_Envelope *env;
836 struct GNUNET_CADET_LocalAck *ack;
837 struct CadetChannelClient *ccc;
839 env = GNUNET_MQ_msg (ack,
840 GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK);
841 ccc = (GNUNET_YES == to_owner) ? ch->owner : ch->dest;
843 LOG (GNUNET_ERROR_TYPE_DEBUG,
844 "Sending CADET_LOCAL_ACK to %s (%s) at ccn %X\n",
846 (GNUNET_YES == to_owner) ? "owner" : "dest",
847 ntohl (ack->ccn.channel_of_client));
848 GSC_send_to_client (ccc->c,
854 * A client is bound to the port that we have a channel
855 * open to. Send the acknowledgement for the connection
856 * request and establish the link with the client.
858 * @param ch open incoming channel
859 * @param c client listening on the respective port
862 GCCH_bind (struct CadetChannel *ch,
863 struct CadetClient *c)
866 struct CadetChannelClient *cccd;
868 LOG (GNUNET_ERROR_TYPE_DEBUG,
869 "Binding %s from %s to port %s of %s\n",
872 GNUNET_h2s (&ch->port),
874 if (NULL != ch->retry_control_task)
876 /* there might be a timeout task here */
877 GNUNET_SCHEDULER_cancel (ch->retry_control_task);
878 ch->retry_control_task = NULL;
882 options |= GNUNET_CADET_OPTION_NOBUFFER;
884 options |= GNUNET_CADET_OPTION_RELIABLE;
885 if (ch->out_of_order)
886 options |= GNUNET_CADET_OPTION_OUT_OF_ORDER;
887 cccd = GNUNET_new (struct CadetChannelClient);
890 cccd->client_ready = GNUNET_YES;
891 cccd->ccn = GSC_bind (c,
893 (GNUNET_YES == ch->is_loopback)
894 ? GCP_get (&my_full_id,
896 : GCT_get_destination (ch->t),
899 GNUNET_assert (ntohl (cccd->ccn.channel_of_client) <
900 GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
901 ch->mid_recv.mid = htonl (1); /* The CONNECT counts as message 0! */
902 if (GNUNET_YES == ch->is_loopback)
904 ch->state = CADET_CHANNEL_OPEN_SENT;
905 GCCH_handle_channel_open_ack (ch);
909 /* notify other peer that we accepted the connection */
910 ch->retry_control_task
911 = GNUNET_SCHEDULER_add_now (&send_open_ack,
914 /* give client it's initial supply of ACKs */
915 GNUNET_assert (ntohl (cccd->ccn.channel_of_client) <
916 GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
917 for (unsigned int i=0;i<ch->max_pending_messages;i++)
918 send_ack_to_client (ch,
924 * Destroy locally created channel. Called by the local client, so no
925 * need to tell the client.
927 * @param ch channel to destroy
928 * @param c client that caused the destruction
929 * @param ccn client number of the client @a c
932 GCCH_channel_local_destroy (struct CadetChannel *ch,
933 struct CadetClient *c,
934 struct GNUNET_CADET_ClientChannelNumber ccn)
936 LOG (GNUNET_ERROR_TYPE_DEBUG,
937 "%s asks for destruction of %s\n",
940 GNUNET_assert (NULL != c);
941 if ( (NULL != ch->owner) &&
942 (c == ch->owner->c) &&
943 (ccn.channel_of_client == ch->owner->ccn.channel_of_client) )
945 free_channel_client (ch->owner);
948 else if ( (NULL != ch->dest) &&
949 (c == ch->dest->c) &&
950 (ccn.channel_of_client == ch->dest->ccn.channel_of_client) )
952 free_channel_client (ch->dest);
960 if (GNUNET_YES == ch->destroy)
962 /* other end already destroyed, with the local client gone, no need
963 to finish transmissions, just destroy immediately. */
964 channel_destroy (ch);
967 if ( (NULL != ch->head_sent) ||
968 (NULL != ch->owner) ||
971 /* Wait for other end to destroy us as well,
972 and otherwise allow send queue to be transmitted first */
973 ch->destroy = GNUNET_YES;
976 /* If the we ever sent the CHANNEL_CREATE, we need to send a destroy message. */
977 if (CADET_CHANNEL_NEW != ch->state)
978 GCT_send_channel_destroy (ch->t,
980 /* Nothing left to do, just finish destruction */
981 channel_destroy (ch);
986 * We got an acknowledgement for the creation of the channel
987 * (the port is open on the other side). Begin transmissions.
989 * @param ch channel to destroy
992 GCCH_handle_channel_open_ack (struct CadetChannel *ch)
996 case CADET_CHANNEL_NEW:
997 /* this should be impossible */
1000 case CADET_CHANNEL_OPEN_SENT:
1001 if (NULL == ch->owner)
1003 /* We're not the owner, wrong direction! */
1004 GNUNET_break_op (0);
1007 LOG (GNUNET_ERROR_TYPE_DEBUG,
1008 "Received CHANNEL_OPEN_ACK for waiting %s, entering READY state\n",
1010 if (NULL != ch->retry_control_task) /* can be NULL if ch->is_loopback */
1012 GNUNET_SCHEDULER_cancel (ch->retry_control_task);
1013 ch->retry_control_task = NULL;
1015 ch->state = CADET_CHANNEL_READY;
1016 /* On first connect, send client as many ACKs as we allow messages
1018 for (unsigned int i=0;i<ch->max_pending_messages;i++)
1019 send_ack_to_client (ch,
1022 case CADET_CHANNEL_READY:
1023 /* duplicate ACK, maybe we retried the CREATE. Ignore. */
1024 LOG (GNUNET_ERROR_TYPE_DEBUG,
1025 "Received duplicate channel OPEN_ACK for %s\n",
1027 GNUNET_STATISTICS_update (stats,
1028 "# duplicate CREATE_ACKs",
1037 * Test if element @a e1 comes before element @a e2.
1039 * TODO: use opportunity to create generic list insertion sort
1040 * logic in container!
1042 * @param cls closure, our `struct CadetChannel`
1043 * @param e1 an element of to sort
1044 * @param e2 another element to sort
1045 * @return #GNUNET_YES if @e1 < @e2, otherwise #GNUNET_NO
1048 is_before (void *cls,
1052 struct CadetOutOfOrderMessage *m1 = e1;
1053 struct CadetOutOfOrderMessage *m2 = e2;
1054 uint32_t v1 = ntohl (m1->mid.mid);
1055 uint32_t v2 = ntohl (m2->mid.mid);
1059 if (delta > (uint32_t) INT_MAX)
1061 /* in overflow range, we can safely assume we wrapped around */
1072 * We got payload data for a channel. Pass it on to the client
1073 * and send an ACK to the other end (once flow control allows it!)
1075 * @param ch channel that got data
1076 * @param msg message that was received
1079 GCCH_handle_channel_plaintext_data (struct CadetChannel *ch,
1080 const struct GNUNET_CADET_ChannelAppDataMessage *msg)
1082 struct GNUNET_MQ_Envelope *env;
1083 struct GNUNET_CADET_LocalData *ld;
1084 struct CadetChannelClient *ccc;
1085 struct CadetOutOfOrderMessage *com;
1086 size_t payload_size;
1088 GNUNET_assert (GNUNET_NO == ch->is_loopback);
1089 payload_size = ntohs (msg->header.size) - sizeof (*msg);
1090 env = GNUNET_MQ_msg_extra (ld,
1092 GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
1093 ld->ccn = (NULL == ch->dest) ? ch->owner->ccn : ch->dest->ccn;
1094 GNUNET_memcpy (&ld[1],
1097 ccc = (NULL != ch->owner) ? ch->owner : ch->dest;
1098 if ( (GNUNET_YES == ccc->client_ready) &&
1099 ( (GNUNET_YES == ch->out_of_order) ||
1100 (msg->mid.mid == ch->mid_recv.mid) ) )
1102 LOG (GNUNET_ERROR_TYPE_DEBUG,
1103 "Giving %u bytes of payload from %s to client %s\n",
1104 (unsigned int) payload_size,
1107 ccc->client_ready = GNUNET_NO;
1108 GSC_send_to_client (ccc->c,
1110 ch->mid_recv.mid = htonl (1 + ntohl (ch->mid_recv.mid));
1111 ch->mid_futures >>= 1;
1115 /* FIXME-SECURITY: if the element is WAY too far ahead,
1116 drop it (can't buffer too much!) */
1117 LOG (GNUNET_ERROR_TYPE_DEBUG,
1118 "Queuing %s payload of %u bytes on %s (mid %u, need %u first)\n",
1119 (GNUNET_YES == ccc->client_ready)
1121 : "client-not-ready",
1122 (unsigned int) payload_size,
1124 ntohl (msg->mid.mid),
1125 ntohl (ch->mid_recv.mid));
1127 com = GNUNET_new (struct CadetOutOfOrderMessage);
1128 com->mid = msg->mid;
1130 /* sort into list ordered by "is_before" */
1131 if ( (NULL == ccc->head_recv) ||
1132 (GNUNET_YES == is_before (ch,
1136 GNUNET_CONTAINER_DLL_insert (ccc->head_recv,
1142 struct CadetOutOfOrderMessage *pos;
1144 for (pos = ccc->head_recv;
1155 GNUNET_CONTAINER_DLL_insert_tail (ccc->head_recv,
1159 GNUNET_CONTAINER_DLL_insert_after (ccc->head_recv,
1169 * We got an acknowledgement for payload data for a channel.
1170 * Possibly resume transmissions.
1172 * @param ch channel that got the ack
1173 * @param ack details about what was received
1176 GCCH_handle_channel_plaintext_data_ack (struct CadetChannel *ch,
1177 const struct GNUNET_CADET_ChannelDataAckMessage *ack)
1179 struct CadetReliableMessage *crm;
1181 GNUNET_break (GNUNET_NO == ch->is_loopback);
1182 if (GNUNET_NO == ch->reliable)
1184 /* not expecting ACKs on unreliable channel, odd */
1185 GNUNET_break_op (0);
1188 for (crm = ch->head_sent;
1191 if (ack->mid.mid == crm->data_message->mid.mid)
1195 /* ACK for message we already dropped, might have been a
1196 duplicate ACK? Ignore. */
1197 LOG (GNUNET_ERROR_TYPE_DEBUG,
1198 "Duplicate DATA_ACK on %s, ignoring\n",
1200 GNUNET_STATISTICS_update (stats,
1201 "# duplicate DATA_ACKs",
1206 GNUNET_CONTAINER_DLL_remove (ch->head_sent,
1209 GNUNET_free (crm->data_message);
1211 ch->pending_messages--;
1212 send_ack_to_client (ch,
1216 GNUNET_assert (ch->pending_messages < ch->max_pending_messages);
1217 LOG (GNUNET_ERROR_TYPE_DEBUG,
1218 "Received DATA_ACK on %s for message %u (%u ACKs pending)\n",
1220 (unsigned int) ntohl (ack->mid.mid),
1221 ch->pending_messages);
1222 send_ack_to_client (ch,
1230 * Destroy channel, based on the other peer closing the
1231 * connection. Also needs to remove this channel from
1234 * @param ch channel to destroy
1237 GCCH_handle_remote_destroy (struct CadetChannel *ch)
1239 struct CadetChannelClient *ccc;
1241 GNUNET_assert (GNUNET_NO == ch->is_loopback);
1242 LOG (GNUNET_ERROR_TYPE_DEBUG,
1243 "Received remote channel DESTROY for %s\n",
1245 if (GNUNET_YES == ch->destroy)
1247 /* Local client already gone, this is instant-death. */
1248 channel_destroy (ch);
1251 ccc = (NULL != ch->owner) ? ch->owner : ch->dest;
1252 if (NULL != ccc->head_recv)
1254 LOG (GNUNET_ERROR_TYPE_WARNING,
1255 "Lost end of transmission due to remote shutdown on %s\n",
1257 /* FIXME: change API to notify client about truncated transmission! */
1259 ch->destroy = GNUNET_YES;
1260 GSC_handle_remote_channel_destroy (ccc->c,
1263 channel_destroy (ch);
1268 * Function called once the tunnel has sent one of our messages.
1269 * If the message is unreliable, simply frees the `crm`. If the
1270 * message was reliable, calculate retransmission time and
1271 * wait for ACK (or retransmit).
1273 * @param cls the `struct CadetReliableMessage` that was sent
1276 data_sent_cb (void *cls);
1280 * We need to retry a transmission, the last one took too long to
1283 * @param cls the `struct CadetChannel` where we need to retransmit
1286 retry_transmission (void *cls)
1288 struct CadetChannel *ch = cls;
1289 struct CadetReliableMessage *crm = ch->head_sent;
1291 ch->retry_data_task = NULL;
1292 GNUNET_assert (NULL == crm->qe);
1293 crm->qe = GCT_send (ch->t,
1294 &crm->data_message->header,
1301 * Function called once the tunnel has sent one of our messages.
1302 * If the message is unreliable, simply frees the `crm`. If the
1303 * message was reliable, calculate retransmission time and
1304 * wait for ACK (or retransmit).
1306 * @param cls the `struct CadetReliableMessage` that was sent
1309 data_sent_cb (void *cls)
1311 struct CadetReliableMessage *crm = cls;
1312 struct CadetChannel *ch = crm->ch;
1313 struct CadetReliableMessage *off;
1315 GNUNET_assert (GNUNET_NO == ch->is_loopback);
1317 GNUNET_CONTAINER_DLL_remove (ch->head_sent,
1320 if (GNUNET_NO == ch->reliable)
1323 ch->pending_messages--;
1324 send_ack_to_client (ch,
1330 if (0 == crm->retry_delay.rel_value_us)
1331 crm->retry_delay = ch->expected_delay;
1332 crm->next_retry = GNUNET_TIME_relative_to_absolute (crm->retry_delay);
1334 /* find position for re-insertion into the DLL */
1335 if ( (NULL == ch->head_sent) ||
1336 (crm->next_retry.abs_value_us < ch->head_sent->next_retry.abs_value_us) )
1338 /* insert at HEAD, also (re)schedule retry task! */
1339 GNUNET_CONTAINER_DLL_insert (ch->head_sent,
1342 if (NULL != ch->retry_data_task)
1343 GNUNET_SCHEDULER_cancel (ch->retry_data_task);
1345 = GNUNET_SCHEDULER_add_delayed (crm->retry_delay,
1346 &retry_transmission,
1350 for (off = ch->head_sent; NULL != off; off = off->next)
1351 if (crm->next_retry.abs_value_us < off->next_retry.abs_value_us)
1355 /* insert at tail */
1356 GNUNET_CONTAINER_DLL_insert_tail (ch->head_sent,
1362 /* insert before off */
1363 GNUNET_CONTAINER_DLL_insert_after (ch->head_sent,
1372 * Handle data given by a client.
1374 * Check whether the client is allowed to send in this tunnel, save if
1375 * channel is reliable and send an ACK to the client if there is still
1376 * buffer space in the tunnel.
1378 * @param ch Channel.
1379 * @param sender_ccn ccn of the sender
1380 * @param buf payload to transmit.
1381 * @param buf_len number of bytes in @a buf
1382 * @return #GNUNET_OK if everything goes well,
1383 * #GNUNET_SYSERR in case of an error.
1386 GCCH_handle_local_data (struct CadetChannel *ch,
1387 struct GNUNET_CADET_ClientChannelNumber sender_ccn,
1391 struct CadetReliableMessage *crm;
1393 if (ch->pending_messages > ch->max_pending_messages)
1396 return GNUNET_SYSERR;
1398 ch->pending_messages++;
1400 if (GNUNET_YES == ch->is_loopback)
1402 struct CadetChannelClient *receiver;
1403 struct GNUNET_MQ_Envelope *env;
1404 struct GNUNET_CADET_LocalData *ld;
1407 env = GNUNET_MQ_msg_extra (ld,
1409 GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
1410 if (sender_ccn.channel_of_client ==
1411 ch->owner->ccn.channel_of_client)
1413 receiver = ch->dest;
1414 to_owner = GNUNET_NO;
1418 GNUNET_assert (sender_ccn.channel_of_client ==
1419 ch->dest->ccn.channel_of_client);
1420 receiver = ch->owner;
1421 to_owner = GNUNET_YES;
1423 ld->ccn = receiver->ccn;
1424 GNUNET_memcpy (&ld[1],
1427 /* FIXME: this does not provide for flow control! */
1428 GSC_send_to_client (receiver->c,
1430 send_ack_to_client (ch,
1435 /* Everything is correct, send the message. */
1436 crm = GNUNET_malloc (sizeof (*crm));
1438 crm->data_message = GNUNET_malloc (sizeof (struct GNUNET_CADET_ChannelAppDataMessage)
1440 crm->data_message->header.size = htons (sizeof (struct GNUNET_CADET_ChannelAppDataMessage) + buf_len);
1441 crm->data_message->header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA);
1442 ch->mid_send.mid = htonl (ntohl (ch->mid_send.mid) + 1);
1443 crm->data_message->mid = ch->mid_send;
1444 crm->data_message->ctn = ch->ctn;
1445 GNUNET_memcpy (&crm->data_message[1],
1448 GNUNET_CONTAINER_DLL_insert (ch->head_sent,
1451 LOG (GNUNET_ERROR_TYPE_DEBUG,
1452 "Sending %u bytes from local client to %s\n",
1455 crm->qe = GCT_send (ch->t,
1456 &crm->data_message->header,
1464 * Handle ACK from client on local channel. Means the client is ready
1465 * for more data, see if we have any for it.
1467 * @param ch channel to destroy
1468 * @param client_ccn ccn of the client sending the ack
1471 GCCH_handle_local_ack (struct CadetChannel *ch,
1472 struct GNUNET_CADET_ClientChannelNumber client_ccn)
1474 struct CadetChannelClient *ccc;
1475 struct CadetOutOfOrderMessage *com;
1477 if ( (NULL != ch->owner) &&
1478 (ch->owner->ccn.channel_of_client == client_ccn.channel_of_client) )
1480 else if ( (NULL != ch->dest) &&
1481 (ch->dest->ccn.channel_of_client == client_ccn.channel_of_client) )
1485 ccc->client_ready = GNUNET_YES;
1486 LOG (GNUNET_ERROR_TYPE_DEBUG,
1487 "Got LOCAL_ACK, client ready to receive more data!\n");
1488 com = ccc->head_recv;
1490 return; /* none pending */
1491 if ( (com->mid.mid != ch->mid_recv.mid) &&
1492 (GNUNET_NO == ch->out_of_order) )
1493 return; /* missing next one in-order */
1495 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1496 "Passing payload message to client on %s\n",
1499 /* all good, pass next message to client */
1500 GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
1503 /* FIXME: if unreliable, this is not aggressive
1504 enough, as it would be OK to have lost some! */
1505 ch->mid_recv.mid = htonl (1 + ntohl (com->mid.mid));
1506 ch->mid_futures >>= 1; /* equivalent to division by 2 */
1507 ccc->client_ready = GNUNET_NO;
1508 GSC_send_to_client (ccc->c,
1511 if ( (0xFFULL == (ch->mid_futures & 0xFFULL)) &&
1512 (GNUNET_YES == ch->reliable) )
1514 /* The next 15 messages were also already received (0xFF), this
1515 suggests that the sender may be blocked on flow control
1516 urgently waiting for an ACK from us. (As we have an inherent
1517 maximum of 64 bits, and 15 is getting too close for comfort.)
1518 So we should send one now. */
1519 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1520 "Sender on %s likely blocked on flow-control, sending ACK now.\n",
1522 if (GNUNET_YES == ch->reliable)
1523 send_channel_data_ack (ch);
1526 if (NULL != ccc->head_recv)
1528 if (GNUNET_NO == ch->destroy)
1530 GCT_send_channel_destroy (ch->t,
1532 channel_destroy (ch);
1536 #define LOG2(level, ...) GNUNET_log_from_nocheck(level,"cadet-chn",__VA_ARGS__)
1542 * @param ch Channel.
1543 * @param level Debug level to use.
1546 GCCH_debug (struct CadetChannel *ch,
1547 enum GNUNET_ErrorType level)
1551 do_log = GNUNET_get_log_call_status (level & (~GNUNET_ERROR_TYPE_BULK),
1553 __FILE__, __FUNCTION__, __LINE__);
1559 LOG2 (level, "CHN *** DEBUG NULL CHANNEL ***\n");
1567 if (NULL != ch->owner)
1570 "CHN origin %s ready %s local-id: %u\n",
1571 GSC_2s (ch->owner->c),
1572 ch->owner->client_ready ? "YES" : "NO",
1573 ntohl (ch->owner->ccn.channel_of_client));
1575 if (NULL != ch->dest)
1578 "CHN destination %s ready %s local-id: %u\n",
1579 GSC_2s (ch->dest->c),
1580 ch->dest->client_ready ? "YES" : "NO",
1581 ntohl (ch->dest->ccn.channel_of_client));
1584 "CHN Message IDs recv: %d (%LLX), send: %d\n",
1585 ntohl (ch->mid_recv.mid),
1586 (unsigned long long) ch->mid_futures,
1587 ntohl (ch->mid_send.mid));
1592 /* end of gnunet-service-cadet-new_channel.c */