3 This file is part of GNUnet.
4 Copyright (C) 2001-2017 GNUnet e.V.
6 GNUnet is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published
8 by the Free Software Foundation; either version 3, or (at your
9 option) any later version.
11 GNUnet is distributed in the hope that it will be useful, but
12 WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with GNUnet; see the file COPYING. If not, write to the
18 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
19 Boston, MA 02110-1301, USA.
22 * @file cadet/gnunet-service-cadet-new_channel.c
23 * @brief logical links between CADET clients
24 * @author Bartlomiej Polot
25 * @author Christian Grothoff
28 * - introduce shutdown so we can have half-closed channels, modify
29 * destroy to include MID to have FIN-ACK equivalents, etc.
30 * - estimate max bandwidth using bursts and use to for CONGESTION CONTROL!
31 * - check that '0xFFULL' really is sufficient for flow control!
32 * - revisit handling of 'unreliable' traffic!
33 * - revisit handling of 'out-of-order' option, especially in combination with/without 'reliable'.
34 * - figure out flow control without ACKs (unreliable traffic!)
37 #include "gnunet_util_lib.h"
39 #include "gnunet_statistics_service.h"
40 #include "gnunet-service-cadet-new.h"
41 #include "gnunet-service-cadet-new_channel.h"
42 #include "gnunet-service-cadet-new_connection.h"
43 #include "gnunet-service-cadet-new_tunnels.h"
44 #include "gnunet-service-cadet-new_peer.h"
45 #include "gnunet-service-cadet-new_paths.h"
47 #define LOG(level,...) GNUNET_log_from (level,"cadet-chn",__VA_ARGS__)
50 * How long do we initially wait before retransmitting?
52 #define CADET_INITIAL_RETRANSMIT_TIME GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 250)
55 * How long do we wait before dropping state about incoming
56 * connection to closed port?
58 #define TIMEOUT_CLOSED_PORT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30)
62 * All the states a connection can be in.
64 enum CadetChannelState
67 * Uninitialized status, should never appear in operation.
72 * Connection create message sent, waiting for ACK.
74 CADET_CHANNEL_OPEN_SENT,
77 * Connection confirmed, ready to carry traffic.
84 * Info needed to retry a message in case it gets lost.
85 * Note that we DO use this structure also for unreliable
88 struct CadetReliableMessage
91 * Double linked list, FIFO style
93 struct CadetReliableMessage *next;
96 * Double linked list, FIFO style
98 struct CadetReliableMessage *prev;
101 * Which channel is this message in?
103 struct CadetChannel *ch;
106 * Entry in the tunnels queue for this message, NULL if it has left
107 * the tunnel. Used to cancel transmission in case we receive an
110 struct CadetTunnelQueueEntry *qe;
113 * How soon should we retry if we fail to get an ACK?
114 * Messages in the queue are sorted by this value.
116 struct GNUNET_TIME_Absolute next_retry;
119 * How long do we wait for an ACK after transmission?
120 * Use for the back-off calculation.
122 struct GNUNET_TIME_Relative retry_delay;
125 * Data message we are trying to send.
127 struct GNUNET_CADET_ChannelAppDataMessage data_message;
129 /* followed by variable-size payload */
134 * List of received out-of-order data messages.
136 struct CadetOutOfOrderMessage
139 * Double linked list, FIFO style
141 struct CadetOutOfOrderMessage *next;
144 * Double linked list, FIFO style
146 struct CadetOutOfOrderMessage *prev;
149 * ID of the message (messages up to this point needed
150 * before we give this one to the client).
152 struct ChannelMessageIdentifier mid;
155 * The envelope with the payload of the out-of-order message
157 struct GNUNET_MQ_Envelope *env;
163 * Struct containing all information regarding a channel to a remote client.
168 * Tunnel this channel is in.
170 struct CadetTunnel *t;
173 * Client owner of the tunnel, if any.
174 * (Used if this channel represends the initiating end of the tunnel.)
176 struct CadetClient *owner;
179 * Client destination of the tunnel, if any.
180 * (Used if this channel represents the listening end of the tunnel.)
182 struct CadetClient *dest;
185 * Last entry in the tunnel's queue relating to control messages
186 * (#GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN or
187 * #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK). Used to cancel
188 * transmission in case we receive updated information.
190 struct CadetTunnelQueueEntry *last_control_qe;
193 * Head of DLL of messages sent and not yet ACK'd.
195 struct CadetReliableMessage *head_sent;
198 * Tail of DLL of messages sent and not yet ACK'd.
200 struct CadetReliableMessage *tail_sent;
203 * Head of DLL of messages received out of order or while client was unready.
205 struct CadetOutOfOrderMessage *head_recv;
208 * Tail DLL of messages received out of order or while client was unready.
210 struct CadetOutOfOrderMessage *tail_recv;
213 * Task to resend/poll in case no ACK is received.
215 struct GNUNET_SCHEDULER_Task *retry_control_task;
218 * Task to resend/poll in case no ACK is received.
220 struct GNUNET_SCHEDULER_Task *retry_data_task;
223 * Last time the channel was used
225 struct GNUNET_TIME_Absolute timestamp;
228 * Destination port of the channel.
230 struct GNUNET_HashCode port;
233 * Counter for exponential backoff.
235 struct GNUNET_TIME_Relative retry_time;
238 * How long does it usually take to get an ACK.
240 struct GNUNET_TIME_Relative expected_delay;
243 * Bitfield of already-received messages past @e mid_recv.
245 uint64_t mid_futures;
248 * Next MID expected for incoming traffic.
250 struct ChannelMessageIdentifier mid_recv;
253 * Next MID to use for outgoing traffic.
255 struct ChannelMessageIdentifier mid_send;
258 * Total (reliable) messages pending ACK for this channel.
260 unsigned int pending_messages;
263 * Maximum (reliable) messages pending ACK for this channel
264 * before we throttle the client.
266 unsigned int max_pending_messages;
269 * Number identifying this channel in its tunnel.
271 struct GNUNET_CADET_ChannelTunnelNumber ctn;
274 * Local tunnel number for local client owning the channel.
275 * ( >= #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI or 0 )
277 struct GNUNET_CADET_ClientChannelNumber ccn;
282 enum CadetChannelState state;
285 * Can we send data to the client?
290 * Can the client send data to us?
295 * Is the tunnel bufferless (minimum latency)?
300 * Is the tunnel reliable?
305 * Is the tunnel out-of-order?
310 * Flag to signal the destruction of the channel. If this is set to
311 * #GNUNET_YES the channel will be destroyed once the queue is
320 * Get the static string for identification of the channel.
324 * @return Static string with the channel IDs.
327 GCCH_2s (const struct CadetChannel *ch)
329 static char buf[128];
331 GNUNET_snprintf (buf,
333 "Channel %s:%s ctn:%X(%X)",
334 GNUNET_i2s (GCP_get_id (GCT_get_destination (ch->t))),
335 GNUNET_h2s (&ch->port),
337 ntohl (ch->ccn.channel_of_client));
343 * Get the channel's public ID.
347 * @return ID used to identify the channel with the remote peer.
349 struct GNUNET_CADET_ChannelTunnelNumber
350 GCCH_get_id (const struct CadetChannel *ch)
357 * Destroy the given channel.
359 * @param ch channel to destroy
362 channel_destroy (struct CadetChannel *ch)
364 struct CadetReliableMessage *crm;
365 struct CadetOutOfOrderMessage *com;
367 while (NULL != (crm = ch->head_sent))
369 GNUNET_assert (ch == crm->ch);
372 GCT_send_cancel (crm->qe);
375 GNUNET_CONTAINER_DLL_remove (ch->head_sent,
380 while (NULL != (com = ch->head_recv))
382 GNUNET_CONTAINER_DLL_remove (ch->head_recv,
385 GNUNET_MQ_discard (com->env);
388 if (NULL != ch->last_control_qe)
390 GCT_send_cancel (ch->last_control_qe);
391 ch->last_control_qe = NULL;
393 if (NULL != ch->retry_data_task)
395 GNUNET_SCHEDULER_cancel (ch->retry_data_task);
396 ch->retry_data_task = NULL;
398 if (NULL != ch->retry_control_task)
400 GNUNET_SCHEDULER_cancel (ch->retry_control_task);
401 ch->retry_control_task = NULL;
403 GCT_remove_channel (ch->t,
411 * Send a channel create message.
413 * @param cls Channel for which to send.
416 send_channel_open (void *cls);
420 * Function called once the tunnel confirms that we sent the
421 * create message. Delays for a bit until we retry.
423 * @param cls our `struct CadetChannel`.
426 channel_open_sent_cb (void *cls)
428 struct CadetChannel *ch = cls;
430 GNUNET_assert (NULL != ch->last_control_qe);
431 ch->last_control_qe = NULL;
432 ch->retry_time = GNUNET_TIME_STD_BACKOFF (ch->retry_time);
433 LOG (GNUNET_ERROR_TYPE_DEBUG,
434 "Sent CHANNEL_OPEN on %s, retrying in %s\n",
436 GNUNET_STRINGS_relative_time_to_string (ch->retry_time,
438 ch->retry_control_task
439 = GNUNET_SCHEDULER_add_delayed (ch->retry_time,
446 * Send a channel open message.
448 * @param cls Channel for which to send.
451 send_channel_open (void *cls)
453 struct CadetChannel *ch = cls;
454 struct GNUNET_CADET_ChannelOpenMessage msgcc;
457 ch->retry_control_task = NULL;
458 LOG (GNUNET_ERROR_TYPE_DEBUG,
459 "Sending CHANNEL_OPEN message for %s\n",
463 options |= GNUNET_CADET_OPTION_NOBUFFER;
465 options |= GNUNET_CADET_OPTION_RELIABLE;
466 if (ch->out_of_order)
467 options |= GNUNET_CADET_OPTION_OUT_OF_ORDER;
468 msgcc.header.size = htons (sizeof (msgcc));
469 msgcc.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN);
470 msgcc.opt = htonl (options);
471 msgcc.port = ch->port;
473 ch->state = CADET_CHANNEL_OPEN_SENT;
474 ch->last_control_qe = GCT_send (ch->t,
476 &channel_open_sent_cb,
482 * Function called once and only once after a channel was bound
483 * to its tunnel via #GCT_add_channel() is ready for transmission.
484 * Note that this is only the case for channels that this peer
485 * initiates, as for incoming channels we assume that they are
486 * ready for transmission immediately upon receiving the open
487 * message. Used to bootstrap the #GCT_send() process.
489 * @param ch the channel for which the tunnel is now ready
492 GCCH_tunnel_up (struct CadetChannel *ch)
494 GNUNET_assert (NULL == ch->retry_control_task);
495 LOG (GNUNET_ERROR_TYPE_DEBUG,
496 "Tunnel up, sending CHANNEL_OPEN on %s now\n",
498 ch->retry_control_task
499 = GNUNET_SCHEDULER_add_now (&send_channel_open,
505 * Create a new channel.
507 * @param owner local client owning the channel
508 * @param ccn local number of this channel at the @a owner
509 * @param destination peer to which we should build the channel
510 * @param port desired port at @a destination
511 * @param options options for the channel
512 * @return handle to the new channel
514 struct CadetChannel *
515 GCCH_channel_local_new (struct CadetClient *owner,
516 struct GNUNET_CADET_ClientChannelNumber ccn,
517 struct CadetPeer *destination,
518 const struct GNUNET_HashCode *port,
521 struct CadetChannel *ch;
523 ch = GNUNET_new (struct CadetChannel);
524 ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
525 ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
526 ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
527 ch->max_pending_messages = (ch->nobuffer) ? 1 : 32; /* FIXME: 32!? Do not hardcode! */
531 ch->t = GCP_get_tunnel (destination,
533 ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME;
534 ch->ctn = GCT_add_channel (ch->t,
536 GNUNET_STATISTICS_update (stats,
540 LOG (GNUNET_ERROR_TYPE_DEBUG,
541 "Created channel to port %s at peer %s for %s using %s\n",
543 GCP_2s (destination),
551 * We had an incoming channel to a port that is closed.
552 * It has not been opened for a while, drop it.
554 * @param cls the channel to drop
557 timeout_closed_cb (void *cls)
559 struct CadetChannel *ch = cls;
561 ch->retry_control_task = NULL;
562 LOG (GNUNET_ERROR_TYPE_DEBUG,
563 "Closing incoming channel to port %s from peer %s due to timeout\n",
564 GNUNET_h2s (&ch->port),
565 GCP_2s (GCT_get_destination (ch->t)));
566 channel_destroy (ch);
571 * Create a new channel based on a request coming in over the network.
573 * @param t tunnel to the remote peer
574 * @param ctn identifier of this channel in the tunnel
575 * @param port desired local port
576 * @param options options for the channel
577 * @return handle to the new channel
579 struct CadetChannel *
580 GCCH_channel_incoming_new (struct CadetTunnel *t,
581 struct GNUNET_CADET_ChannelTunnelNumber ctn,
582 const struct GNUNET_HashCode *port,
585 struct CadetChannel *ch;
586 struct CadetClient *c;
588 ch = GNUNET_new (struct CadetChannel);
592 ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME;
593 ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
594 ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
595 ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
596 ch->max_pending_messages = (ch->nobuffer) ? 1 : 32; /* FIXME: 32!? Do not hardcode! */
597 GNUNET_STATISTICS_update (stats,
602 c = GNUNET_CONTAINER_multihashmap_get (open_ports,
606 /* port closed, wait for it to possibly open */
607 (void) GNUNET_CONTAINER_multihashmap_put (loose_channels,
610 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
611 ch->retry_control_task
612 = GNUNET_SCHEDULER_add_delayed (TIMEOUT_CLOSED_PORT,
615 LOG (GNUNET_ERROR_TYPE_DEBUG,
616 "Created loose incoming channel to port %s from peer %s\n",
617 GNUNET_h2s (&ch->port),
618 GCP_2s (GCT_get_destination (ch->t)));
625 GNUNET_STATISTICS_update (stats,
634 * Function called once the tunnel confirms that we sent the
635 * ACK message. Just remembers it was sent, we do not expect
638 * @param cls our `struct CadetChannel`.
641 send_ack_cb (void *cls)
643 struct CadetChannel *ch = cls;
645 GNUNET_assert (NULL != ch->last_control_qe);
646 ch->last_control_qe = NULL;
651 * Compute and send the current ACK to the other peer.
653 * @param ch channel to send the ACK for
656 send_channel_data_ack (struct CadetChannel *ch)
658 struct GNUNET_CADET_ChannelDataAckMessage msg;
660 msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK);
661 msg.header.size = htons (sizeof (msg));
663 msg.mid.mid = htonl (ntohl (ch->mid_recv.mid) - 1);
664 msg.futures = GNUNET_htonll (ch->mid_futures);
665 if (NULL != ch->last_control_qe)
666 GCT_send_cancel (ch->last_control_qe);
667 ch->last_control_qe = GCT_send (ch->t,
675 * Send our initial ACK to the client confirming that the
678 * @param cls the `struct CadetChannel`
681 send_open_ack (void *cls)
683 struct CadetChannel *ch = cls;
684 struct GNUNET_CADET_ChannelManageMessage msg;
686 LOG (GNUNET_ERROR_TYPE_DEBUG,
687 "Sending CHANNEL_OPEN_ACK on channel %s\n",
689 ch->retry_control_task = NULL;
690 msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK);
691 msg.header.size = htons (sizeof (msg));
692 msg.reserved = htonl (0);
694 if (NULL != ch->last_control_qe)
695 GCT_send_cancel (ch->last_control_qe);
696 ch->last_control_qe = GCT_send (ch->t,
704 * We got a #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN message again for
705 * this channel. If the binding was successful, (re)transmit the
706 * #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK.
708 * @param ch channel that got the duplicate open
711 GCCH_handle_duplicate_open (struct CadetChannel *ch)
713 if (NULL == ch->dest)
715 LOG (GNUNET_ERROR_TYPE_DEBUG,
716 "Ignoring duplicate channel OPEN on %s: port is closed\n",
720 if (NULL != ch->retry_control_task)
722 LOG (GNUNET_ERROR_TYPE_DEBUG,
723 "Ignoring duplicate channel OPEN on %s: control message is pending\n",
727 LOG (GNUNET_ERROR_TYPE_DEBUG,
728 "Retransmitting OPEN_ACK on channel %s\n",
730 ch->retry_control_task
731 = GNUNET_SCHEDULER_add_now (&send_open_ack,
737 * Send a LOCAL ACK to the client to solicit more messages.
739 * @param ch channel the ack is for
740 * @param c client to send the ACK to
743 send_ack_to_client (struct CadetChannel *ch,
744 struct CadetClient *c)
746 struct GNUNET_MQ_Envelope *env;
747 struct GNUNET_CADET_LocalAck *ack;
749 env = GNUNET_MQ_msg (ack,
750 GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK);
752 GSC_send_to_client (c,
758 * A client is bound to the port that we have a channel
759 * open to. Send the acknowledgement for the connection
760 * request and establish the link with the client.
762 * @param ch open incoming channel
763 * @param c client listening on the respective port
766 GCCH_bind (struct CadetChannel *ch,
767 struct CadetClient *c)
769 struct GNUNET_MQ_Envelope *env;
770 struct GNUNET_CADET_LocalChannelCreateMessage *tcm;
773 LOG (GNUNET_ERROR_TYPE_DEBUG,
774 "Binding %s from %s to port %s of %s\n",
777 GNUNET_h2s (&ch->port),
779 if (NULL != ch->retry_control_task)
781 /* there might be a timeout task here */
782 GNUNET_SCHEDULER_cancel (ch->retry_control_task);
783 ch->retry_control_task = NULL;
787 options |= GNUNET_CADET_OPTION_NOBUFFER;
789 options |= GNUNET_CADET_OPTION_RELIABLE;
790 if (ch->out_of_order)
791 options |= GNUNET_CADET_OPTION_OUT_OF_ORDER;
793 ch->ccn = GSC_bind (c,
795 GCT_get_destination (ch->t),
798 ch->mid_recv.mid = htonl (1); /* The CONNECT counts as message 0! */
800 /* notify other peer that we accepted the connection */
801 ch->retry_control_task
802 = GNUNET_SCHEDULER_add_now (&send_open_ack,
804 /* give client it's initial supply of ACKs */
805 env = GNUNET_MQ_msg (tcm,
806 GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_CREATE);
808 tcm->peer = *GCP_get_id (GCT_get_destination (ch->t));
809 tcm->port = ch->port;
810 tcm->opt = htonl (options);
811 GSC_send_to_client (ch->dest,
813 for (unsigned int i=0;i<ch->max_pending_messages;i++)
814 send_ack_to_client (ch,
820 * Destroy locally created channel. Called by the
821 * local client, so no need to tell the client.
823 * @param ch channel to destroy
826 GCCH_channel_local_destroy (struct CadetChannel *ch)
828 LOG (GNUNET_ERROR_TYPE_DEBUG,
829 "Local client asks for destruction of %s which it initiated\n",
831 if (GNUNET_YES == ch->destroy)
833 /* other end already destroyed, with the local client gone, no need
834 to finish transmissions, just destroy immediately. */
835 channel_destroy (ch);
838 if (NULL != ch->head_sent)
840 /* allow send queue to train first */
841 ch->destroy = GNUNET_YES;
844 /* If the we ever sent the CHANNEL_CREATE, we need to send a destroy message. */
845 if (CADET_CHANNEL_NEW != ch->state)
846 GCT_send_channel_destroy (ch->t,
848 /* Now finish our clean up */
849 channel_destroy (ch);
854 * Destroy channel that was incoming. Called by the
855 * local client, so no need to tell the client.
857 * @param ch channel to destroy
860 GCCH_channel_incoming_destroy (struct CadetChannel *ch)
862 LOG (GNUNET_ERROR_TYPE_DEBUG,
863 "Local client asks for destruction of %s which it accepted\n",
865 if (GNUNET_YES == ch->destroy)
867 /* other end already destroyed, with the remote client gone, no need
868 to finish transmissions, just destroy immediately. */
869 channel_destroy (ch);
872 if (NULL != ch->head_recv)
874 /* allow local client to see all data first */
875 ch->destroy = GNUNET_YES;
878 /* Nothing left to do, just finish destruction */
879 GCT_send_channel_destroy (ch->t,
881 channel_destroy (ch);
886 * We got an acknowledgement for the creation of the channel
887 * (the port is open on the other side). Begin transmissions.
889 * @param ch channel to destroy
892 GCCH_handle_channel_open_ack (struct CadetChannel *ch)
896 case CADET_CHANNEL_NEW:
897 /* this should be impossible */
900 case CADET_CHANNEL_OPEN_SENT:
901 if (NULL == ch->owner)
903 /* We're not the owner, wrong direction! */
907 LOG (GNUNET_ERROR_TYPE_DEBUG,
908 "Received channel OPEN_ACK for waiting %s, entering READY state\n",
910 GNUNET_SCHEDULER_cancel (ch->retry_control_task);
911 ch->retry_control_task = NULL;
912 ch->state = CADET_CHANNEL_READY;
913 /* On first connect, send client as many ACKs as we allow messages
915 for (unsigned int i=0;i<ch->max_pending_messages;i++)
916 send_ack_to_client (ch,
919 case CADET_CHANNEL_READY:
920 /* duplicate ACK, maybe we retried the CREATE. Ignore. */
921 LOG (GNUNET_ERROR_TYPE_DEBUG,
922 "Received duplicate channel OPEN_ACK for %s\n",
924 GNUNET_STATISTICS_update (stats,
925 "# duplicate CREATE_ACKs",
934 * Test if element @a e1 comes before element @a e2.
936 * TODO: use opportunity to create generic list insertion sort
937 * logic in container!
939 * @param cls closure, our `struct CadetChannel`
940 * @param e1 an element of to sort
941 * @param e2 another element to sort
942 * @return #GNUNET_YES if @e1 < @e2, otherwise #GNUNET_NO
945 is_before (void *cls,
949 struct CadetOutOfOrderMessage *m1 = e1;
950 struct CadetOutOfOrderMessage *m2 = e2;
951 uint32_t v1 = ntohl (m1->mid.mid);
952 uint32_t v2 = ntohl (m2->mid.mid);
956 if (delta > (uint32_t) INT_MAX)
958 /* in overflow range, we can safely assume we wrapped around */
969 * We got payload data for a channel. Pass it on to the client
970 * and send an ACK to the other end (once flow control allows it!)
972 * @param ch channel that got data
975 GCCH_handle_channel_plaintext_data (struct CadetChannel *ch,
976 const struct GNUNET_CADET_ChannelAppDataMessage *msg)
978 struct GNUNET_MQ_Envelope *env;
979 struct GNUNET_CADET_LocalData *ld;
980 struct CadetOutOfOrderMessage *com;
983 payload_size = ntohs (msg->header.size) - sizeof (*msg);
984 LOG (GNUNET_ERROR_TYPE_DEBUG,
985 "Receicved %u bytes of application data on %s\n",
986 (unsigned int) payload_size,
988 env = GNUNET_MQ_msg_extra (ld,
990 GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
992 GNUNET_memcpy (&ld[1],
995 if ( (GNUNET_YES == ch->client_ready) &&
996 ( (GNUNET_YES == ch->out_of_order) ||
997 (msg->mid.mid == ch->mid_recv.mid) ) )
999 GSC_send_to_client (ch->owner ? ch->owner : ch->dest,
1001 ch->mid_recv.mid = htonl (1 + ntohl (ch->mid_recv.mid));
1002 ch->mid_futures >>= 1;
1006 /* FIXME-SECURITY: if the element is WAY too far ahead,
1007 drop it (can't buffer too much!) */
1008 com = GNUNET_new (struct CadetOutOfOrderMessage);
1009 com->mid = msg->mid;
1011 /* sort into list ordered by "is_before" */
1012 if ( (NULL == ch->head_recv) ||
1013 (GNUNET_YES == is_before (ch,
1017 GNUNET_CONTAINER_DLL_insert (ch->head_recv,
1023 struct CadetOutOfOrderMessage *pos;
1025 for (pos = ch->head_recv;
1036 GNUNET_CONTAINER_DLL_insert_tail (ch->head_recv,
1040 GNUNET_CONTAINER_DLL_insert_after (ch->head_recv,
1050 * We got an acknowledgement for payload data for a channel.
1051 * Possibly resume transmissions.
1053 * @param ch channel that got the ack
1054 * @param ack details about what was received
1057 GCCH_handle_channel_plaintext_data_ack (struct CadetChannel *ch,
1058 const struct GNUNET_CADET_ChannelDataAckMessage *ack)
1060 struct CadetReliableMessage *crm;
1062 if (GNUNET_NO == ch->reliable)
1064 /* not expecting ACKs on unreliable channel, odd */
1065 GNUNET_break_op (0);
1068 for (crm = ch->head_sent;
1071 if (ack->mid.mid == crm->data_message.mid.mid)
1075 /* ACK for message we already dropped, might have been a
1076 duplicate ACK? Ignore. */
1077 LOG (GNUNET_ERROR_TYPE_DEBUG,
1078 "Duplicate DATA_ACK on %s, ignoring\n",
1080 GNUNET_STATISTICS_update (stats,
1081 "# duplicate DATA_ACKs",
1086 GNUNET_CONTAINER_DLL_remove (ch->head_sent,
1089 ch->pending_messages--;
1091 GNUNET_assert (ch->pending_messages < ch->max_pending_messages);
1092 LOG (GNUNET_ERROR_TYPE_DEBUG,
1093 "Received DATA_ACK on %s for message %u (%u ACKs pending)\n",
1095 (unsigned int) ntohl (ack->mid.mid),
1096 ch->pending_messages);
1097 send_ack_to_client (ch,
1098 (NULL == ch->owner) ? ch->dest : ch->owner);
1103 * Destroy channel, based on the other peer closing the
1104 * connection. Also needs to remove this channel from
1107 * @param ch channel to destroy
1110 GCCH_handle_remote_destroy (struct CadetChannel *ch)
1112 LOG (GNUNET_ERROR_TYPE_DEBUG,
1113 "Received remote channel DESTROY for %s\n",
1115 ch->destroy = GNUNET_YES;
1116 GSC_handle_remote_channel_destroy ((NULL != ch->owner) ? ch->owner : ch->dest,
1119 channel_destroy (ch);
1124 * Function called once the tunnel has sent one of our messages.
1125 * If the message is unreliable, simply frees the `crm`. If the
1126 * message was reliable, calculate retransmission time and
1127 * wait for ACK (or retransmit).
1129 * @param cls the `struct CadetReliableMessage` that was sent
1132 data_sent_cb (void *cls);
1136 * We need to retry a transmission, the last one took too long to
1139 * @param cls the `struct CadetChannel` where we need to retransmit
1142 retry_transmission (void *cls)
1144 struct CadetChannel *ch = cls;
1145 struct CadetReliableMessage *crm = ch->head_sent;
1147 ch->retry_data_task = NULL;
1148 GNUNET_assert (NULL == crm->qe);
1149 crm->qe = GCT_send (ch->t,
1150 &crm->data_message.header,
1157 * Check if we can now allow the client to transmit, and if so,
1158 * let the client know about it.
1160 * @param ch channel to check
1163 GCCH_check_allow_client (struct CadetChannel *ch)
1165 struct GNUNET_MQ_Envelope *env;
1166 struct GNUNET_CADET_LocalAck *msg;
1168 if (GNUNET_YES == ch->client_allowed)
1169 return; /* client already allowed! */
1170 if (CADET_CHANNEL_READY != ch->state)
1172 /* destination did not yet ACK our CREATE! */
1173 LOG (GNUNET_ERROR_TYPE_DEBUG,
1174 "%s not yet ready, throttling client until ACK.\n",
1178 if (ch->pending_messages > ch->max_pending_messages)
1180 /* Too many messages in queue. */
1181 LOG (GNUNET_ERROR_TYPE_DEBUG,
1182 "Message queue still too long on %s, throttling client until ACK.\n",
1186 if ( (NULL != ch->head_sent) &&
1187 (64 <= ntohl (ch->mid_send.mid) - ntohl (ch->head_sent->data_message.mid.mid)) )
1189 LOG (GNUNET_ERROR_TYPE_DEBUG,
1190 "Gap in ACKs too big on %s, throttling client until ACK.\n",
1194 ch->client_allowed = GNUNET_YES;
1197 LOG (GNUNET_ERROR_TYPE_DEBUG,
1198 "Sending local ack to %s client\n",
1200 env = GNUNET_MQ_msg (msg,
1201 GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK);
1203 GSC_send_to_client (ch->owner ? ch->owner : ch->dest,
1209 * Function called once the tunnel has sent one of our messages.
1210 * If the message is unreliable, simply frees the `crm`. If the
1211 * message was reliable, calculate retransmission time and
1212 * wait for ACK (or retransmit).
1214 * @param cls the `struct CadetReliableMessage` that was sent
1217 data_sent_cb (void *cls)
1219 struct CadetReliableMessage *crm = cls;
1220 struct CadetChannel *ch = crm->ch;
1221 struct CadetReliableMessage *off;
1224 GNUNET_CONTAINER_DLL_remove (ch->head_sent,
1227 if (GNUNET_NO == ch->reliable)
1230 ch->pending_messages--;
1231 GCCH_check_allow_client (ch);
1234 if (0 == crm->retry_delay.rel_value_us)
1235 crm->retry_delay = ch->expected_delay;
1236 crm->next_retry = GNUNET_TIME_relative_to_absolute (crm->retry_delay);
1238 /* find position for re-insertion into the DLL */
1239 if ( (NULL == ch->head_sent) ||
1240 (crm->next_retry.abs_value_us < ch->head_sent->next_retry.abs_value_us) )
1242 /* insert at HEAD, also (re)schedule retry task! */
1243 GNUNET_CONTAINER_DLL_insert (ch->head_sent,
1246 if (NULL != ch->retry_data_task)
1247 GNUNET_SCHEDULER_cancel (ch->retry_data_task);
1249 = GNUNET_SCHEDULER_add_delayed (crm->retry_delay,
1250 &retry_transmission,
1254 for (off = ch->head_sent; NULL != off; off = off->next)
1255 if (crm->next_retry.abs_value_us < off->next_retry.abs_value_us)
1259 /* insert at tail */
1260 GNUNET_CONTAINER_DLL_insert_tail (ch->head_sent,
1266 /* insert before off */
1267 GNUNET_CONTAINER_DLL_insert_after (ch->head_sent,
1276 * Handle data given by a client.
1278 * Check whether the client is allowed to send in this tunnel, save if
1279 * channel is reliable and send an ACK to the client if there is still
1280 * buffer space in the tunnel.
1282 * @param ch Channel.
1283 * @param message payload to transmit.
1284 * @return #GNUNET_OK if everything goes well,
1285 * #GNUNET_SYSERR in case of an error.
1288 GCCH_handle_local_data (struct CadetChannel *ch,
1289 const struct GNUNET_MessageHeader *message)
1291 uint16_t payload_size = ntohs (message->size);
1292 struct CadetReliableMessage *crm;
1294 if (GNUNET_NO == ch->client_allowed)
1296 GNUNET_break_op (0);
1297 return GNUNET_SYSERR;
1299 ch->client_allowed = GNUNET_NO;
1300 ch->pending_messages++;
1302 /* Everything is correct, send the message. */
1303 crm = GNUNET_malloc (sizeof (*crm) + payload_size);
1305 crm->data_message.header.size = htons (sizeof (struct GNUNET_CADET_ChannelAppDataMessage) + payload_size);
1306 crm->data_message.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA);
1307 ch->mid_send.mid = htonl (ntohl (ch->mid_send.mid) + 1);
1308 crm->data_message.mid = ch->mid_send;
1309 crm->data_message.ctn = ch->ctn;
1310 GNUNET_memcpy (&crm[1],
1313 GNUNET_CONTAINER_DLL_insert (ch->head_sent,
1316 LOG (GNUNET_ERROR_TYPE_DEBUG,
1317 "Sending %u bytes from local client to %s\n",
1320 crm->qe = GCT_send (ch->t,
1321 &crm->data_message.header,
1324 GCCH_check_allow_client (ch);
1330 * Try to deliver messages to the local client, if it is ready for more.
1332 * @param ch channel to process
1335 send_client_buffered_data (struct CadetChannel *ch)
1337 struct CadetOutOfOrderMessage *com;
1339 if (GNUNET_NO == ch->client_ready)
1340 return; /* client not ready */
1341 com = ch->head_recv;
1343 return; /* none pending */
1344 if ( (com->mid.mid != ch->mid_recv.mid) &&
1345 (GNUNET_NO == ch->out_of_order) )
1346 return; /* missing next one in-order */
1348 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1349 "Passing payload message to client on %s\n",
1352 /* all good, pass next message to client */
1353 GNUNET_CONTAINER_DLL_remove (ch->head_recv,
1356 /* FIXME: if unreliable, this is not aggressive
1357 enough, as it would be OK to have lost some! */
1358 ch->mid_recv.mid = htonl (1 + ntohl (com->mid.mid));
1359 ch->mid_futures >>= 1; /* equivalent to division by 2 */
1360 GSC_send_to_client (ch->owner ? ch->owner : ch->dest,
1363 if ( (0xFFULL == (ch->mid_futures & 0xFFULL)) &&
1364 (GNUNET_YES == ch->reliable) )
1366 /* The next 15 messages were also already received (0xFF), this
1367 suggests that the sender may be blocked on flow control
1368 urgently waiting for an ACK from us. (As we have an inherent
1369 maximum of 64 bits, and 15 is getting too close for comfort.)
1370 So we should send one now. */
1371 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1372 "Sender on %s likely blocked on flow-control, sending ACK now.\n",
1374 if (GNUNET_YES == ch->reliable)
1375 send_channel_data_ack (ch);
1378 if (NULL != ch->head_recv)
1380 if (GNUNET_NO == ch->destroy)
1382 GCT_send_channel_destroy (ch->t,
1384 channel_destroy (ch);
1389 * Handle ACK from client on local channel.
1391 * @param ch channel to destroy
1394 GCCH_handle_local_ack (struct CadetChannel *ch)
1396 ch->client_ready = GNUNET_YES;
1397 send_client_buffered_data (ch);
1401 #define LOG2(level, ...) GNUNET_log_from_nocheck(level,"cadet-chn",__VA_ARGS__)
1407 * @param ch Channel.
1408 * @param level Debug level to use.
1411 GCCH_debug (struct CadetChannel *ch,
1412 enum GNUNET_ErrorType level)
1416 do_log = GNUNET_get_log_call_status (level & (~GNUNET_ERROR_TYPE_BULK),
1418 __FILE__, __FUNCTION__, __LINE__);
1424 LOG2 (level, "CHN *** DEBUG NULL CHANNEL ***\n");
1432 if (NULL != ch->owner)
1435 "CHN origin %s ready %s local-id: %u\n",
1437 ch->client_ready ? "YES" : "NO",
1438 ntohl (ch->ccn.channel_of_client));
1440 if (NULL != ch->dest)
1443 "CHN destination %s ready %s local-id: %u\n",
1445 ch->client_ready ? "YES" : "NO",
1446 ntohl (ch->ccn.channel_of_client));
1449 "CHN Message IDs recv: %d (%LLX), send: %d\n",
1450 ntohl (ch->mid_recv.mid),
1451 (unsigned long long) ch->mid_futures,
1452 ntohl (ch->mid_send.mid));
1457 /* end of gnunet-service-cadet-new_channel.c */