3 This file is part of GNUnet.
4 Copyright (C) 2001-2017 GNUnet e.V.
6 GNUnet is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published
8 by the Free Software Foundation; either version 3, or (at your
9 option) any later version.
11 GNUnet is distributed in the hope that it will be useful, but
12 WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with GNUnet; see the file COPYING. If not, write to the
18 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
19 Boston, MA 02110-1301, USA.
22 * @file cadet/gnunet-service-cadet-new_channel.c
23 * @brief logical links between CADET clients
24 * @author Bartlomiej Polot
25 * @author Christian Grothoff
28 * - introduce shutdown so we can have half-closed channels, modify
29 * destroy to include MID to have FIN-ACK equivalents, etc.
30 * - estimate max bandwidth using bursts and use to for CONGESTION CONTROL!
31 * - check that '0xFFULL' really is sufficient for flow control!
32 * - revisit handling of 'unreliable' traffic!
33 * - revisit handling of 'out-of-order' option, especially in combination with/without 'reliable'.
34 * - figure out flow control without ACKs (unreliable traffic!)
37 #include "gnunet_util_lib.h"
39 #include "gnunet_statistics_service.h"
40 #include "gnunet-service-cadet-new.h"
41 #include "gnunet-service-cadet-new_channel.h"
42 #include "gnunet-service-cadet-new_connection.h"
43 #include "gnunet-service-cadet-new_tunnels.h"
44 #include "gnunet-service-cadet-new_peer.h"
45 #include "gnunet-service-cadet-new_paths.h"
47 #define LOG(level,...) GNUNET_log_from (level,"cadet-chn",__VA_ARGS__)
50 * How long do we initially wait before retransmitting?
52 #define CADET_INITIAL_RETRANSMIT_TIME GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 250)
55 * How long do we wait before dropping state about incoming
56 * connection to closed port?
58 #define TIMEOUT_CLOSED_PORT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30)
62 * All the states a connection can be in.
64 enum CadetChannelState
67 * Uninitialized status, should never appear in operation.
72 * Connection create message sent, waiting for ACK.
74 CADET_CHANNEL_OPEN_SENT,
77 * Connection confirmed, ready to carry traffic.
84 * Info needed to retry a message in case it gets lost.
85 * Note that we DO use this structure also for unreliable
88 struct CadetReliableMessage
91 * Double linked list, FIFO style
93 struct CadetReliableMessage *next;
96 * Double linked list, FIFO style
98 struct CadetReliableMessage *prev;
101 * Which channel is this message in?
103 struct CadetChannel *ch;
106 * Entry in the tunnels queue for this message, NULL if it has left
107 * the tunnel. Used to cancel transmission in case we receive an
110 struct CadetTunnelQueueEntry *qe;
113 * How soon should we retry if we fail to get an ACK?
114 * Messages in the queue are sorted by this value.
116 struct GNUNET_TIME_Absolute next_retry;
119 * How long do we wait for an ACK after transmission?
120 * Use for the back-off calculation.
122 struct GNUNET_TIME_Relative retry_delay;
125 * Data message we are trying to send.
127 struct GNUNET_CADET_ChannelAppDataMessage data_message;
129 /* followed by variable-size payload */
134 * List of received out-of-order data messages.
136 struct CadetOutOfOrderMessage
139 * Double linked list, FIFO style
141 struct CadetOutOfOrderMessage *next;
144 * Double linked list, FIFO style
146 struct CadetOutOfOrderMessage *prev;
149 * ID of the message (messages up to this point needed
150 * before we give this one to the client).
152 struct ChannelMessageIdentifier mid;
155 * The envelope with the payload of the out-of-order message
157 struct GNUNET_MQ_Envelope *env;
163 * Struct containing all information regarding a channel to a remote client.
168 * Tunnel this channel is in.
170 struct CadetTunnel *t;
173 * Client owner of the tunnel, if any.
174 * (Used if this channel represends the initiating end of the tunnel.)
176 struct CadetClient *owner;
179 * Client destination of the tunnel, if any.
180 * (Used if this channel represents the listening end of the tunnel.)
182 struct CadetClient *dest;
185 * Last entry in the tunnel's queue relating to control messages
186 * (#GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN or
187 * #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK). Used to cancel
188 * transmission in case we receive updated information.
190 struct CadetTunnelQueueEntry *last_control_qe;
193 * Head of DLL of messages sent and not yet ACK'd.
195 struct CadetReliableMessage *head_sent;
198 * Tail of DLL of messages sent and not yet ACK'd.
200 struct CadetReliableMessage *tail_sent;
203 * Head of DLL of messages received out of order or while client was unready.
205 struct CadetOutOfOrderMessage *head_recv;
208 * Tail DLL of messages received out of order or while client was unready.
210 struct CadetOutOfOrderMessage *tail_recv;
213 * Task to resend/poll in case no ACK is received.
215 struct GNUNET_SCHEDULER_Task *retry_control_task;
218 * Task to resend/poll in case no ACK is received.
220 struct GNUNET_SCHEDULER_Task *retry_data_task;
223 * Last time the channel was used
225 struct GNUNET_TIME_Absolute timestamp;
228 * Destination port of the channel.
230 struct GNUNET_HashCode port;
233 * Counter for exponential backoff.
235 struct GNUNET_TIME_Relative retry_time;
238 * How long does it usually take to get an ACK.
240 struct GNUNET_TIME_Relative expected_delay;
243 * Bitfield of already-received messages past @e mid_recv.
245 uint64_t mid_futures;
248 * Next MID expected for incoming traffic.
250 struct ChannelMessageIdentifier mid_recv;
253 * Next MID to use for outgoing traffic.
255 struct ChannelMessageIdentifier mid_send;
258 * Total (reliable) messages pending ACK for this channel.
260 unsigned int pending_messages;
263 * Maximum (reliable) messages pending ACK for this channel
264 * before we throttle the client.
266 unsigned int max_pending_messages;
269 * Number identifying this channel in its tunnel.
271 struct GNUNET_CADET_ChannelTunnelNumber ctn;
274 * Local tunnel number for local client @e owner owning the channel.
275 * ( >= #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI)
277 struct GNUNET_CADET_ClientChannelNumber ccn_owner;
280 * Local tunnel number for local client @e dest owning the channel.
281 * (< #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI)
283 struct GNUNET_CADET_ClientChannelNumber ccn_dest;
288 enum CadetChannelState state;
291 * Can we send data to the client?
296 * Is the tunnel bufferless (minimum latency)?
301 * Is the tunnel reliable?
306 * Is the tunnel out-of-order?
311 * Is this channel a loopback channel, where the destination is us again?
316 * Flag to signal the destruction of the channel. If this is set to
317 * #GNUNET_YES the channel will be destroyed once the queue is
326 * Get the static string for identification of the channel.
330 * @return Static string with the channel IDs.
333 GCCH_2s (const struct CadetChannel *ch)
335 static char buf[128];
337 GNUNET_snprintf (buf,
339 "Channel %s:%s ctn:%X(%X/%X)",
340 GNUNET_i2s (GCP_get_id (GCT_get_destination (ch->t))),
341 GNUNET_h2s (&ch->port),
343 ntohl (ch->ccn_owner.channel_of_client),
344 ntohl (ch->ccn_dest.channel_of_client));
350 * Get the channel's public ID.
354 * @return ID used to identify the channel with the remote peer.
356 struct GNUNET_CADET_ChannelTunnelNumber
357 GCCH_get_id (const struct CadetChannel *ch)
364 * Destroy the given channel.
366 * @param ch channel to destroy
369 channel_destroy (struct CadetChannel *ch)
371 struct CadetReliableMessage *crm;
372 struct CadetOutOfOrderMessage *com;
374 while (NULL != (crm = ch->head_sent))
376 GNUNET_assert (ch == crm->ch);
379 GCT_send_cancel (crm->qe);
382 GNUNET_CONTAINER_DLL_remove (ch->head_sent,
387 while (NULL != (com = ch->head_recv))
389 GNUNET_CONTAINER_DLL_remove (ch->head_recv,
392 GNUNET_MQ_discard (com->env);
395 if (NULL != ch->last_control_qe)
397 GCT_send_cancel (ch->last_control_qe);
398 ch->last_control_qe = NULL;
400 if (NULL != ch->retry_data_task)
402 GNUNET_SCHEDULER_cancel (ch->retry_data_task);
403 ch->retry_data_task = NULL;
405 if (NULL != ch->retry_control_task)
407 GNUNET_SCHEDULER_cancel (ch->retry_control_task);
408 ch->retry_control_task = NULL;
410 if (GNUNET_NO == ch->is_loopback)
412 GCT_remove_channel (ch->t,
422 * Send a channel create message.
424 * @param cls Channel for which to send.
427 send_channel_open (void *cls);
431 * Function called once the tunnel confirms that we sent the
432 * create message. Delays for a bit until we retry.
434 * @param cls our `struct CadetChannel`.
437 channel_open_sent_cb (void *cls)
439 struct CadetChannel *ch = cls;
441 GNUNET_assert (NULL != ch->last_control_qe);
442 ch->last_control_qe = NULL;
443 ch->retry_time = GNUNET_TIME_STD_BACKOFF (ch->retry_time);
444 LOG (GNUNET_ERROR_TYPE_DEBUG,
445 "Sent CHANNEL_OPEN on %s, retrying in %s\n",
447 GNUNET_STRINGS_relative_time_to_string (ch->retry_time,
449 ch->retry_control_task
450 = GNUNET_SCHEDULER_add_delayed (ch->retry_time,
457 * Send a channel open message.
459 * @param cls Channel for which to send.
462 send_channel_open (void *cls)
464 struct CadetChannel *ch = cls;
465 struct GNUNET_CADET_ChannelOpenMessage msgcc;
468 ch->retry_control_task = NULL;
469 LOG (GNUNET_ERROR_TYPE_DEBUG,
470 "Sending CHANNEL_OPEN message for %s\n",
474 options |= GNUNET_CADET_OPTION_NOBUFFER;
476 options |= GNUNET_CADET_OPTION_RELIABLE;
477 if (ch->out_of_order)
478 options |= GNUNET_CADET_OPTION_OUT_OF_ORDER;
479 msgcc.header.size = htons (sizeof (msgcc));
480 msgcc.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN);
481 msgcc.opt = htonl (options);
482 msgcc.port = ch->port;
484 ch->state = CADET_CHANNEL_OPEN_SENT;
485 ch->last_control_qe = GCT_send (ch->t,
487 &channel_open_sent_cb,
493 * Function called once and only once after a channel was bound
494 * to its tunnel via #GCT_add_channel() is ready for transmission.
495 * Note that this is only the case for channels that this peer
496 * initiates, as for incoming channels we assume that they are
497 * ready for transmission immediately upon receiving the open
498 * message. Used to bootstrap the #GCT_send() process.
500 * @param ch the channel for which the tunnel is now ready
503 GCCH_tunnel_up (struct CadetChannel *ch)
505 GNUNET_assert (NULL == ch->retry_control_task);
506 LOG (GNUNET_ERROR_TYPE_DEBUG,
507 "Tunnel up, sending CHANNEL_OPEN on %s now\n",
509 ch->retry_control_task
510 = GNUNET_SCHEDULER_add_now (&send_channel_open,
516 * Create a new channel.
518 * @param owner local client owning the channel
519 * @param ccn local number of this channel at the @a owner
520 * @param destination peer to which we should build the channel
521 * @param port desired port at @a destination
522 * @param options options for the channel
523 * @return handle to the new channel
525 struct CadetChannel *
526 GCCH_channel_local_new (struct CadetClient *owner,
527 struct GNUNET_CADET_ClientChannelNumber ccn,
528 struct CadetPeer *destination,
529 const struct GNUNET_HashCode *port,
532 struct CadetChannel *ch;
534 ch = GNUNET_new (struct CadetChannel);
535 ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
536 ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
537 ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
538 ch->max_pending_messages = (ch->nobuffer) ? 1 : 32; /* FIXME: 32!? Do not hardcode! */
542 if (0 == memcmp (&my_full_id,
543 GCP_get_id (destination),
544 sizeof (struct GNUNET_PeerIdentity)))
546 ch->is_loopback = GNUNET_YES;
547 ch->dest = GNUNET_CONTAINER_multihashmap_get (open_ports,
549 if (NULL == ch->dest)
551 /* port closed, wait for it to possibly open */
552 (void) GNUNET_CONTAINER_multihashmap_put (loose_channels,
555 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
556 LOG (GNUNET_ERROR_TYPE_DEBUG,
557 "Created loose incoming loopback channel to port %s\n",
558 GNUNET_h2s (&ch->port));
568 ch->t = GCP_get_tunnel (destination,
570 ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME;
571 ch->ctn = GCT_add_channel (ch->t,
574 GNUNET_STATISTICS_update (stats,
578 LOG (GNUNET_ERROR_TYPE_DEBUG,
579 "Created channel to port %s at peer %s for %s using %s\n",
581 GCP_2s (destination),
583 (GNUNET_YES == ch->is_loopback) ? "loopback" : GCT_2s (ch->t));
589 * We had an incoming channel to a port that is closed.
590 * It has not been opened for a while, drop it.
592 * @param cls the channel to drop
595 timeout_closed_cb (void *cls)
597 struct CadetChannel *ch = cls;
599 ch->retry_control_task = NULL;
600 LOG (GNUNET_ERROR_TYPE_DEBUG,
601 "Closing incoming channel to port %s from peer %s due to timeout\n",
602 GNUNET_h2s (&ch->port),
603 GCP_2s (GCT_get_destination (ch->t)));
604 channel_destroy (ch);
609 * Create a new channel based on a request coming in over the network.
611 * @param t tunnel to the remote peer
612 * @param ctn identifier of this channel in the tunnel
613 * @param port desired local port
614 * @param options options for the channel
615 * @return handle to the new channel
617 struct CadetChannel *
618 GCCH_channel_incoming_new (struct CadetTunnel *t,
619 struct GNUNET_CADET_ChannelTunnelNumber ctn,
620 const struct GNUNET_HashCode *port,
623 struct CadetChannel *ch;
624 struct CadetClient *c;
626 ch = GNUNET_new (struct CadetChannel);
630 ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME;
631 ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
632 ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
633 ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
634 ch->max_pending_messages = (ch->nobuffer) ? 1 : 32; /* FIXME: 32!? Do not hardcode! */
635 GNUNET_STATISTICS_update (stats,
640 c = GNUNET_CONTAINER_multihashmap_get (open_ports,
644 /* port closed, wait for it to possibly open */
645 (void) GNUNET_CONTAINER_multihashmap_put (loose_channels,
648 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
649 ch->retry_control_task
650 = GNUNET_SCHEDULER_add_delayed (TIMEOUT_CLOSED_PORT,
653 LOG (GNUNET_ERROR_TYPE_DEBUG,
654 "Created loose incoming channel to port %s from peer %s\n",
655 GNUNET_h2s (&ch->port),
656 GCP_2s (GCT_get_destination (ch->t)));
663 GNUNET_STATISTICS_update (stats,
672 * Function called once the tunnel confirms that we sent the
673 * ACK message. Just remembers it was sent, we do not expect
676 * @param cls our `struct CadetChannel`.
679 send_ack_cb (void *cls)
681 struct CadetChannel *ch = cls;
683 GNUNET_assert (NULL != ch->last_control_qe);
684 ch->last_control_qe = NULL;
689 * Compute and send the current #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK to the other peer.
691 * @param ch channel to send the #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK for
694 send_channel_data_ack (struct CadetChannel *ch)
696 struct GNUNET_CADET_ChannelDataAckMessage msg;
698 msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK);
699 msg.header.size = htons (sizeof (msg));
701 msg.mid.mid = htonl (ntohl (ch->mid_recv.mid) - 1);
702 msg.futures = GNUNET_htonll (ch->mid_futures);
703 if (NULL != ch->last_control_qe)
704 GCT_send_cancel (ch->last_control_qe);
705 ch->last_control_qe = GCT_send (ch->t,
713 * Send our initial #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK to the client confirming that the
716 * @param cls the `struct CadetChannel`
719 send_open_ack (void *cls)
721 struct CadetChannel *ch = cls;
722 struct GNUNET_CADET_ChannelManageMessage msg;
724 LOG (GNUNET_ERROR_TYPE_DEBUG,
725 "Sending CHANNEL_OPEN_ACK on %s\n",
727 ch->retry_control_task = NULL;
728 msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK);
729 msg.header.size = htons (sizeof (msg));
730 msg.reserved = htonl (0);
732 if (NULL != ch->last_control_qe)
733 GCT_send_cancel (ch->last_control_qe);
734 ch->last_control_qe = GCT_send (ch->t,
742 * We got a #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN message again for
743 * this channel. If the binding was successful, (re)transmit the
744 * #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK.
746 * @param ch channel that got the duplicate open
749 GCCH_handle_duplicate_open (struct CadetChannel *ch)
751 if (NULL == ch->dest)
753 LOG (GNUNET_ERROR_TYPE_DEBUG,
754 "Ignoring duplicate channel OPEN on %s: port is closed\n",
758 if (NULL != ch->retry_control_task)
760 LOG (GNUNET_ERROR_TYPE_DEBUG,
761 "Ignoring duplicate channel OPEN on %s: control message is pending\n",
765 LOG (GNUNET_ERROR_TYPE_DEBUG,
766 "Retransmitting OPEN_ACK on %s\n",
768 ch->retry_control_task
769 = GNUNET_SCHEDULER_add_now (&send_open_ack,
775 * Send a #GNUNET_MESSAGE_TYPE_CADET_LOCAL ACK to the client to solicit more messages.
777 * @param ch channel the ack is for
778 * @param c client to send the ACK to
781 send_ack_to_client (struct CadetChannel *ch,
782 struct CadetClient *c)
784 struct GNUNET_MQ_Envelope *env;
785 struct GNUNET_CADET_LocalAck *ack;
787 env = GNUNET_MQ_msg (ack,
788 GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK);
789 ack->ccn = (c == ch->owner) ? ch->ccn_owner : ch->ccn_dest;
790 GSC_send_to_client (c,
796 * A client is bound to the port that we have a channel
797 * open to. Send the acknowledgement for the connection
798 * request and establish the link with the client.
800 * @param ch open incoming channel
801 * @param c client listening on the respective port
804 GCCH_bind (struct CadetChannel *ch,
805 struct CadetClient *c)
809 LOG (GNUNET_ERROR_TYPE_DEBUG,
810 "Binding %s from %s to port %s of %s\n",
813 GNUNET_h2s (&ch->port),
815 if (NULL != ch->retry_control_task)
817 /* there might be a timeout task here */
818 GNUNET_SCHEDULER_cancel (ch->retry_control_task);
819 ch->retry_control_task = NULL;
823 options |= GNUNET_CADET_OPTION_NOBUFFER;
825 options |= GNUNET_CADET_OPTION_RELIABLE;
826 if (ch->out_of_order)
827 options |= GNUNET_CADET_OPTION_OUT_OF_ORDER;
829 ch->ccn_dest = GSC_bind (c,
831 (GNUNET_YES == ch->is_loopback)
832 ? GCP_get (&my_full_id,
834 : GCT_get_destination (ch->t),
837 ch->mid_recv.mid = htonl (1); /* The CONNECT counts as message 0! */
838 if (GNUNET_YES == ch->is_loopback)
840 ch->state = CADET_CHANNEL_OPEN_SENT;
841 GCCH_handle_channel_open_ack (ch);
845 /* notify other peer that we accepted the connection */
846 ch->retry_control_task
847 = GNUNET_SCHEDULER_add_now (&send_open_ack,
850 /* give client it's initial supply of ACKs */
851 for (unsigned int i=0;i<ch->max_pending_messages;i++)
852 send_ack_to_client (ch,
858 * Destroy locally created channel. Called by the local client, so no
859 * need to tell the client.
861 * @param ch channel to destroy
862 * @param c client that caused the destruction
865 GCCH_channel_local_destroy (struct CadetChannel *ch,
866 struct CadetClient *c)
868 LOG (GNUNET_ERROR_TYPE_DEBUG,
869 "%s asks for destruction of %s\n",
872 GNUNET_assert (NULL != c);
875 else if (c == ch->dest)
879 if (GNUNET_YES == ch->destroy)
881 /* other end already destroyed, with the local client gone, no need
882 to finish transmissions, just destroy immediately. */
883 channel_destroy (ch);
886 if ( (NULL != ch->head_sent) ||
887 (NULL != ch->owner) ||
890 /* Wait for other end to destroy us as well,
891 and otherwise allow send queue to be transmitted first */
892 ch->destroy = GNUNET_YES;
895 /* If the we ever sent the CHANNEL_CREATE, we need to send a destroy message. */
896 if (CADET_CHANNEL_NEW != ch->state)
897 GCT_send_channel_destroy (ch->t,
899 /* Nothing left to do, just finish destruction */
900 channel_destroy (ch);
905 * We got an acknowledgement for the creation of the channel
906 * (the port is open on the other side). Begin transmissions.
908 * @param ch channel to destroy
911 GCCH_handle_channel_open_ack (struct CadetChannel *ch)
915 case CADET_CHANNEL_NEW:
916 /* this should be impossible */
919 case CADET_CHANNEL_OPEN_SENT:
920 if (NULL == ch->owner)
922 /* We're not the owner, wrong direction! */
926 LOG (GNUNET_ERROR_TYPE_DEBUG,
927 "Received channel OPEN_ACK for waiting %s, entering READY state\n",
929 if (NULL != ch->retry_control_task) /* can be NULL if ch->is_loopback */
931 GNUNET_SCHEDULER_cancel (ch->retry_control_task);
932 ch->retry_control_task = NULL;
934 ch->state = CADET_CHANNEL_READY;
935 /* On first connect, send client as many ACKs as we allow messages
937 for (unsigned int i=0;i<ch->max_pending_messages;i++)
938 send_ack_to_client (ch,
941 case CADET_CHANNEL_READY:
942 /* duplicate ACK, maybe we retried the CREATE. Ignore. */
943 LOG (GNUNET_ERROR_TYPE_DEBUG,
944 "Received duplicate channel OPEN_ACK for %s\n",
946 GNUNET_STATISTICS_update (stats,
947 "# duplicate CREATE_ACKs",
956 * Test if element @a e1 comes before element @a e2.
958 * TODO: use opportunity to create generic list insertion sort
959 * logic in container!
961 * @param cls closure, our `struct CadetChannel`
962 * @param e1 an element of to sort
963 * @param e2 another element to sort
964 * @return #GNUNET_YES if @e1 < @e2, otherwise #GNUNET_NO
967 is_before (void *cls,
971 struct CadetOutOfOrderMessage *m1 = e1;
972 struct CadetOutOfOrderMessage *m2 = e2;
973 uint32_t v1 = ntohl (m1->mid.mid);
974 uint32_t v2 = ntohl (m2->mid.mid);
978 if (delta > (uint32_t) INT_MAX)
980 /* in overflow range, we can safely assume we wrapped around */
991 * We got payload data for a channel. Pass it on to the client
992 * and send an ACK to the other end (once flow control allows it!)
994 * @param ch channel that got data
997 GCCH_handle_channel_plaintext_data (struct CadetChannel *ch,
998 const struct GNUNET_CADET_ChannelAppDataMessage *msg)
1000 struct GNUNET_MQ_Envelope *env;
1001 struct GNUNET_CADET_LocalData *ld;
1002 struct CadetOutOfOrderMessage *com;
1003 size_t payload_size;
1005 GNUNET_assert (GNUNET_NO == ch->is_loopback);
1006 payload_size = ntohs (msg->header.size) - sizeof (*msg);
1007 LOG (GNUNET_ERROR_TYPE_DEBUG,
1008 "Receicved %u bytes of application data on %s\n",
1009 (unsigned int) payload_size,
1011 env = GNUNET_MQ_msg_extra (ld,
1013 GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
1014 ld->ccn = (NULL == ch->dest) ? ch->ccn_owner : ch->ccn_dest;
1015 GNUNET_memcpy (&ld[1],
1018 if ( (GNUNET_YES == ch->client_ready) &&
1019 ( (GNUNET_YES == ch->out_of_order) ||
1020 (msg->mid.mid == ch->mid_recv.mid) ) )
1022 GSC_send_to_client (ch->owner ? ch->owner : ch->dest,
1024 ch->mid_recv.mid = htonl (1 + ntohl (ch->mid_recv.mid));
1025 ch->mid_futures >>= 1;
1029 /* FIXME-SECURITY: if the element is WAY too far ahead,
1030 drop it (can't buffer too much!) */
1031 com = GNUNET_new (struct CadetOutOfOrderMessage);
1032 com->mid = msg->mid;
1034 /* sort into list ordered by "is_before" */
1035 if ( (NULL == ch->head_recv) ||
1036 (GNUNET_YES == is_before (ch,
1040 GNUNET_CONTAINER_DLL_insert (ch->head_recv,
1046 struct CadetOutOfOrderMessage *pos;
1048 for (pos = ch->head_recv;
1059 GNUNET_CONTAINER_DLL_insert_tail (ch->head_recv,
1063 GNUNET_CONTAINER_DLL_insert_after (ch->head_recv,
1073 * We got an acknowledgement for payload data for a channel.
1074 * Possibly resume transmissions.
1076 * @param ch channel that got the ack
1077 * @param ack details about what was received
1080 GCCH_handle_channel_plaintext_data_ack (struct CadetChannel *ch,
1081 const struct GNUNET_CADET_ChannelDataAckMessage *ack)
1083 struct CadetReliableMessage *crm;
1085 GNUNET_break (GNUNET_NO == ch->is_loopback);
1086 if (GNUNET_NO == ch->reliable)
1088 /* not expecting ACKs on unreliable channel, odd */
1089 GNUNET_break_op (0);
1092 for (crm = ch->head_sent;
1095 if (ack->mid.mid == crm->data_message.mid.mid)
1099 /* ACK for message we already dropped, might have been a
1100 duplicate ACK? Ignore. */
1101 LOG (GNUNET_ERROR_TYPE_DEBUG,
1102 "Duplicate DATA_ACK on %s, ignoring\n",
1104 GNUNET_STATISTICS_update (stats,
1105 "# duplicate DATA_ACKs",
1110 GNUNET_CONTAINER_DLL_remove (ch->head_sent,
1113 ch->pending_messages--;
1115 GNUNET_assert (ch->pending_messages < ch->max_pending_messages);
1116 LOG (GNUNET_ERROR_TYPE_DEBUG,
1117 "Received DATA_ACK on %s for message %u (%u ACKs pending)\n",
1119 (unsigned int) ntohl (ack->mid.mid),
1120 ch->pending_messages);
1121 send_ack_to_client (ch,
1122 (NULL == ch->owner) ? ch->dest : ch->owner);
1127 * Destroy channel, based on the other peer closing the
1128 * connection. Also needs to remove this channel from
1131 * @param ch channel to destroy
1134 GCCH_handle_remote_destroy (struct CadetChannel *ch)
1136 GNUNET_assert (GNUNET_NO == ch->is_loopback);
1137 LOG (GNUNET_ERROR_TYPE_DEBUG,
1138 "Received remote channel DESTROY for %s\n",
1140 if (GNUNET_YES == ch->destroy)
1142 /* Local client already gone, this is instant-death. */
1143 channel_destroy (ch);
1146 if (NULL != ch->head_recv)
1148 LOG (GNUNET_ERROR_TYPE_WARNING,
1149 "Lost end of transmission due to remote shutdown on %s\n",
1151 /* FIXME: change API to notify client about truncated transmission! */
1153 ch->destroy = GNUNET_YES;
1154 GSC_handle_remote_channel_destroy ((NULL != ch->owner) ? ch->owner : ch->dest,
1155 (NULL != ch->owner) ? ch->ccn_owner : ch->ccn_dest,
1157 channel_destroy (ch);
1162 * Function called once the tunnel has sent one of our messages.
1163 * If the message is unreliable, simply frees the `crm`. If the
1164 * message was reliable, calculate retransmission time and
1165 * wait for ACK (or retransmit).
1167 * @param cls the `struct CadetReliableMessage` that was sent
1170 data_sent_cb (void *cls);
1174 * We need to retry a transmission, the last one took too long to
1177 * @param cls the `struct CadetChannel` where we need to retransmit
1180 retry_transmission (void *cls)
1182 struct CadetChannel *ch = cls;
1183 struct CadetReliableMessage *crm = ch->head_sent;
1185 ch->retry_data_task = NULL;
1186 GNUNET_assert (NULL == crm->qe);
1187 crm->qe = GCT_send (ch->t,
1188 &crm->data_message.header,
1195 * Check if we can now allow the client to transmit, and if so,
1196 * let the client know about it.
1198 * @param ch channel to check
1201 GCCH_check_allow_client (struct CadetChannel *ch)
1203 struct GNUNET_MQ_Envelope *env;
1204 struct GNUNET_CADET_LocalAck *msg;
1206 if (CADET_CHANNEL_READY != ch->state)
1208 /* destination did not yet ACK our CREATE! */
1209 LOG (GNUNET_ERROR_TYPE_DEBUG,
1210 "%s not yet ready, throttling client until ACK.\n",
1214 if (ch->pending_messages > ch->max_pending_messages)
1216 /* Too many messages in queue. */
1217 LOG (GNUNET_ERROR_TYPE_DEBUG,
1218 "Message queue still too long on %s, throttling client until ACK.\n",
1222 if ( (NULL != ch->head_sent) &&
1223 (64 <= ntohl (ch->mid_send.mid) - ntohl (ch->head_sent->data_message.mid.mid)) )
1225 LOG (GNUNET_ERROR_TYPE_DEBUG,
1226 "Gap in ACKs too big on %s, throttling client until ACK.\n",
1231 LOG (GNUNET_ERROR_TYPE_DEBUG,
1232 "Sending local ack to %s client\n",
1234 env = GNUNET_MQ_msg (msg,
1235 GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK);
1236 msg->ccn = (NULL != ch->owner) ? ch->ccn_owner : ch->ccn_dest;
1237 GSC_send_to_client ((NULL != ch->owner) ? ch->owner : ch->dest,
1243 * Function called once the tunnel has sent one of our messages.
1244 * If the message is unreliable, simply frees the `crm`. If the
1245 * message was reliable, calculate retransmission time and
1246 * wait for ACK (or retransmit).
1248 * @param cls the `struct CadetReliableMessage` that was sent
1251 data_sent_cb (void *cls)
1253 struct CadetReliableMessage *crm = cls;
1254 struct CadetChannel *ch = crm->ch;
1255 struct CadetReliableMessage *off;
1258 GNUNET_CONTAINER_DLL_remove (ch->head_sent,
1261 if (GNUNET_NO == ch->reliable)
1264 ch->pending_messages--;
1265 GCCH_check_allow_client (ch);
1268 if (0 == crm->retry_delay.rel_value_us)
1269 crm->retry_delay = ch->expected_delay;
1270 crm->next_retry = GNUNET_TIME_relative_to_absolute (crm->retry_delay);
1272 /* find position for re-insertion into the DLL */
1273 if ( (NULL == ch->head_sent) ||
1274 (crm->next_retry.abs_value_us < ch->head_sent->next_retry.abs_value_us) )
1276 /* insert at HEAD, also (re)schedule retry task! */
1277 GNUNET_CONTAINER_DLL_insert (ch->head_sent,
1280 if (NULL != ch->retry_data_task)
1281 GNUNET_SCHEDULER_cancel (ch->retry_data_task);
1283 = GNUNET_SCHEDULER_add_delayed (crm->retry_delay,
1284 &retry_transmission,
1288 for (off = ch->head_sent; NULL != off; off = off->next)
1289 if (crm->next_retry.abs_value_us < off->next_retry.abs_value_us)
1293 /* insert at tail */
1294 GNUNET_CONTAINER_DLL_insert_tail (ch->head_sent,
1300 /* insert before off */
1301 GNUNET_CONTAINER_DLL_insert_after (ch->head_sent,
1310 * Handle data given by a client.
1312 * Check whether the client is allowed to send in this tunnel, save if
1313 * channel is reliable and send an ACK to the client if there is still
1314 * buffer space in the tunnel.
1316 * @param ch Channel.
1317 * @param sender client sending the data
1318 * @param buf payload to transmit.
1319 * @param buf_len number of bytes in @a buf
1320 * @return #GNUNET_OK if everything goes well,
1321 * #GNUNET_SYSERR in case of an error.
1324 GCCH_handle_local_data (struct CadetChannel *ch,
1325 struct CadetClient *sender,
1329 struct CadetReliableMessage *crm;
1331 if (ch->pending_messages > ch->max_pending_messages)
1334 return GNUNET_SYSERR;
1336 ch->pending_messages++;
1338 if (GNUNET_YES == ch->is_loopback)
1340 struct CadetClient *receiver;
1341 struct GNUNET_MQ_Envelope *env;
1342 struct GNUNET_CADET_LocalData *ld;
1344 env = GNUNET_MQ_msg_extra (ld,
1346 GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
1347 receiver = (ch->owner == sender) ? ch->dest : ch->owner;
1348 ld->ccn = (ch->owner == sender) ? ch->ccn_dest : ch->ccn_owner;
1349 GNUNET_memcpy (&ld[1],
1352 /* FIXME: this does not provide for flow control! */
1353 GSC_send_to_client (receiver,
1355 send_ack_to_client (ch,
1360 /* Everything is correct, send the message. */
1361 crm = GNUNET_malloc (sizeof (*crm) + buf_len);
1363 crm->data_message.header.size = htons (sizeof (struct GNUNET_CADET_ChannelAppDataMessage) + buf_len);
1364 crm->data_message.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA);
1365 ch->mid_send.mid = htonl (ntohl (ch->mid_send.mid) + 1);
1366 crm->data_message.mid = ch->mid_send;
1367 crm->data_message.ctn = ch->ctn;
1368 GNUNET_memcpy (&crm[1],
1371 GNUNET_CONTAINER_DLL_insert (ch->head_sent,
1374 LOG (GNUNET_ERROR_TYPE_DEBUG,
1375 "Sending %u bytes from local client to %s\n",
1378 crm->qe = GCT_send (ch->t,
1379 &crm->data_message.header,
1382 GCCH_check_allow_client (ch);
1388 * Try to deliver messages to the local client, if it is ready for more.
1390 * @param ch channel to process
1393 send_client_buffered_data (struct CadetChannel *ch)
1395 struct CadetOutOfOrderMessage *com;
1397 if (GNUNET_NO == ch->client_ready)
1398 return; /* client not ready */
1399 com = ch->head_recv;
1401 return; /* none pending */
1402 if ( (com->mid.mid != ch->mid_recv.mid) &&
1403 (GNUNET_NO == ch->out_of_order) )
1404 return; /* missing next one in-order */
1406 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1407 "Passing payload message to client on %s\n",
1410 /* all good, pass next message to client */
1411 GNUNET_CONTAINER_DLL_remove (ch->head_recv,
1414 /* FIXME: if unreliable, this is not aggressive
1415 enough, as it would be OK to have lost some! */
1416 ch->mid_recv.mid = htonl (1 + ntohl (com->mid.mid));
1417 ch->mid_futures >>= 1; /* equivalent to division by 2 */
1418 GSC_send_to_client (ch->owner ? ch->owner : ch->dest,
1421 if ( (0xFFULL == (ch->mid_futures & 0xFFULL)) &&
1422 (GNUNET_YES == ch->reliable) )
1424 /* The next 15 messages were also already received (0xFF), this
1425 suggests that the sender may be blocked on flow control
1426 urgently waiting for an ACK from us. (As we have an inherent
1427 maximum of 64 bits, and 15 is getting too close for comfort.)
1428 So we should send one now. */
1429 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1430 "Sender on %s likely blocked on flow-control, sending ACK now.\n",
1432 if (GNUNET_YES == ch->reliable)
1433 send_channel_data_ack (ch);
1436 if (NULL != ch->head_recv)
1438 if (GNUNET_NO == ch->destroy)
1440 GCT_send_channel_destroy (ch->t,
1442 channel_destroy (ch);
1447 * Handle ACK from client on local channel.
1449 * @param ch channel to destroy
1452 GCCH_handle_local_ack (struct CadetChannel *ch)
1454 ch->client_ready = GNUNET_YES;
1455 send_client_buffered_data (ch);
1459 #define LOG2(level, ...) GNUNET_log_from_nocheck(level,"cadet-chn",__VA_ARGS__)
1465 * @param ch Channel.
1466 * @param level Debug level to use.
1469 GCCH_debug (struct CadetChannel *ch,
1470 enum GNUNET_ErrorType level)
1474 do_log = GNUNET_get_log_call_status (level & (~GNUNET_ERROR_TYPE_BULK),
1476 __FILE__, __FUNCTION__, __LINE__);
1482 LOG2 (level, "CHN *** DEBUG NULL CHANNEL ***\n");
1490 if (NULL != ch->owner)
1493 "CHN origin %s ready %s local-id: %u\n",
1495 ch->client_ready ? "YES" : "NO",
1496 ntohl (ch->ccn_owner.channel_of_client));
1498 if (NULL != ch->dest)
1501 "CHN destination %s ready %s local-id: %u\n",
1503 ch->client_ready ? "YES" : "NO",
1504 ntohl (ch->ccn_dest.channel_of_client));
1507 "CHN Message IDs recv: %d (%LLX), send: %d\n",
1508 ntohl (ch->mid_recv.mid),
1509 (unsigned long long) ch->mid_futures,
1510 ntohl (ch->mid_send.mid));
1515 /* end of gnunet-service-cadet-new_channel.c */