3 This file is part of GNUnet.
4 Copyright (C) 2001-2017 GNUnet e.V.
6 GNUnet is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published
8 by the Free Software Foundation; either version 3, or (at your
9 option) any later version.
11 GNUnet is distributed in the hope that it will be useful, but
12 WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with GNUnet; see the file COPYING. If not, write to the
18 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
19 Boston, MA 02110-1301, USA.
22 * @file cadet/gnunet-service-cadet-new_channel.c
23 * @brief logical links between CADET clients
24 * @author Bartlomiej Polot
25 * @author Christian Grothoff
28 * - introduce shutdown so we can have half-closed channels, modify
29 * destroy to include MID to have FIN-ACK equivalents, etc.
30 * - estimate max bandwidth using bursts and use it for CONGESTION CONTROL!
31 * - check that '0xFFULL' really is sufficient for flow control!
32 * - revisit handling of 'unreliable' traffic!
33 * - revisit handling of 'out-of-order' option, especially in combination with/without 'reliable'.
34 * - figure out flow control without ACKs (unreliable traffic!)
37 #include "gnunet_util_lib.h"
39 #include "gnunet_statistics_service.h"
40 #include "gnunet-service-cadet-new.h"
41 #include "gnunet-service-cadet-new_channel.h"
42 #include "gnunet-service-cadet-new_connection.h"
43 #include "gnunet-service-cadet-new_tunnels.h"
44 #include "gnunet-service-cadet-new_peer.h"
45 #include "gnunet-service-cadet-new_paths.h"
/**
 * Log with the component name of the channel logic.
 */
#define LOG(level,...) GNUNET_log_from (level,"cadet-chn",__VA_ARGS__)

/**
 * How long do we initially wait before retransmitting?
 */
#define CADET_INITIAL_RETRANSMIT_TIME GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 250)

/**
 * How long do we wait before dropping state about incoming
 * connection to closed port?
 */
#define TIMEOUT_CLOSED_PORT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30)
/**
 * All the states a channel can be in.
 */
enum CadetChannelState
{
  /**
   * Uninitialized status, should never appear in operation.
   */
  CADET_CHANNEL_NEW,

  /**
   * Channel open message sent, waiting for the ACK.
   */
  CADET_CHANNEL_OPEN_SENT,

  /**
   * Channel confirmed, ready to carry traffic.
   */
  CADET_CHANNEL_READY
};
84 * Info needed to retry a message in case it gets lost.
85 * Note that we DO use this structure also for unreliable
88 struct CadetReliableMessage
91 * Double linked list, FIFO style
93 struct CadetReliableMessage *next;
96 * Double linked list, FIFO style
98 struct CadetReliableMessage *prev;
101 * Which channel is this message in?
103 struct CadetChannel *ch;
106 * Entry in the tunnels queue for this message, NULL if it has left
107 * the tunnel. Used to cancel transmission in case we receive an
110 struct CadetTunnelQueueEntry *qe;
113 * How soon should we retry if we fail to get an ACK?
114 * Messages in the queue are sorted by this value.
116 struct GNUNET_TIME_Absolute next_retry;
119 * How long do we wait for an ACK after transmission?
120 * Use for the back-off calculation.
122 struct GNUNET_TIME_Relative retry_delay;
125 * Data message we are trying to send.
127 struct GNUNET_CADET_ChannelAppDataMessage data_message;
129 /* followed by variable-size payload */
134 * List of received out-of-order data messages.
136 struct CadetOutOfOrderMessage
139 * Double linked list, FIFO style
141 struct CadetOutOfOrderMessage *next;
144 * Double linked list, FIFO style
146 struct CadetOutOfOrderMessage *prev;
149 * ID of the message (messages up to this point needed
150 * before we give this one to the client).
152 struct ChannelMessageIdentifier mid;
155 * The envelope with the payload of the out-of-order message
157 struct GNUNET_MQ_Envelope *env;
163 * Struct containing all information regarding a channel to a remote client.
168 * Tunnel this channel is in.
170 struct CadetTunnel *t;
173 * Client owner of the tunnel, if any.
174 * (Used if this channel represends the initiating end of the tunnel.)
176 struct CadetClient *owner;
179 * Client destination of the tunnel, if any.
180 * (Used if this channel represents the listening end of the tunnel.)
182 struct CadetClient *dest;
185 * Last entry in the tunnel's queue relating to control messages
186 * (#GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN or
187 * #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK). Used to cancel
188 * transmission in case we receive updated information.
190 struct CadetTunnelQueueEntry *last_control_qe;
193 * Head of DLL of messages sent and not yet ACK'd.
195 struct CadetReliableMessage *head_sent;
198 * Tail of DLL of messages sent and not yet ACK'd.
200 struct CadetReliableMessage *tail_sent;
203 * Head of DLL of messages received out of order or while client was unready.
205 struct CadetOutOfOrderMessage *head_recv;
208 * Tail DLL of messages received out of order or while client was unready.
210 struct CadetOutOfOrderMessage *tail_recv;
213 * Task to resend/poll in case no ACK is received.
215 struct GNUNET_SCHEDULER_Task *retry_control_task;
218 * Task to resend/poll in case no ACK is received.
220 struct GNUNET_SCHEDULER_Task *retry_data_task;
223 * Last time the channel was used
225 struct GNUNET_TIME_Absolute timestamp;
228 * Destination port of the channel.
230 struct GNUNET_HashCode port;
233 * Counter for exponential backoff.
235 struct GNUNET_TIME_Relative retry_time;
238 * How long does it usually take to get an ACK.
240 struct GNUNET_TIME_Relative expected_delay;
243 * Bitfield of already-received messages past @e mid_recv.
245 uint64_t mid_futures;
248 * Next MID expected for incoming traffic.
250 struct ChannelMessageIdentifier mid_recv;
253 * Next MID to use for outgoing traffic.
255 struct ChannelMessageIdentifier mid_send;
258 * Total (reliable) messages pending ACK for this channel.
260 unsigned int pending_messages;
263 * Maximum (reliable) messages pending ACK for this channel
264 * before we throttle the client.
266 unsigned int max_pending_messages;
269 * Number identifying this channel in its tunnel.
271 struct GNUNET_CADET_ChannelTunnelNumber ctn;
274 * Local tunnel number for local client @e owner owning the channel.
275 * ( >= #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI)
277 struct GNUNET_CADET_ClientChannelNumber ccn_owner;
280 * Local tunnel number for local client @e dest owning the channel.
281 * (< #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI)
283 struct GNUNET_CADET_ClientChannelNumber ccn_dest;
288 enum CadetChannelState state;
291 * Can we send data to the client?
296 * Is the tunnel bufferless (minimum latency)?
301 * Is the tunnel reliable?
306 * Is the tunnel out-of-order?
311 * Is this channel a loopback channel, where the destination is us again?
316 * Flag to signal the destruction of the channel. If this is set to
317 * #GNUNET_YES the channel will be destroyed once the queue is
326 * Get the static string for identification of the channel.
330 * @return Static string with the channel IDs.
333 GCCH_2s (const struct CadetChannel *ch)
335 static char buf[128];
337 GNUNET_snprintf (buf,
339 "Channel %s:%s ctn:%X(%X/%X)",
340 (GNUNET_YES == ch->is_loopback)
342 : GNUNET_i2s (GCP_get_id (GCT_get_destination (ch->t))),
343 GNUNET_h2s (&ch->port),
345 ntohl (ch->ccn_owner.channel_of_client),
346 ntohl (ch->ccn_dest.channel_of_client));
352 * Get the channel's public ID.
356 * @return ID used to identify the channel with the remote peer.
358 struct GNUNET_CADET_ChannelTunnelNumber
359 GCCH_get_id (const struct CadetChannel *ch)
366 * Destroy the given channel.
368 * @param ch channel to destroy
371 channel_destroy (struct CadetChannel *ch)
373 struct CadetReliableMessage *crm;
374 struct CadetOutOfOrderMessage *com;
376 while (NULL != (crm = ch->head_sent))
378 GNUNET_assert (ch == crm->ch);
381 GCT_send_cancel (crm->qe);
384 GNUNET_CONTAINER_DLL_remove (ch->head_sent,
389 while (NULL != (com = ch->head_recv))
391 GNUNET_CONTAINER_DLL_remove (ch->head_recv,
394 GNUNET_MQ_discard (com->env);
397 if (NULL != ch->last_control_qe)
399 GCT_send_cancel (ch->last_control_qe);
400 ch->last_control_qe = NULL;
402 if (NULL != ch->retry_data_task)
404 GNUNET_SCHEDULER_cancel (ch->retry_data_task);
405 ch->retry_data_task = NULL;
407 if (NULL != ch->retry_control_task)
409 GNUNET_SCHEDULER_cancel (ch->retry_control_task);
410 ch->retry_control_task = NULL;
412 if (GNUNET_NO == ch->is_loopback)
414 GCT_remove_channel (ch->t,
424 * Send a channel create message.
426 * @param cls Channel for which to send.
429 send_channel_open (void *cls);
433 * Function called once the tunnel confirms that we sent the
434 * create message. Delays for a bit until we retry.
436 * @param cls our `struct CadetChannel`.
439 channel_open_sent_cb (void *cls)
441 struct CadetChannel *ch = cls;
443 GNUNET_assert (NULL != ch->last_control_qe);
444 ch->last_control_qe = NULL;
445 ch->retry_time = GNUNET_TIME_STD_BACKOFF (ch->retry_time);
446 LOG (GNUNET_ERROR_TYPE_DEBUG,
447 "Sent CHANNEL_OPEN on %s, retrying in %s\n",
449 GNUNET_STRINGS_relative_time_to_string (ch->retry_time,
451 ch->retry_control_task
452 = GNUNET_SCHEDULER_add_delayed (ch->retry_time,
459 * Send a channel open message.
461 * @param cls Channel for which to send.
464 send_channel_open (void *cls)
466 struct CadetChannel *ch = cls;
467 struct GNUNET_CADET_ChannelOpenMessage msgcc;
470 ch->retry_control_task = NULL;
471 LOG (GNUNET_ERROR_TYPE_DEBUG,
472 "Sending CHANNEL_OPEN message for %s\n",
476 options |= GNUNET_CADET_OPTION_NOBUFFER;
478 options |= GNUNET_CADET_OPTION_RELIABLE;
479 if (ch->out_of_order)
480 options |= GNUNET_CADET_OPTION_OUT_OF_ORDER;
481 msgcc.header.size = htons (sizeof (msgcc));
482 msgcc.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN);
483 msgcc.opt = htonl (options);
484 msgcc.port = ch->port;
486 ch->state = CADET_CHANNEL_OPEN_SENT;
487 ch->last_control_qe = GCT_send (ch->t,
489 &channel_open_sent_cb,
495 * Function called once and only once after a channel was bound
496 * to its tunnel via #GCT_add_channel() is ready for transmission.
497 * Note that this is only the case for channels that this peer
498 * initiates, as for incoming channels we assume that they are
499 * ready for transmission immediately upon receiving the open
500 * message. Used to bootstrap the #GCT_send() process.
502 * @param ch the channel for which the tunnel is now ready
505 GCCH_tunnel_up (struct CadetChannel *ch)
507 GNUNET_assert (NULL == ch->retry_control_task);
508 LOG (GNUNET_ERROR_TYPE_DEBUG,
509 "Tunnel up, sending CHANNEL_OPEN on %s now\n",
511 ch->retry_control_task
512 = GNUNET_SCHEDULER_add_now (&send_channel_open,
518 * Create a new channel.
520 * @param owner local client owning the channel
521 * @param ccn local number of this channel at the @a owner
522 * @param destination peer to which we should build the channel
523 * @param port desired port at @a destination
524 * @param options options for the channel
525 * @return handle to the new channel
527 struct CadetChannel *
528 GCCH_channel_local_new (struct CadetClient *owner,
529 struct GNUNET_CADET_ClientChannelNumber ccn,
530 struct CadetPeer *destination,
531 const struct GNUNET_HashCode *port,
534 struct CadetChannel *ch;
536 ch = GNUNET_new (struct CadetChannel);
537 ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
538 ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
539 ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
540 ch->max_pending_messages = (ch->nobuffer) ? 1 : 4; /* FIXME: 4!? Do not hardcode! */
544 if (0 == memcmp (&my_full_id,
545 GCP_get_id (destination),
546 sizeof (struct GNUNET_PeerIdentity)))
548 ch->is_loopback = GNUNET_YES;
549 ch->dest = GNUNET_CONTAINER_multihashmap_get (open_ports,
551 if (NULL == ch->dest)
553 /* port closed, wait for it to possibly open */
554 (void) GNUNET_CONTAINER_multihashmap_put (loose_channels,
557 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
558 LOG (GNUNET_ERROR_TYPE_DEBUG,
559 "Created loose incoming loopback channel to port %s\n",
560 GNUNET_h2s (&ch->port));
570 ch->t = GCP_get_tunnel (destination,
572 ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME;
573 ch->ctn = GCT_add_channel (ch->t,
576 GNUNET_STATISTICS_update (stats,
580 LOG (GNUNET_ERROR_TYPE_DEBUG,
581 "Created channel to port %s at peer %s for %s using %s\n",
583 GCP_2s (destination),
585 (GNUNET_YES == ch->is_loopback) ? "loopback" : GCT_2s (ch->t));
591 * We had an incoming channel to a port that is closed.
592 * It has not been opened for a while, drop it.
594 * @param cls the channel to drop
597 timeout_closed_cb (void *cls)
599 struct CadetChannel *ch = cls;
601 ch->retry_control_task = NULL;
602 LOG (GNUNET_ERROR_TYPE_DEBUG,
603 "Closing incoming channel to port %s from peer %s due to timeout\n",
604 GNUNET_h2s (&ch->port),
605 GCP_2s (GCT_get_destination (ch->t)));
606 channel_destroy (ch);
611 * Create a new channel based on a request coming in over the network.
613 * @param t tunnel to the remote peer
614 * @param ctn identifier of this channel in the tunnel
615 * @param port desired local port
616 * @param options options for the channel
617 * @return handle to the new channel
619 struct CadetChannel *
620 GCCH_channel_incoming_new (struct CadetTunnel *t,
621 struct GNUNET_CADET_ChannelTunnelNumber ctn,
622 const struct GNUNET_HashCode *port,
625 struct CadetChannel *ch;
626 struct CadetClient *c;
628 ch = GNUNET_new (struct CadetChannel);
632 ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME;
633 ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
634 ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
635 ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
636 ch->max_pending_messages = (ch->nobuffer) ? 1 : 4; /* FIXME: 4!? Do not hardcode! */
637 GNUNET_STATISTICS_update (stats,
642 c = GNUNET_CONTAINER_multihashmap_get (open_ports,
646 /* port closed, wait for it to possibly open */
647 (void) GNUNET_CONTAINER_multihashmap_put (loose_channels,
650 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
651 ch->retry_control_task
652 = GNUNET_SCHEDULER_add_delayed (TIMEOUT_CLOSED_PORT,
655 LOG (GNUNET_ERROR_TYPE_DEBUG,
656 "Created loose incoming channel to port %s from peer %s\n",
657 GNUNET_h2s (&ch->port),
658 GCP_2s (GCT_get_destination (ch->t)));
665 GNUNET_STATISTICS_update (stats,
674 * Function called once the tunnel confirms that we sent the
675 * ACK message. Just remembers it was sent, we do not expect
678 * @param cls our `struct CadetChannel`.
681 send_ack_cb (void *cls)
683 struct CadetChannel *ch = cls;
685 GNUNET_assert (NULL != ch->last_control_qe);
686 ch->last_control_qe = NULL;
691 * Compute and send the current #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK to the other peer.
693 * @param ch channel to send the #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK for
696 send_channel_data_ack (struct CadetChannel *ch)
698 struct GNUNET_CADET_ChannelDataAckMessage msg;
700 msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK);
701 msg.header.size = htons (sizeof (msg));
703 msg.mid.mid = htonl (ntohl (ch->mid_recv.mid) - 1);
704 msg.futures = GNUNET_htonll (ch->mid_futures);
705 if (NULL != ch->last_control_qe)
706 GCT_send_cancel (ch->last_control_qe);
707 ch->last_control_qe = GCT_send (ch->t,
715 * Send our initial #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK to the client confirming that the
718 * @param cls the `struct CadetChannel`
721 send_open_ack (void *cls)
723 struct CadetChannel *ch = cls;
724 struct GNUNET_CADET_ChannelManageMessage msg;
726 LOG (GNUNET_ERROR_TYPE_DEBUG,
727 "Sending CHANNEL_OPEN_ACK on %s\n",
729 ch->retry_control_task = NULL;
730 msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK);
731 msg.header.size = htons (sizeof (msg));
732 msg.reserved = htonl (0);
734 if (NULL != ch->last_control_qe)
735 GCT_send_cancel (ch->last_control_qe);
736 ch->last_control_qe = GCT_send (ch->t,
744 * We got a #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN message again for
745 * this channel. If the binding was successful, (re)transmit the
746 * #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK.
748 * @param ch channel that got the duplicate open
751 GCCH_handle_duplicate_open (struct CadetChannel *ch)
753 if (NULL == ch->dest)
755 LOG (GNUNET_ERROR_TYPE_DEBUG,
756 "Ignoring duplicate channel OPEN on %s: port is closed\n",
760 if (NULL != ch->retry_control_task)
762 LOG (GNUNET_ERROR_TYPE_DEBUG,
763 "Ignoring duplicate channel OPEN on %s: control message is pending\n",
767 LOG (GNUNET_ERROR_TYPE_DEBUG,
768 "Retransmitting OPEN_ACK on %s\n",
770 ch->retry_control_task
771 = GNUNET_SCHEDULER_add_now (&send_open_ack,
777 * Send a #GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK to the client to solicit more messages.
779 * @param ch channel the ack is for
780 * @param to_owner #GNUNET_YES to send to owner,
781 * #GNUNET_NO to send to dest
784 send_ack_to_client (struct CadetChannel *ch,
787 struct GNUNET_MQ_Envelope *env;
788 struct GNUNET_CADET_LocalAck *ack;
789 struct CadetClient *c;
791 env = GNUNET_MQ_msg (ack,
792 GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK);
793 ack->ccn = (GNUNET_YES == to_owner) ? ch->ccn_owner : ch->ccn_dest;
794 c = (GNUNET_YES == to_owner)
797 LOG (GNUNET_ERROR_TYPE_DEBUG,
798 "Sending CADET_LOCAL_ACK to %s (%s) at ccn %X\n",
800 (GNUNET_YES == to_owner) ? "owner" : "dest",
801 ntohl (ack->ccn.channel_of_client));
802 GSC_send_to_client (c,
808 * A client is bound to the port that we have a channel
809 * open to. Send the acknowledgement for the connection
810 * request and establish the link with the client.
812 * @param ch open incoming channel
813 * @param c client listening on the respective port
816 GCCH_bind (struct CadetChannel *ch,
817 struct CadetClient *c)
821 LOG (GNUNET_ERROR_TYPE_DEBUG,
822 "Binding %s from %s to port %s of %s\n",
825 GNUNET_h2s (&ch->port),
827 if (NULL != ch->retry_control_task)
829 /* there might be a timeout task here */
830 GNUNET_SCHEDULER_cancel (ch->retry_control_task);
831 ch->retry_control_task = NULL;
835 options |= GNUNET_CADET_OPTION_NOBUFFER;
837 options |= GNUNET_CADET_OPTION_RELIABLE;
838 if (ch->out_of_order)
839 options |= GNUNET_CADET_OPTION_OUT_OF_ORDER;
841 ch->ccn_dest = GSC_bind (c,
843 (GNUNET_YES == ch->is_loopback)
844 ? GCP_get (&my_full_id,
846 : GCT_get_destination (ch->t),
849 GNUNET_assert (ntohl (ch->ccn_dest.channel_of_client) <
850 GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
851 ch->mid_recv.mid = htonl (1); /* The CONNECT counts as message 0! */
852 if (GNUNET_YES == ch->is_loopback)
854 ch->state = CADET_CHANNEL_OPEN_SENT;
855 GCCH_handle_channel_open_ack (ch);
859 /* notify other peer that we accepted the connection */
860 ch->retry_control_task
861 = GNUNET_SCHEDULER_add_now (&send_open_ack,
864 /* give client it's initial supply of ACKs */
865 GNUNET_assert (ntohl (ch->ccn_dest.channel_of_client) <
866 GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
867 for (unsigned int i=0;i<ch->max_pending_messages;i++)
868 send_ack_to_client (ch,
874 * Destroy locally created channel. Called by the local client, so no
875 * need to tell the client.
877 * @param ch channel to destroy
878 * @param c client that caused the destruction
881 GCCH_channel_local_destroy (struct CadetChannel *ch,
882 struct CadetClient *c)
884 LOG (GNUNET_ERROR_TYPE_DEBUG,
885 "%s asks for destruction of %s\n",
888 GNUNET_assert (NULL != c);
891 else if (c == ch->dest)
895 if (GNUNET_YES == ch->destroy)
897 /* other end already destroyed, with the local client gone, no need
898 to finish transmissions, just destroy immediately. */
899 channel_destroy (ch);
902 if ( (NULL != ch->head_sent) ||
903 (NULL != ch->owner) ||
906 /* Wait for other end to destroy us as well,
907 and otherwise allow send queue to be transmitted first */
908 ch->destroy = GNUNET_YES;
911 /* If the we ever sent the CHANNEL_CREATE, we need to send a destroy message. */
912 if (CADET_CHANNEL_NEW != ch->state)
913 GCT_send_channel_destroy (ch->t,
915 /* Nothing left to do, just finish destruction */
916 channel_destroy (ch);
921 * We got an acknowledgement for the creation of the channel
922 * (the port is open on the other side). Begin transmissions.
924 * @param ch channel to destroy
927 GCCH_handle_channel_open_ack (struct CadetChannel *ch)
931 case CADET_CHANNEL_NEW:
932 /* this should be impossible */
935 case CADET_CHANNEL_OPEN_SENT:
936 if (NULL == ch->owner)
938 /* We're not the owner, wrong direction! */
942 LOG (GNUNET_ERROR_TYPE_DEBUG,
943 "Received CHANNEL_OPEN_ACK for waiting %s, entering READY state\n",
945 if (NULL != ch->retry_control_task) /* can be NULL if ch->is_loopback */
947 GNUNET_SCHEDULER_cancel (ch->retry_control_task);
948 ch->retry_control_task = NULL;
950 ch->state = CADET_CHANNEL_READY;
951 /* On first connect, send client as many ACKs as we allow messages
953 for (unsigned int i=0;i<ch->max_pending_messages;i++)
954 send_ack_to_client (ch,
957 case CADET_CHANNEL_READY:
958 /* duplicate ACK, maybe we retried the CREATE. Ignore. */
959 LOG (GNUNET_ERROR_TYPE_DEBUG,
960 "Received duplicate channel OPEN_ACK for %s\n",
962 GNUNET_STATISTICS_update (stats,
963 "# duplicate CREATE_ACKs",
972 * Test if element @a e1 comes before element @a e2.
974 * TODO: use opportunity to create generic list insertion sort
975 * logic in container!
977 * @param cls closure, our `struct CadetChannel`
978 * @param e1 an element of to sort
979 * @param e2 another element to sort
980 * @return #GNUNET_YES if @e1 < @e2, otherwise #GNUNET_NO
983 is_before (void *cls,
987 struct CadetOutOfOrderMessage *m1 = e1;
988 struct CadetOutOfOrderMessage *m2 = e2;
989 uint32_t v1 = ntohl (m1->mid.mid);
990 uint32_t v2 = ntohl (m2->mid.mid);
994 if (delta > (uint32_t) INT_MAX)
996 /* in overflow range, we can safely assume we wrapped around */
1007 * We got payload data for a channel. Pass it on to the client
1008 * and send an ACK to the other end (once flow control allows it!)
1010 * @param ch channel that got data
1013 GCCH_handle_channel_plaintext_data (struct CadetChannel *ch,
1014 const struct GNUNET_CADET_ChannelAppDataMessage *msg)
1016 struct GNUNET_MQ_Envelope *env;
1017 struct GNUNET_CADET_LocalData *ld;
1018 struct CadetOutOfOrderMessage *com;
1019 size_t payload_size;
1021 GNUNET_assert (GNUNET_NO == ch->is_loopback);
1022 payload_size = ntohs (msg->header.size) - sizeof (*msg);
1023 env = GNUNET_MQ_msg_extra (ld,
1025 GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
1026 ld->ccn = (NULL == ch->dest) ? ch->ccn_owner : ch->ccn_dest;
1027 GNUNET_memcpy (&ld[1],
1030 if ( (GNUNET_YES == ch->client_ready) &&
1031 ( (GNUNET_YES == ch->out_of_order) ||
1032 (msg->mid.mid == ch->mid_recv.mid) ) )
1034 LOG (GNUNET_ERROR_TYPE_DEBUG,
1035 "Giving %u bytes of payload from %s to client %s\n",
1036 (unsigned int) payload_size,
1038 GSC_2s (ch->owner ? ch->owner : ch->dest));
1039 GSC_send_to_client (ch->owner ? ch->owner : ch->dest,
1041 ch->mid_recv.mid = htonl (1 + ntohl (ch->mid_recv.mid));
1042 ch->mid_futures >>= 1;
1046 /* FIXME-SECURITY: if the element is WAY too far ahead,
1047 drop it (can't buffer too much!) */
1048 LOG (GNUNET_ERROR_TYPE_DEBUG,
1049 "Queuing %s payload of %u bytes on %s (mid %u, need %u first)\n",
1050 (GNUNET_YES == ch->client_ready)
1052 : "client-not-ready",
1053 (unsigned int) payload_size,
1055 ntohl (msg->mid.mid),
1056 ntohl (ch->mid_recv.mid));
1058 com = GNUNET_new (struct CadetOutOfOrderMessage);
1059 com->mid = msg->mid;
1061 /* sort into list ordered by "is_before" */
1062 if ( (NULL == ch->head_recv) ||
1063 (GNUNET_YES == is_before (ch,
1067 GNUNET_CONTAINER_DLL_insert (ch->head_recv,
1073 struct CadetOutOfOrderMessage *pos;
1075 for (pos = ch->head_recv;
1086 GNUNET_CONTAINER_DLL_insert_tail (ch->head_recv,
1090 GNUNET_CONTAINER_DLL_insert_after (ch->head_recv,
1100 * We got an acknowledgement for payload data for a channel.
1101 * Possibly resume transmissions.
1103 * @param ch channel that got the ack
1104 * @param ack details about what was received
1107 GCCH_handle_channel_plaintext_data_ack (struct CadetChannel *ch,
1108 const struct GNUNET_CADET_ChannelDataAckMessage *ack)
1110 struct CadetReliableMessage *crm;
1112 GNUNET_break (GNUNET_NO == ch->is_loopback);
1113 if (GNUNET_NO == ch->reliable)
1115 /* not expecting ACKs on unreliable channel, odd */
1116 GNUNET_break_op (0);
1119 for (crm = ch->head_sent;
1122 if (ack->mid.mid == crm->data_message.mid.mid)
1126 /* ACK for message we already dropped, might have been a
1127 duplicate ACK? Ignore. */
1128 LOG (GNUNET_ERROR_TYPE_DEBUG,
1129 "Duplicate DATA_ACK on %s, ignoring\n",
1131 GNUNET_STATISTICS_update (stats,
1132 "# duplicate DATA_ACKs",
1137 GNUNET_CONTAINER_DLL_remove (ch->head_sent,
1140 ch->pending_messages--;
1141 send_ack_to_client (ch,
1146 GNUNET_assert (ch->pending_messages < ch->max_pending_messages);
1147 LOG (GNUNET_ERROR_TYPE_DEBUG,
1148 "Received DATA_ACK on %s for message %u (%u ACKs pending)\n",
1150 (unsigned int) ntohl (ack->mid.mid),
1151 ch->pending_messages);
1152 send_ack_to_client (ch,
1160 * Destroy channel, based on the other peer closing the
1161 * connection. Also needs to remove this channel from
1164 * @param ch channel to destroy
1167 GCCH_handle_remote_destroy (struct CadetChannel *ch)
1169 GNUNET_assert (GNUNET_NO == ch->is_loopback);
1170 LOG (GNUNET_ERROR_TYPE_DEBUG,
1171 "Received remote channel DESTROY for %s\n",
1173 if (GNUNET_YES == ch->destroy)
1175 /* Local client already gone, this is instant-death. */
1176 channel_destroy (ch);
1179 if (NULL != ch->head_recv)
1181 LOG (GNUNET_ERROR_TYPE_WARNING,
1182 "Lost end of transmission due to remote shutdown on %s\n",
1184 /* FIXME: change API to notify client about truncated transmission! */
1186 ch->destroy = GNUNET_YES;
1187 GSC_handle_remote_channel_destroy ((NULL != ch->owner) ? ch->owner : ch->dest,
1188 (NULL != ch->owner) ? ch->ccn_owner : ch->ccn_dest,
1190 channel_destroy (ch);
1195 * Function called once the tunnel has sent one of our messages.
1196 * If the message is unreliable, simply frees the `crm`. If the
1197 * message was reliable, calculate retransmission time and
1198 * wait for ACK (or retransmit).
1200 * @param cls the `struct CadetReliableMessage` that was sent
1203 data_sent_cb (void *cls);
1207 * We need to retry a transmission, the last one took too long to
1210 * @param cls the `struct CadetChannel` where we need to retransmit
1213 retry_transmission (void *cls)
1215 struct CadetChannel *ch = cls;
1216 struct CadetReliableMessage *crm = ch->head_sent;
1218 ch->retry_data_task = NULL;
1219 GNUNET_assert (NULL == crm->qe);
1220 crm->qe = GCT_send (ch->t,
1221 &crm->data_message.header,
1228 * Function called once the tunnel has sent one of our messages.
1229 * If the message is unreliable, simply frees the `crm`. If the
1230 * message was reliable, calculate retransmission time and
1231 * wait for ACK (or retransmit).
1233 * @param cls the `struct CadetReliableMessage` that was sent
1236 data_sent_cb (void *cls)
1238 struct CadetReliableMessage *crm = cls;
1239 struct CadetChannel *ch = crm->ch;
1240 struct CadetReliableMessage *off;
1242 GNUNET_assert (GNUNET_NO == ch->is_loopback);
1244 GNUNET_CONTAINER_DLL_remove (ch->head_sent,
1247 if (GNUNET_NO == ch->reliable)
1250 ch->pending_messages--;
1251 send_ack_to_client (ch,
1257 if (0 == crm->retry_delay.rel_value_us)
1258 crm->retry_delay = ch->expected_delay;
1259 crm->next_retry = GNUNET_TIME_relative_to_absolute (crm->retry_delay);
1261 /* find position for re-insertion into the DLL */
1262 if ( (NULL == ch->head_sent) ||
1263 (crm->next_retry.abs_value_us < ch->head_sent->next_retry.abs_value_us) )
1265 /* insert at HEAD, also (re)schedule retry task! */
1266 GNUNET_CONTAINER_DLL_insert (ch->head_sent,
1269 if (NULL != ch->retry_data_task)
1270 GNUNET_SCHEDULER_cancel (ch->retry_data_task);
1272 = GNUNET_SCHEDULER_add_delayed (crm->retry_delay,
1273 &retry_transmission,
1277 for (off = ch->head_sent; NULL != off; off = off->next)
1278 if (crm->next_retry.abs_value_us < off->next_retry.abs_value_us)
1282 /* insert at tail */
1283 GNUNET_CONTAINER_DLL_insert_tail (ch->head_sent,
1289 /* insert before off */
1290 GNUNET_CONTAINER_DLL_insert_after (ch->head_sent,
1299 * Handle data given by a client.
1301 * Check whether the client is allowed to send in this tunnel, save if
1302 * channel is reliable and send an ACK to the client if there is still
1303 * buffer space in the tunnel.
1305 * @param ch Channel.
1306 * @param sender_ccn ccn of the sender
1307 * @param buf payload to transmit.
1308 * @param buf_len number of bytes in @a buf
1309 * @return #GNUNET_OK if everything goes well,
1310 * #GNUNET_SYSERR in case of an error.
1313 GCCH_handle_local_data (struct CadetChannel *ch,
1314 struct GNUNET_CADET_ClientChannelNumber sender_ccn,
1318 struct CadetReliableMessage *crm;
1320 if (ch->pending_messages > ch->max_pending_messages)
1323 return GNUNET_SYSERR;
1325 ch->pending_messages++;
1327 if (GNUNET_YES == ch->is_loopback)
1329 struct CadetClient *receiver;
1330 struct GNUNET_MQ_Envelope *env;
1331 struct GNUNET_CADET_LocalData *ld;
1334 env = GNUNET_MQ_msg_extra (ld,
1336 GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
1337 if (sender_ccn.channel_of_client ==
1338 ch->ccn_owner.channel_of_client)
1340 receiver = ch->dest;
1341 ld->ccn = ch->ccn_dest;
1342 to_owner = GNUNET_NO;
1346 GNUNET_assert (sender_ccn.channel_of_client ==
1347 ch->ccn_dest.channel_of_client);
1348 receiver = ch->owner;
1349 ld->ccn = ch->ccn_owner;
1350 to_owner = GNUNET_YES;
1352 GNUNET_memcpy (&ld[1],
1355 /* FIXME: this does not provide for flow control! */
1356 GSC_send_to_client (receiver,
1358 send_ack_to_client (ch,
1363 /* Everything is correct, send the message. */
1364 crm = GNUNET_malloc (sizeof (*crm) + buf_len);
1366 crm->data_message.header.size = htons (sizeof (struct GNUNET_CADET_ChannelAppDataMessage) + buf_len);
1367 crm->data_message.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA);
1368 ch->mid_send.mid = htonl (ntohl (ch->mid_send.mid) + 1);
1369 crm->data_message.mid = ch->mid_send;
1370 crm->data_message.ctn = ch->ctn;
1371 GNUNET_memcpy (&crm[1],
1374 GNUNET_CONTAINER_DLL_insert (ch->head_sent,
1377 LOG (GNUNET_ERROR_TYPE_DEBUG,
1378 "Sending %u bytes from local client to %s\n",
1381 crm->qe = GCT_send (ch->t,
1382 &crm->data_message.header,
1390 * Try to deliver messages to the local client, if it is ready for more.
1392 * @param ch channel to process
1395 send_client_buffered_data (struct CadetChannel *ch)
1397 struct CadetOutOfOrderMessage *com;
1399 if (GNUNET_NO == ch->client_ready)
1400 return; /* client not ready */
1401 com = ch->head_recv;
1403 return; /* none pending */
1404 if ( (com->mid.mid != ch->mid_recv.mid) &&
1405 (GNUNET_NO == ch->out_of_order) )
1406 return; /* missing next one in-order */
1408 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1409 "Passing payload message to client on %s\n",
1412 /* all good, pass next message to client */
1413 GNUNET_CONTAINER_DLL_remove (ch->head_recv,
1416 /* FIXME: if unreliable, this is not aggressive
1417 enough, as it would be OK to have lost some! */
1418 ch->mid_recv.mid = htonl (1 + ntohl (com->mid.mid));
1419 ch->mid_futures >>= 1; /* equivalent to division by 2 */
1420 GSC_send_to_client (ch->owner ? ch->owner : ch->dest,
1423 if ( (0xFFULL == (ch->mid_futures & 0xFFULL)) &&
1424 (GNUNET_YES == ch->reliable) )
1426 /* The next 15 messages were also already received (0xFF), this
1427 suggests that the sender may be blocked on flow control
1428 urgently waiting for an ACK from us. (As we have an inherent
1429 maximum of 64 bits, and 15 is getting too close for comfort.)
1430 So we should send one now. */
1431 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1432 "Sender on %s likely blocked on flow-control, sending ACK now.\n",
1434 if (GNUNET_YES == ch->reliable)
1435 send_channel_data_ack (ch);
1438 if (NULL != ch->head_recv)
1440 if (GNUNET_NO == ch->destroy)
1442 GCT_send_channel_destroy (ch->t,
1444 channel_destroy (ch);
1449 * Handle ACK from client on local channel.
1451 * @param ch channel to destroy
1452 * @param client_ccn ccn of the client sending the ack
1455 GCCH_handle_local_ack (struct CadetChannel *ch,
1456 struct GNUNET_CADET_ClientChannelNumber client_ccn)
1458 ch->client_ready = GNUNET_YES;
1459 LOG (GNUNET_ERROR_TYPE_DEBUG,
1460 "Got LOCAL_ACK, client ready to receive more data!\n");
1461 send_client_buffered_data (ch);
1465 #define LOG2(level, ...) GNUNET_log_from_nocheck(level,"cadet-chn",__VA_ARGS__)
1471 * @param ch Channel.
1472 * @param level Debug level to use.
1475 GCCH_debug (struct CadetChannel *ch,
1476 enum GNUNET_ErrorType level)
1480 do_log = GNUNET_get_log_call_status (level & (~GNUNET_ERROR_TYPE_BULK),
1482 __FILE__, __FUNCTION__, __LINE__);
1488 LOG2 (level, "CHN *** DEBUG NULL CHANNEL ***\n");
1496 if (NULL != ch->owner)
1499 "CHN origin %s ready %s local-id: %u\n",
1501 ch->client_ready ? "YES" : "NO",
1502 ntohl (ch->ccn_owner.channel_of_client));
1504 if (NULL != ch->dest)
1507 "CHN destination %s ready %s local-id: %u\n",
1509 ch->client_ready ? "YES" : "NO",
1510 ntohl (ch->ccn_dest.channel_of_client));
1513 "CHN Message IDs recv: %d (%LLX), send: %d\n",
1514 ntohl (ch->mid_recv.mid),
1515 (unsigned long long) ch->mid_futures,
1516 ntohl (ch->mid_send.mid));
1521 /* end of gnunet-service-cadet-new_channel.c */