3 This file is part of GNUnet.
4 Copyright (C) 2001-2017 GNUnet e.V.
6 GNUnet is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published
8 by the Free Software Foundation; either version 3, or (at your
9 option) any later version.
11 GNUnet is distributed in the hope that it will be useful, but
12 WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with GNUnet; see the file COPYING. If not, write to the
18 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
19 Boston, MA 02110-1301, USA.
22 * @file cadet/gnunet-service-cadet-new_channel.c
23 * @brief logical links between CADET clients
24 * @author Bartlomiej Polot
25 * @author Christian Grothoff
28 * - introduce shutdown so we can have half-closed channels, modify
29 * destroy to include MID to have FIN-ACK equivalents, etc.
30 * - estimate max bandwidth using bursts and use it for CONGESTION CONTROL!
31 * - check that '0xFFULL' really is sufficient for flow control!
32 * - revisit handling of 'unreliable' traffic!
33 * - revisit handling of 'out-of-order' option, especially in combination with/without 'reliable'.
34 * - figure out flow control without ACKs (unreliable traffic!)
37 #include "gnunet_util_lib.h"
39 #include "gnunet_statistics_service.h"
40 #include "gnunet-service-cadet-new.h"
41 #include "gnunet-service-cadet-new_channel.h"
42 #include "gnunet-service-cadet-new_connection.h"
43 #include "gnunet-service-cadet-new_tunnels.h"
44 #include "gnunet-service-cadet-new_peer.h"
45 #include "gnunet-service-cadet-new_paths.h"
47 #define LOG(level,...) GNUNET_log_from (level,"cadet-chn",__VA_ARGS__)
50 * How long do we initially wait before retransmitting?
52 #define CADET_INITIAL_RETRANSMIT_TIME GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 250)
55 * How long do we wait before dropping state about incoming
56 * connection to closed port?
58 #define TIMEOUT_CLOSED_PORT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30)
62 * All the states a channel can be in.
64 enum CadetChannelState
67 * Uninitialized status, should never appear in operation.
72 * Connection create message sent, waiting for ACK.
74 CADET_CHANNEL_OPEN_SENT,
77 * Connection confirmed, ready to carry traffic.
84 * Info needed to retry a message in case it gets lost.
85 * Note that we DO use this structure also for unreliable
88 struct CadetReliableMessage
91 * Double linked list, FIFO style
93 struct CadetReliableMessage *next;
96 * Double linked list, FIFO style
98 struct CadetReliableMessage *prev;
101 * Which channel is this message in?
103 struct CadetChannel *ch;
106 * Entry in the tunnels queue for this message, NULL if it has left
107 * the tunnel. Used to cancel transmission in case we receive an
110 struct CadetTunnelQueueEntry *qe;
113 * How soon should we retry if we fail to get an ACK?
114 * Messages in the queue are sorted by this value.
116 struct GNUNET_TIME_Absolute next_retry;
119 * How long do we wait for an ACK after transmission?
120 * Use for the back-off calculation.
122 struct GNUNET_TIME_Relative retry_delay;
125 * Data message we are trying to send.
127 struct GNUNET_CADET_ChannelAppDataMessage data_message;
129 /* followed by variable-size payload */
134 * List of received out-of-order data messages.
136 struct CadetOutOfOrderMessage
139 * Double linked list, FIFO style
141 struct CadetOutOfOrderMessage *next;
144 * Double linked list, FIFO style
146 struct CadetOutOfOrderMessage *prev;
149 * ID of the message (messages up to this point needed
150 * before we give this one to the client).
152 struct ChannelMessageIdentifier mid;
155 * The envelope with the payload of the out-of-order message
157 struct GNUNET_MQ_Envelope *env;
163 * Struct containing all information regarding a channel to a remote client.
168 * Tunnel this channel is in.
170 struct CadetTunnel *t;
173 * Client owner of the tunnel, if any.
174 * (Used if this channel represents the initiating end of the tunnel.)
176 struct CadetClient *owner;
179 * Client destination of the tunnel, if any.
180 * (Used if this channel represents the listening end of the tunnel.)
182 struct CadetClient *dest;
185 * Last entry in the tunnel's queue relating to control messages
186 * (#GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN or
187 * #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK). Used to cancel
188 * transmission in case we receive updated information.
190 struct CadetTunnelQueueEntry *last_control_qe;
193 * Head of DLL of messages sent and not yet ACK'd.
195 struct CadetReliableMessage *head_sent;
198 * Tail of DLL of messages sent and not yet ACK'd.
200 struct CadetReliableMessage *tail_sent;
203 * Head of DLL of messages received out of order or while client was unready.
205 struct CadetOutOfOrderMessage *head_recv;
208 * Tail of DLL of messages received out of order or while client was unready.
210 struct CadetOutOfOrderMessage *tail_recv;
213 * Task to resend/poll in case no ACK is received.
215 struct GNUNET_SCHEDULER_Task *retry_control_task;
218 * Task to resend/poll in case no ACK is received.
220 struct GNUNET_SCHEDULER_Task *retry_data_task;
223 * Last time the channel was used
225 struct GNUNET_TIME_Absolute timestamp;
228 * Destination port of the channel.
230 struct GNUNET_HashCode port;
233 * Counter for exponential backoff.
235 struct GNUNET_TIME_Relative retry_time;
238 * How long does it usually take to get an ACK.
240 struct GNUNET_TIME_Relative expected_delay;
243 * Bitfield of already-received messages past @e mid_recv.
245 uint64_t mid_futures;
248 * Next MID expected for incoming traffic.
250 struct ChannelMessageIdentifier mid_recv;
253 * Next MID to use for outgoing traffic.
255 struct ChannelMessageIdentifier mid_send;
258 * Total (reliable) messages pending ACK for this channel.
260 unsigned int pending_messages;
263 * Maximum (reliable) messages pending ACK for this channel
264 * before we throttle the client.
266 unsigned int max_pending_messages;
269 * Number identifying this channel in its tunnel.
271 struct GNUNET_CADET_ChannelTunnelNumber ctn;
274 * Local tunnel number for local client @e owner owning the channel.
275 * ( >= #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI)
277 struct GNUNET_CADET_ClientChannelNumber ccn_owner;
280 * Local tunnel number for local client @e dest owning the channel.
281 * (< #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI)
283 struct GNUNET_CADET_ClientChannelNumber ccn_dest;
288 enum CadetChannelState state;
291 * Can we send data to the client?
296 * Is the tunnel bufferless (minimum latency)?
301 * Is the tunnel reliable?
306 * Is the tunnel out-of-order?
311 * Is this channel a loopback channel, where the destination is us again?
316 * Flag to signal the destruction of the channel. If this is set to
317 * #GNUNET_YES the channel will be destroyed once the queue is
326 * Get the static string for identification of the channel.
330 * @return Static string with the channel IDs.
333 GCCH_2s (const struct CadetChannel *ch)
335 static char buf[128];
337 GNUNET_snprintf (buf,
339 "Channel %s:%s ctn:%X(%X/%X)",
340 GNUNET_i2s (GCP_get_id (GCT_get_destination (ch->t))),
341 GNUNET_h2s (&ch->port),
343 ntohl (ch->ccn_owner.channel_of_client),
344 ntohl (ch->ccn_dest.channel_of_client));
350 * Get the channel's public ID.
354 * @return ID used to identify the channel with the remote peer.
356 struct GNUNET_CADET_ChannelTunnelNumber
357 GCCH_get_id (const struct CadetChannel *ch)
364 * Destroy the given channel.
366 * @param ch channel to destroy
369 channel_destroy (struct CadetChannel *ch)
371 struct CadetReliableMessage *crm;
372 struct CadetOutOfOrderMessage *com;
374 while (NULL != (crm = ch->head_sent))
376 GNUNET_assert (ch == crm->ch);
379 GCT_send_cancel (crm->qe);
382 GNUNET_CONTAINER_DLL_remove (ch->head_sent,
387 while (NULL != (com = ch->head_recv))
389 GNUNET_CONTAINER_DLL_remove (ch->head_recv,
392 GNUNET_MQ_discard (com->env);
395 if (NULL != ch->last_control_qe)
397 GCT_send_cancel (ch->last_control_qe);
398 ch->last_control_qe = NULL;
400 if (NULL != ch->retry_data_task)
402 GNUNET_SCHEDULER_cancel (ch->retry_data_task);
403 ch->retry_data_task = NULL;
405 if (NULL != ch->retry_control_task)
407 GNUNET_SCHEDULER_cancel (ch->retry_control_task);
408 ch->retry_control_task = NULL;
410 if (GNUNET_NO == ch->is_loopback)
412 GCT_remove_channel (ch->t,
422 * Send a channel create message.
424 * @param cls Channel for which to send.
427 send_channel_open (void *cls);
431 * Function called once the tunnel confirms that we sent the
432 * create message. Delays for a bit until we retry.
434 * @param cls our `struct CadetChannel`.
437 channel_open_sent_cb (void *cls)
439 struct CadetChannel *ch = cls;
441 GNUNET_assert (NULL != ch->last_control_qe);
442 ch->last_control_qe = NULL;
443 ch->retry_time = GNUNET_TIME_STD_BACKOFF (ch->retry_time);
444 LOG (GNUNET_ERROR_TYPE_DEBUG,
445 "Sent CHANNEL_OPEN on %s, retrying in %s\n",
447 GNUNET_STRINGS_relative_time_to_string (ch->retry_time,
449 ch->retry_control_task
450 = GNUNET_SCHEDULER_add_delayed (ch->retry_time,
457 * Send a channel open message.
459 * @param cls Channel for which to send.
462 send_channel_open (void *cls)
464 struct CadetChannel *ch = cls;
465 struct GNUNET_CADET_ChannelOpenMessage msgcc;
468 ch->retry_control_task = NULL;
469 LOG (GNUNET_ERROR_TYPE_DEBUG,
470 "Sending CHANNEL_OPEN message for %s\n",
474 options |= GNUNET_CADET_OPTION_NOBUFFER;
476 options |= GNUNET_CADET_OPTION_RELIABLE;
477 if (ch->out_of_order)
478 options |= GNUNET_CADET_OPTION_OUT_OF_ORDER;
479 msgcc.header.size = htons (sizeof (msgcc));
480 msgcc.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN);
481 msgcc.opt = htonl (options);
482 msgcc.port = ch->port;
484 ch->state = CADET_CHANNEL_OPEN_SENT;
485 ch->last_control_qe = GCT_send (ch->t,
487 &channel_open_sent_cb,
493 * Function called once and only once after a channel was bound
494 * to its tunnel via #GCT_add_channel() is ready for transmission.
495 * Note that this is only the case for channels that this peer
496 * initiates, as for incoming channels we assume that they are
497 * ready for transmission immediately upon receiving the open
498 * message. Used to bootstrap the #GCT_send() process.
500 * @param ch the channel for which the tunnel is now ready
503 GCCH_tunnel_up (struct CadetChannel *ch)
505 GNUNET_assert (NULL == ch->retry_control_task);
506 LOG (GNUNET_ERROR_TYPE_DEBUG,
507 "Tunnel up, sending CHANNEL_OPEN on %s now\n",
509 ch->retry_control_task
510 = GNUNET_SCHEDULER_add_now (&send_channel_open,
516 * Create a new channel.
518 * @param owner local client owning the channel
519 * @param ccn local number of this channel at the @a owner
520 * @param destination peer to which we should build the channel
521 * @param port desired port at @a destination
522 * @param options options for the channel
523 * @return handle to the new channel
525 struct CadetChannel *
526 GCCH_channel_local_new (struct CadetClient *owner,
527 struct GNUNET_CADET_ClientChannelNumber ccn,
528 struct CadetPeer *destination,
529 const struct GNUNET_HashCode *port,
532 struct CadetChannel *ch;
534 ch = GNUNET_new (struct CadetChannel);
535 ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
536 ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
537 ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
538 ch->max_pending_messages = (ch->nobuffer) ? 1 : 32; /* FIXME: 32!? Do not hardcode! */
542 if (0 == memcmp (&my_full_id,
543 GCP_get_id (destination),
544 sizeof (struct GNUNET_PeerIdentity)))
546 ch->is_loopback = GNUNET_YES;
547 ch->dest = GNUNET_CONTAINER_multihashmap_get (open_ports,
549 if (NULL == ch->dest)
551 /* port closed, wait for it to possibly open */
552 (void) GNUNET_CONTAINER_multihashmap_put (loose_channels,
555 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
556 LOG (GNUNET_ERROR_TYPE_DEBUG,
557 "Created loose incoming loopback channel to port %s\n",
558 GNUNET_h2s (&ch->port));
568 ch->t = GCP_get_tunnel (destination,
570 ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME;
571 ch->ctn = GCT_add_channel (ch->t,
574 GNUNET_STATISTICS_update (stats,
578 LOG (GNUNET_ERROR_TYPE_DEBUG,
579 "Created channel to port %s at peer %s for %s using %s\n",
581 GCP_2s (destination),
583 (GNUNET_YES == ch->is_loopback) ? "loopback" : GCT_2s (ch->t));
589 * We had an incoming channel to a port that is closed.
590 * It has not been opened for a while, drop it.
592 * @param cls the channel to drop
595 timeout_closed_cb (void *cls)
597 struct CadetChannel *ch = cls;
599 ch->retry_control_task = NULL;
600 LOG (GNUNET_ERROR_TYPE_DEBUG,
601 "Closing incoming channel to port %s from peer %s due to timeout\n",
602 GNUNET_h2s (&ch->port),
603 GCP_2s (GCT_get_destination (ch->t)));
604 channel_destroy (ch);
609 * Create a new channel based on a request coming in over the network.
611 * @param t tunnel to the remote peer
612 * @param ctn identifier of this channel in the tunnel
613 * @param port desired local port
614 * @param options options for the channel
615 * @return handle to the new channel
617 struct CadetChannel *
618 GCCH_channel_incoming_new (struct CadetTunnel *t,
619 struct GNUNET_CADET_ChannelTunnelNumber ctn,
620 const struct GNUNET_HashCode *port,
623 struct CadetChannel *ch;
624 struct CadetClient *c;
626 ch = GNUNET_new (struct CadetChannel);
630 ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME;
631 ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
632 ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
633 ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
634 ch->max_pending_messages = (ch->nobuffer) ? 1 : 32; /* FIXME: 32!? Do not hardcode! */
635 GNUNET_STATISTICS_update (stats,
640 c = GNUNET_CONTAINER_multihashmap_get (open_ports,
644 /* port closed, wait for it to possibly open */
645 (void) GNUNET_CONTAINER_multihashmap_put (loose_channels,
648 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
649 ch->retry_control_task
650 = GNUNET_SCHEDULER_add_delayed (TIMEOUT_CLOSED_PORT,
653 LOG (GNUNET_ERROR_TYPE_DEBUG,
654 "Created loose incoming channel to port %s from peer %s\n",
655 GNUNET_h2s (&ch->port),
656 GCP_2s (GCT_get_destination (ch->t)));
663 GNUNET_STATISTICS_update (stats,
672 * Function called once the tunnel confirms that we sent the
673 * ACK message. Just remembers it was sent, we do not expect
676 * @param cls our `struct CadetChannel`.
679 send_ack_cb (void *cls)
681 struct CadetChannel *ch = cls;
683 GNUNET_assert (NULL != ch->last_control_qe);
684 ch->last_control_qe = NULL;
689 * Compute and send the current #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK to the other peer.
691 * @param ch channel to send the #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK for
694 send_channel_data_ack (struct CadetChannel *ch)
696 struct GNUNET_CADET_ChannelDataAckMessage msg;
698 msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK);
699 msg.header.size = htons (sizeof (msg));
701 msg.mid.mid = htonl (ntohl (ch->mid_recv.mid) - 1);
702 msg.futures = GNUNET_htonll (ch->mid_futures);
703 if (NULL != ch->last_control_qe)
704 GCT_send_cancel (ch->last_control_qe);
705 ch->last_control_qe = GCT_send (ch->t,
713 * Send our initial #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK to the client confirming that the
716 * @param cls the `struct CadetChannel`
719 send_open_ack (void *cls)
721 struct CadetChannel *ch = cls;
722 struct GNUNET_CADET_ChannelManageMessage msg;
724 LOG (GNUNET_ERROR_TYPE_DEBUG,
725 "Sending CHANNEL_OPEN_ACK on channel %s\n",
727 ch->retry_control_task = NULL;
728 msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK);
729 msg.header.size = htons (sizeof (msg));
730 msg.reserved = htonl (0);
732 if (NULL != ch->last_control_qe)
733 GCT_send_cancel (ch->last_control_qe);
734 ch->last_control_qe = GCT_send (ch->t,
742 * We got a #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN message again for
743 * this channel. If the binding was successful, (re)transmit the
744 * #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK.
746 * @param ch channel that got the duplicate open
749 GCCH_handle_duplicate_open (struct CadetChannel *ch)
751 if (NULL == ch->dest)
753 LOG (GNUNET_ERROR_TYPE_DEBUG,
754 "Ignoring duplicate channel OPEN on %s: port is closed\n",
758 if (NULL != ch->retry_control_task)
760 LOG (GNUNET_ERROR_TYPE_DEBUG,
761 "Ignoring duplicate channel OPEN on %s: control message is pending\n",
765 LOG (GNUNET_ERROR_TYPE_DEBUG,
766 "Retransmitting OPEN_ACK on channel %s\n",
768 ch->retry_control_task
769 = GNUNET_SCHEDULER_add_now (&send_open_ack,
775 * Send a #GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK to the client to solicit more messages.
777 * @param ch channel the ack is for
778 * @param c client to send the ACK to
781 send_ack_to_client (struct CadetChannel *ch,
782 struct CadetClient *c)
784 struct GNUNET_MQ_Envelope *env;
785 struct GNUNET_CADET_LocalAck *ack;
787 env = GNUNET_MQ_msg (ack,
788 GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK);
789 ack->ccn = (c == ch->owner) ? ch->ccn_owner : ch->ccn_dest;
790 GSC_send_to_client (c,
796 * A client is bound to the port that we have a channel
797 * open to. Send the acknowledgement for the connection
798 * request and establish the link with the client.
800 * @param ch open incoming channel
801 * @param c client listening on the respective port
804 GCCH_bind (struct CadetChannel *ch,
805 struct CadetClient *c)
807 struct GNUNET_MQ_Envelope *env;
808 struct GNUNET_CADET_LocalChannelCreateMessage *tcm;
811 LOG (GNUNET_ERROR_TYPE_DEBUG,
812 "Binding %s from %s to port %s of %s\n",
815 GNUNET_h2s (&ch->port),
817 if (NULL != ch->retry_control_task)
819 /* there might be a timeout task here */
820 GNUNET_SCHEDULER_cancel (ch->retry_control_task);
821 ch->retry_control_task = NULL;
825 options |= GNUNET_CADET_OPTION_NOBUFFER;
827 options |= GNUNET_CADET_OPTION_RELIABLE;
828 if (ch->out_of_order)
829 options |= GNUNET_CADET_OPTION_OUT_OF_ORDER;
831 ch->ccn_dest = GSC_bind (c,
833 (GNUNET_YES == ch->is_loopback)
834 ? GCP_get (&my_full_id,
836 : GCT_get_destination (ch->t),
839 ch->mid_recv.mid = htonl (1); /* The CONNECT counts as message 0! */
840 if (GNUNET_YES == ch->is_loopback)
842 ch->state = CADET_CHANNEL_OPEN_SENT;
843 GCCH_handle_channel_open_ack (ch);
847 /* notify other peer that we accepted the connection */
848 ch->retry_control_task
849 = GNUNET_SCHEDULER_add_now (&send_open_ack,
852 /* give client its initial supply of ACKs */
853 env = GNUNET_MQ_msg (tcm,
854 GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_CREATE);
855 tcm->ccn = ch->ccn_dest;
856 if (GNUNET_YES == ch->is_loopback)
857 tcm->peer = my_full_id;
859 tcm->peer = *GCP_get_id (GCT_get_destination (ch->t));
860 tcm->port = ch->port;
861 tcm->opt = htonl (options);
862 GSC_send_to_client (ch->dest,
864 for (unsigned int i=0;i<ch->max_pending_messages;i++)
865 send_ack_to_client (ch,
871 * Destroy locally created channel. Called by the local client, so no
872 * need to tell the client.
874 * @param ch channel to destroy
875 * @param c client that caused the destruction
878 GCCH_channel_local_destroy (struct CadetChannel *ch,
879 struct CadetClient *c)
881 LOG (GNUNET_ERROR_TYPE_DEBUG,
882 "%s asks for destruction of %s\n",
885 GNUNET_assert (NULL != c);
888 else if (c == ch->dest)
892 if (GNUNET_YES == ch->destroy)
894 /* other end already destroyed, with the local client gone, no need
895 to finish transmissions, just destroy immediately. */
896 channel_destroy (ch);
899 if ( (NULL != ch->head_sent) ||
900 (NULL != ch->owner) ||
903 /* Wait for other end to destroy us as well,
904 and otherwise allow send queue to be transmitted first */
905 ch->destroy = GNUNET_YES;
908 /* If we ever sent the CHANNEL_CREATE, we need to send a destroy message. */
909 if (CADET_CHANNEL_NEW != ch->state)
910 GCT_send_channel_destroy (ch->t,
912 /* Nothing left to do, just finish destruction */
913 channel_destroy (ch);
918 * We got an acknowledgement for the creation of the channel
919 * (the port is open on the other side). Begin transmissions.
921 * @param ch channel that received the OPEN_ACK
924 GCCH_handle_channel_open_ack (struct CadetChannel *ch)
928 case CADET_CHANNEL_NEW:
929 /* this should be impossible */
932 case CADET_CHANNEL_OPEN_SENT:
933 if (NULL == ch->owner)
935 /* We're not the owner, wrong direction! */
939 LOG (GNUNET_ERROR_TYPE_DEBUG,
940 "Received channel OPEN_ACK for waiting %s, entering READY state\n",
942 if (NULL != ch->retry_control_task) /* can be NULL if ch->is_loopback */
944 GNUNET_SCHEDULER_cancel (ch->retry_control_task);
945 ch->retry_control_task = NULL;
947 ch->state = CADET_CHANNEL_READY;
948 /* On first connect, send client as many ACKs as we allow messages
950 for (unsigned int i=0;i<ch->max_pending_messages;i++)
951 send_ack_to_client (ch,
954 case CADET_CHANNEL_READY:
955 /* duplicate ACK, maybe we retried the CREATE. Ignore. */
956 LOG (GNUNET_ERROR_TYPE_DEBUG,
957 "Received duplicate channel OPEN_ACK for %s\n",
959 GNUNET_STATISTICS_update (stats,
960 "# duplicate CREATE_ACKs",
969 * Test if element @a e1 comes before element @a e2.
971 * TODO: use opportunity to create generic list insertion sort
972 * logic in container!
974 * @param cls closure, our `struct CadetChannel`
975 * @param e1 an element to sort
976 * @param e2 another element to sort
977 * @return #GNUNET_YES if @a e1 < @a e2, otherwise #GNUNET_NO
980 is_before (void *cls,
984 struct CadetOutOfOrderMessage *m1 = e1;
985 struct CadetOutOfOrderMessage *m2 = e2;
986 uint32_t v1 = ntohl (m1->mid.mid);
987 uint32_t v2 = ntohl (m2->mid.mid);
991 if (delta > (uint32_t) INT_MAX)
993 /* in overflow range, we can safely assume we wrapped around */
1004 * We got payload data for a channel. Pass it on to the client
1005 * and send an ACK to the other end (once flow control allows it!)
1007 * @param ch channel that got data
1010 GCCH_handle_channel_plaintext_data (struct CadetChannel *ch,
1011 const struct GNUNET_CADET_ChannelAppDataMessage *msg)
1013 struct GNUNET_MQ_Envelope *env;
1014 struct GNUNET_CADET_LocalData *ld;
1015 struct CadetOutOfOrderMessage *com;
1016 size_t payload_size;
1018 GNUNET_assert (GNUNET_NO == ch->is_loopback);
1019 payload_size = ntohs (msg->header.size) - sizeof (*msg);
1020 LOG (GNUNET_ERROR_TYPE_DEBUG,
1021 "Receicved %u bytes of application data on %s\n",
1022 (unsigned int) payload_size,
1024 env = GNUNET_MQ_msg_extra (ld,
1026 GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
1027 ld->ccn = (NULL == ch->dest) ? ch->ccn_owner : ch->ccn_dest;
1028 GNUNET_memcpy (&ld[1],
1031 if ( (GNUNET_YES == ch->client_ready) &&
1032 ( (GNUNET_YES == ch->out_of_order) ||
1033 (msg->mid.mid == ch->mid_recv.mid) ) )
1035 GSC_send_to_client (ch->owner ? ch->owner : ch->dest,
1037 ch->mid_recv.mid = htonl (1 + ntohl (ch->mid_recv.mid));
1038 ch->mid_futures >>= 1;
1042 /* FIXME-SECURITY: if the element is WAY too far ahead,
1043 drop it (can't buffer too much!) */
1044 com = GNUNET_new (struct CadetOutOfOrderMessage);
1045 com->mid = msg->mid;
1047 /* sort into list ordered by "is_before" */
1048 if ( (NULL == ch->head_recv) ||
1049 (GNUNET_YES == is_before (ch,
1053 GNUNET_CONTAINER_DLL_insert (ch->head_recv,
1059 struct CadetOutOfOrderMessage *pos;
1061 for (pos = ch->head_recv;
1072 GNUNET_CONTAINER_DLL_insert_tail (ch->head_recv,
1076 GNUNET_CONTAINER_DLL_insert_after (ch->head_recv,
1086 * We got an acknowledgement for payload data for a channel.
1087 * Possibly resume transmissions.
1089 * @param ch channel that got the ack
1090 * @param ack details about what was received
/* Handler for a DATA_ACK received from the remote peer.  From the lines
   visible in this listing: ACKs are unexpected on unreliable channels, the
   sent-DLL is searched for the message whose MID equals the acknowledged
   MID, a miss is logged and counted as a duplicate, and a hit is removed
   from the DLL, decrements the pending counter and triggers a local ACK to
   the attached client.  (Several lines of this function are not shown.) */
1093 GCCH_handle_channel_plaintext_data_ack (struct CadetChannel *ch,
1094 const struct GNUNET_CADET_ChannelDataAckMessage *ack)
1096 struct CadetReliableMessage *crm;
/* NOTE(review): loopback is checked with GNUNET_break() here, whereas the
   data and remote-destroy handlers in this file use GNUNET_assert() for the
   same condition -- confirm which severity is intended. */
1098 GNUNET_break (GNUNET_NO == ch->is_loopback);
1099 if (GNUNET_NO == ch->reliable)
1101 /* not expecting ACKs on unreliable channel, odd */
1102 GNUNET_break_op (0);
/* Both MIDs are compared in network byte order; only an exact match on the
   acknowledged MID is looked for in the lines shown. */
1105 for (crm = ch->head_sent;
1108 if (ack->mid.mid == crm->data_message.mid.mid)
1112 /* ACK for message we already dropped, might have been a
1113 duplicate ACK? Ignore. */
1114 LOG (GNUNET_ERROR_TYPE_DEBUG,
1115 "Duplicate DATA_ACK on %s, ignoring\n",
1117 GNUNET_STATISTICS_update (stats,
1118 "# duplicate DATA_ACKs",
1123 GNUNET_CONTAINER_DLL_remove (ch->head_sent,
1126 ch->pending_messages--;
/* NOTE(review): GCCH_handle_local_data() only rejects a message when
   pending_messages > max_pending_messages and then increments, so the
   counter can reach max_pending_messages + 1.  After a single decrement it
   would equal max_pending_messages and this assertion would fail --
   confirm whether the admission check should be `>=` or this check `<=`. */
1128 GNUNET_assert (ch->pending_messages < ch->max_pending_messages);
1129 LOG (GNUNET_ERROR_TYPE_DEBUG,
1130 "Received DATA_ACK on %s for message %u (%u ACKs pending)\n",
1132 (unsigned int) ntohl (ack->mid.mid),
1133 ch->pending_messages);
/* A slot was freed: send a local ACK to whichever client is attached
   (the destination client when there is no owner). */
1134 send_ack_to_client (ch,
1135 (NULL == ch->owner) ? ch->dest : ch->owner);
1140 * Destroy channel, based on the other peer closing the
1141 * connection. Also needs to remove this channel from
1144 * @param ch channel to destroy
1147 GCCH_handle_remote_destroy (struct CadetChannel *ch)
1149 GNUNET_assert (GNUNET_NO == ch->is_loopback);
1150 LOG (GNUNET_ERROR_TYPE_DEBUG,
1151 "Received remote channel DESTROY for %s\n",
1153 if (GNUNET_YES == ch->destroy)
1155 /* Local client already gone, this is instant-death. */
1156 channel_destroy (ch);
1159 if (NULL != ch->head_recv)
1161 LOG (GNUNET_ERROR_TYPE_WARNING,
1162 "Lost end of transmission due to remote shutdown on channel %s\n",
1164 /* FIXME: change API to notify client about truncated transmission! */
1166 ch->destroy = GNUNET_YES;
1167 GSC_handle_remote_channel_destroy ((NULL != ch->owner) ? ch->owner : ch->dest,
1168 (NULL != ch->owner) ? ch->ccn_owner : ch->ccn_dest,
1170 channel_destroy (ch);
1175 * Function called once the tunnel has sent one of our messages.
1176 * If the message is unreliable, simply frees the `crm`. If the
1177 * message was reliable, calculate retransmission time and
1178 * wait for ACK (or retransmit).
1180 * @param cls the `struct CadetReliableMessage` that was sent
1183 data_sent_cb (void *cls);
1187 * We need to retry a transmission, the last one took too long to
1190 * @param cls the `struct CadetChannel` where we need to retransmit
1193 retry_transmission (void *cls)
1195 struct CadetChannel *ch = cls;
1196 struct CadetReliableMessage *crm = ch->head_sent;
1198 ch->retry_data_task = NULL;
1199 GNUNET_assert (NULL == crm->qe);
1200 crm->qe = GCT_send (ch->t,
1201 &crm->data_message.header,
1208 * Check if we can now allow the client to transmit, and if so,
1209 * let the client know about it.
1211 * @param ch channel to check
1214 GCCH_check_allow_client (struct CadetChannel *ch)
1216 struct GNUNET_MQ_Envelope *env;
1217 struct GNUNET_CADET_LocalAck *msg;
1219 if (CADET_CHANNEL_READY != ch->state)
1221 /* destination did not yet ACK our CREATE! */
1222 LOG (GNUNET_ERROR_TYPE_DEBUG,
1223 "%s not yet ready, throttling client until ACK.\n",
1227 if (ch->pending_messages > ch->max_pending_messages)
1229 /* Too many messages in queue. */
1230 LOG (GNUNET_ERROR_TYPE_DEBUG,
1231 "Message queue still too long on %s, throttling client until ACK.\n",
1235 if ( (NULL != ch->head_sent) &&
1236 (64 <= ntohl (ch->mid_send.mid) - ntohl (ch->head_sent->data_message.mid.mid)) )
1238 LOG (GNUNET_ERROR_TYPE_DEBUG,
1239 "Gap in ACKs too big on %s, throttling client until ACK.\n",
1244 LOG (GNUNET_ERROR_TYPE_DEBUG,
1245 "Sending local ack to %s client\n",
1247 env = GNUNET_MQ_msg (msg,
1248 GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK);
1249 msg->ccn = (NULL != ch->owner) ? ch->ccn_owner : ch->ccn_dest;
1250 GSC_send_to_client ((NULL != ch->owner) ? ch->owner : ch->dest,
1256 * Function called once the tunnel has sent one of our messages.
1257 * If the message is unreliable, simply frees the `crm`. If the
1258 * message was reliable, calculate retransmission time and
1259 * wait for ACK (or retransmit).
1261 * @param cls the `struct CadetReliableMessage` that was sent
1264 data_sent_cb (void *cls)
1266 struct CadetReliableMessage *crm = cls;
1267 struct CadetChannel *ch = crm->ch;
1268 struct CadetReliableMessage *off;
1271 GNUNET_CONTAINER_DLL_remove (ch->head_sent,
1274 if (GNUNET_NO == ch->reliable)
1277 ch->pending_messages--;
1278 GCCH_check_allow_client (ch);
1281 if (0 == crm->retry_delay.rel_value_us)
1282 crm->retry_delay = ch->expected_delay;
1283 crm->next_retry = GNUNET_TIME_relative_to_absolute (crm->retry_delay);
1285 /* find position for re-insertion into the DLL */
1286 if ( (NULL == ch->head_sent) ||
1287 (crm->next_retry.abs_value_us < ch->head_sent->next_retry.abs_value_us) )
1289 /* insert at HEAD, also (re)schedule retry task! */
1290 GNUNET_CONTAINER_DLL_insert (ch->head_sent,
1293 if (NULL != ch->retry_data_task)
1294 GNUNET_SCHEDULER_cancel (ch->retry_data_task);
1296 = GNUNET_SCHEDULER_add_delayed (crm->retry_delay,
1297 &retry_transmission,
1301 for (off = ch->head_sent; NULL != off; off = off->next)
1302 if (crm->next_retry.abs_value_us < off->next_retry.abs_value_us)
1306 /* insert at tail */
1307 GNUNET_CONTAINER_DLL_insert_tail (ch->head_sent,
1313 /* insert before off */
1314 GNUNET_CONTAINER_DLL_insert_after (ch->head_sent,
1323 * Handle data given by a client.
1325 * Check whether the client is allowed to send in this tunnel, save if
1326 * channel is reliable and send an ACK to the client if there is still
1327 * buffer space in the tunnel.
1329 * @param ch Channel.
1330 * @param buf payload to transmit.
1331 * @param buf_len number of bytes in @a buf
1332 * @return #GNUNET_OK if everything goes well,
1333 * #GNUNET_SYSERR in case of an error.
/* Entry point for payload handed over by a local client.  From the lines
   visible in this listing: it applies the pending-message limit, bumps the
   pending counter, allocates a `struct CadetReliableMessage` with the
   payload appended, stamps it with the next MID and the channel number,
   links it into the sent-DLL and hands it to the tunnel via GCT_send().
   (Parts of the function, including the rest of the parameter list, are
   not shown.) */
1336 GCCH_handle_local_data (struct CadetChannel *ch,
1340 struct CadetReliableMessage *crm;
/* NOTE(review): with `>` a message is still accepted when
   pending_messages == max_pending_messages, so the increment below can take
   the counter to max_pending_messages + 1.  The DATA_ACK handler decrements
   once and then asserts pending_messages < max_pending_messages, which
   would not hold in that case -- confirm whether `>=` is intended here. */
1342 if (ch->pending_messages > ch->max_pending_messages)
1345 return GNUNET_SYSERR;
1347 ch->pending_messages++;
/* NOTE(review): the counter was already incremented above; no visible line
   undoes that before this error return -- confirm. */
1349 if (GNUNET_YES == ch->is_loopback)
1351 GNUNET_break (0); // fIXME: not implemented
1352 return GNUNET_SYSERR;
1355 /* Everything is correct, send the message. */
/* Bookkeeping struct and payload share one allocation; the payload is
   copied to the bytes following the struct (&crm[1]) below. */
1356 crm = GNUNET_malloc (sizeof (*crm) + buf_len);
/* NOTE(review): the size goes through htons(), i.e. a 16-bit field;
   assumes the caller bounds buf_len so the sum fits -- TODO confirm. */
1358 crm->data_message.header.size = htons (sizeof (struct GNUNET_CADET_ChannelAppDataMessage) + buf_len);
1359 crm->data_message.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA);
/* The send MID is advanced first and the new value is used for this
   message; it is kept in network byte order. */
1360 ch->mid_send.mid = htonl (ntohl (ch->mid_send.mid) + 1);
1361 crm->data_message.mid = ch->mid_send;
1362 crm->data_message.ctn = ch->ctn;
1363 GNUNET_memcpy (&crm[1],
1366 GNUNET_CONTAINER_DLL_insert (ch->head_sent,
1369 LOG (GNUNET_ERROR_TYPE_DEBUG,
1370 "Sending %u bytes from local client to %s\n",
1373 crm->qe = GCT_send (ch->t,
1374 &crm->data_message.header,
/* Let the client send more right away if flow control still permits. */
1377 GCCH_check_allow_client (ch);
1383 * Try to deliver messages to the local client, if it is ready for more.
1385 * @param ch channel to process
/* Try to hand the head of the receive buffer to the local client.  From the
   lines visible in this listing: nothing happens unless the client is
   ready and the head message is either the next expected MID or the
   channel is out-of-order.  On delivery the message is unlinked, mid_recv
   is advanced past it and the futures bitfield is shifted; an ACK to the
   sender may follow.  (Several lines of this function are not shown.) */
1388 send_client_buffered_data (struct CadetChannel *ch)
1390 struct CadetOutOfOrderMessage *com;
1392 if (GNUNET_NO == ch->client_ready)
1393 return; /* client not ready */
1394 com = ch->head_recv;
1396 return; /* none pending */
1397 if ( (com->mid.mid != ch->mid_recv.mid) &&
1398 (GNUNET_NO == ch->out_of_order) )
1399 return; /* missing next one in-order */
/* NOTE(review): this function calls GNUNET_log() directly, while the rest
   of the file logs through the LOG() macro (component "cadet-chn") --
   confirm whether that is intentional. */
1401 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1402 "Passing payload message to client on %s\n",
1405 /* all good, pass next message to client */
1406 GNUNET_CONTAINER_DLL_remove (ch->head_recv,
1409 /* FIXME: if unreliable, this is not aggressive
1410 enough, as it would be OK to have lost some! */
1411 ch->mid_recv.mid = htonl (1 + ntohl (com->mid.mid));
1412 ch->mid_futures >>= 1; /* equivalent to division by 2 */
1413 GSC_send_to_client (ch->owner ? ch->owner : ch->dest,
1416 if ( (0xFFULL == (ch->mid_futures & 0xFFULL)) &&
1417 (GNUNET_YES == ch->reliable) )
1419 /* The next 8 messages were also already received (0xFF covers the low
1420 8 bits of mid_futures; an earlier wording said 15), this suggests that
1421 the sender may be blocked on flow control urgently waiting for an ACK
1422 from us. (As we have an inherent maximum of 64 bits, we do not want to
1423 get too close to that.) So we should send one now. */
1424 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1425 "Sender on %s likely blocked on flow-control, sending ACK now.\n",
/* NOTE(review): this re-check of ch->reliable appears redundant with the
   condition tested just above -- confirm against the full source. */
1427 if (GNUNET_YES == ch->reliable)
1428 send_channel_data_ack (ch);
/* NOTE(review): presumably each of the next two tests returns early (the
   statements are not shown here), so the destroy path below runs only when
   the buffer is empty and the channel is flagged for destruction --
   verify against the full source. */
1431 if (NULL != ch->head_recv)
1433 if (GNUNET_NO == ch->destroy)
1435 GCT_send_channel_destroy (ch->t,
1437 channel_destroy (ch);
1442 * Handle ACK from client on local channel.
1444 * @param ch channel on which the client sent the ACK
1447 GCCH_handle_local_ack (struct CadetChannel *ch)
1449 ch->client_ready = GNUNET_YES;
1450 send_client_buffered_data (ch);
1454 #define LOG2(level, ...) GNUNET_log_from_nocheck(level,"cadet-chn",__VA_ARGS__)
1460 * @param ch Channel.
1461 * @param level Debug level to use.
1464 GCCH_debug (struct CadetChannel *ch,
1465 enum GNUNET_ErrorType level)
1469 do_log = GNUNET_get_log_call_status (level & (~GNUNET_ERROR_TYPE_BULK),
1471 __FILE__, __FUNCTION__, __LINE__);
1477 LOG2 (level, "CHN *** DEBUG NULL CHANNEL ***\n");
1485 if (NULL != ch->owner)
1488 "CHN origin %s ready %s local-id: %u\n",
1490 ch->client_ready ? "YES" : "NO",
1491 ntohl (ch->ccn_owner.channel_of_client));
1493 if (NULL != ch->dest)
1496 "CHN destination %s ready %s local-id: %u\n",
1498 ch->client_ready ? "YES" : "NO",
1499 ntohl (ch->ccn_dest.channel_of_client));
1502 "CHN Message IDs recv: %d (%LLX), send: %d\n",
1503 ntohl (ch->mid_recv.mid),
1504 (unsigned long long) ch->mid_futures,
1505 ntohl (ch->mid_send.mid));
1510 /* end of gnunet-service-cadet-new_channel.c */