2 This file is part of GNUnet.
3 Copyright (C) 2001-2017 GNUnet e.V.
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
16 * @file cadet/gnunet-service-cadet_channel.c
17 * @brief logical links between CADET clients
18 * @author Bartlomiej Polot
19 * @author Christian Grothoff
22 * - Congestion/flow control:
23 * + estimate max bandwidth using bursts and use to for CONGESTION CONTROL!
24 * (and figure out how/where to use this!)
25 * + figure out flow control without ACKs (unreliable traffic!)
26 * - revisit handling of 'unbuffered' traffic!
27 * (need to push down through tunnel into connection selection)
28 * - revisit handling of 'buffered' traffic: 4 is a rather small buffer; maybe
29 * reserve more bits in 'options' to allow for buffer size control?
33 #include "gnunet_statistics_service.h"
34 #include "gnunet-service-cadet_channel.h"
35 #include "gnunet-service-cadet_connection.h"
36 #include "gnunet-service-cadet_tunnels.h"
37 #include "gnunet-service-cadet_paths.h"
39 #define LOG(level,...) GNUNET_log_from (level,"cadet-chn",__VA_ARGS__)
42 * How long do we initially wait before retransmitting?
44 #define CADET_INITIAL_RETRANSMIT_TIME GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 250)
47 * How long do we wait before dropping state about incoming
48 * connection to closed port?
50 #define TIMEOUT_CLOSED_PORT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30)
53 * How long do we wait at least before retransmitting ever?
55 #define MIN_RTT_DELAY GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 75)
58 * Maximum message ID into the future we accept for out-of-order messages.
59 * If the message is more than this into the future, we drop it. This is
60 * important both to detect values that are actually in the past, as well
61 * as to limit adversarially triggerable memory consumption.
63 * Note that right now we have "max_pending_messages = 4" hard-coded in
64 * the logic below, so a value of 4 would suffice here. But we plan to
65 * allow larger windows in the future...
67 #define MAX_OUT_OF_ORDER_DISTANCE 1024
71 * All the states a channel can be in.
73 enum CadetChannelState
76 * Uninitialized status, should never appear in operation.
81 * Channel is to a port that is not open, we're waiting for the
87 * CHANNEL_OPEN message sent, waiting for CHANNEL_OPEN_ACK.
89 CADET_CHANNEL_OPEN_SENT,
92 * Connection confirmed, ready to carry traffic.
99 * Info needed to retry a message in case it gets lost.
100 * Note that we DO use this structure also for unreliable
103 struct CadetReliableMessage
106 * Double linked list, FIFO style
108 struct CadetReliableMessage *next;
111 * Double linked list, FIFO style
113 struct CadetReliableMessage *prev;
116 * Which channel is this message in?
118 struct CadetChannel *ch;
121 * Entry in the tunnels queue for this message, NULL if it has left
122 * the tunnel. Used to cancel transmission in case we receive an
125 struct CadetTunnelQueueEntry *qe;
128 * Data message we are trying to send.
130 struct GNUNET_CADET_ChannelAppDataMessage *data_message;
133 * How soon should we retry if we fail to get an ACK?
134 * Messages in the queue are sorted by this value.
136 struct GNUNET_TIME_Absolute next_retry;
139 * How long do we wait for an ACK after transmission?
140 * Use for the back-off calculation.
142 struct GNUNET_TIME_Relative retry_delay;
145 * Time when we first successfully transmitted the message
146 * (that is, set @e num_transmissions to 1).
148 struct GNUNET_TIME_Absolute first_transmission_time;
151 * Identifier of the connection that this message took when it
152 * was first transmitted. Only useful if @e num_transmissions is 1.
154 struct GNUNET_CADET_ConnectionTunnelIdentifier connection_taken;
157 * How often was this message transmitted? #GNUNET_SYSERR if there
158 * was an error transmitting the message, #GNUNET_NO if it was not
159 * yet transmitted ever, otherwise the number of (re) transmissions.
161 int num_transmissions;
167 * List of received out-of-order data messages.
169 struct CadetOutOfOrderMessage
172 * Double linked list, FIFO style
174 struct CadetOutOfOrderMessage *next;
177 * Double linked list, FIFO style
179 struct CadetOutOfOrderMessage *prev;
182 * ID of the message (messages up to this point needed
183 * before we give this one to the client).
185 struct ChannelMessageIdentifier mid;
188 * The envelope with the payload of the out-of-order message
190 struct GNUNET_MQ_Envelope *env;
196 * Client endpoint of a `struct CadetChannel`. A channel may be a
197 * loopback channel, in which case it has two of these endpoints.
198 * Note that flow control also is required in both directions.
200 struct CadetChannelClient
203 * Client handle. Not by itself sufficient to designate
204 * the client endpoint, as the same client handle may
205 * be used for both the owner and the destination, and
206 * we thus also need the channel ID to identify the client.
208 struct CadetClient *c;
211 * Head of DLL of messages received out of order or while client was unready.
213 struct CadetOutOfOrderMessage *head_recv;
216 * Tail DLL of messages received out of order or while client was unready.
218 struct CadetOutOfOrderMessage *tail_recv;
221 * Local tunnel number for this client.
222 * (if owner >= #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI,
223 * otherwise < #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI)
225 struct GNUNET_CADET_ClientChannelNumber ccn;
228 * Number of entries currently in @a head_recv DLL.
230 unsigned int num_recv;
233 * Can we send data to the client?
241 * Struct containing all information regarding a channel to a remote client.
246 * Tunnel this channel is in.
248 struct CadetTunnel *t;
251 * Client owner of the tunnel, if any.
252 * (Used if this channel represends the initiating end of the tunnel.)
254 struct CadetChannelClient *owner;
257 * Client destination of the tunnel, if any.
258 * (Used if this channel represents the listening end of the tunnel.)
260 struct CadetChannelClient *dest;
263 * Last entry in the tunnel's queue relating to control messages
264 * (#GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN or
265 * #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK). Used to cancel
266 * transmission in case we receive updated information.
268 struct CadetTunnelQueueEntry *last_control_qe;
271 * Head of DLL of messages sent and not yet ACK'd.
273 struct CadetReliableMessage *head_sent;
276 * Tail of DLL of messages sent and not yet ACK'd.
278 struct CadetReliableMessage *tail_sent;
281 * Task to resend/poll in case no ACK is received.
283 struct GNUNET_SCHEDULER_Task *retry_control_task;
286 * Task to resend/poll in case no ACK is received.
288 struct GNUNET_SCHEDULER_Task *retry_data_task;
291 * Last time the channel was used
293 struct GNUNET_TIME_Absolute timestamp;
296 * Destination port of the channel.
298 struct GNUNET_HashCode port;
301 * Hash'ed port of the channel with initiator and destination PID.
303 struct GNUNET_HashCode h_port;
306 * Counter for exponential backoff.
308 struct GNUNET_TIME_Relative retry_time;
311 * Bitfield of already-received messages past @e mid_recv.
313 uint64_t mid_futures;
316 * Next MID expected for incoming traffic.
318 struct ChannelMessageIdentifier mid_recv;
321 * Next MID to use for outgoing traffic.
323 struct ChannelMessageIdentifier mid_send;
326 * Total (reliable) messages pending ACK for this channel.
328 unsigned int pending_messages;
331 * Maximum (reliable) messages pending ACK for this channel
332 * before we throttle the client.
334 unsigned int max_pending_messages;
337 * Number identifying this channel in its tunnel.
339 struct GNUNET_CADET_ChannelTunnelNumber ctn;
344 enum CadetChannelState state;
347 * Count how many ACKs we skipped, used to prevent long
348 * sequences of ACK skipping.
350 unsigned int skip_ack_series;
353 * Is the tunnel bufferless (minimum latency)?
358 * Is the tunnel reliable?
363 * Is the tunnel out-of-order?
368 * Is this channel a loopback channel, where the destination is us again?
373 * Flag to signal the destruction of the channel. If this is set to
374 * #GNUNET_YES the channel will be destroyed once the queue is
383 * Get the static string for identification of the channel.
387 * @return Static string with the channel IDs.
390 GCCH_2s (const struct CadetChannel *ch)
392 static char buf[128];
394 GNUNET_snprintf (buf,
396 "Channel %s:%s ctn:%X(%X/%X)",
397 (GNUNET_YES == ch->is_loopback)
399 : GNUNET_i2s (GCP_get_id (GCT_get_destination (ch->t))),
400 GNUNET_h2s (&ch->port),
402 (NULL == ch->owner) ? 0 : ntohl (ch->owner->ccn.channel_of_client),
403 (NULL == ch->dest) ? 0 : ntohl (ch->dest->ccn.channel_of_client));
409 * Hash the @a port and @a initiator and @a listener to
410 * calculate the "challenge" @a h_port we send to the other
411 * peer on #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN.
413 * @param[out] h_port set to the hash of @a port, @a initiator and @a listener
414 * @param port cadet port, as seen by CADET clients
415 * @param listener peer that is listining on @a port
418 GCCH_hash_port (struct GNUNET_HashCode *h_port,
419 const struct GNUNET_HashCode *port,
420 const struct GNUNET_PeerIdentity *listener)
422 struct GNUNET_HashContext *hc;
424 hc = GNUNET_CRYPTO_hash_context_start ();
425 GNUNET_CRYPTO_hash_context_read (hc,
428 GNUNET_CRYPTO_hash_context_read (hc,
431 GNUNET_CRYPTO_hash_context_finish (hc,
433 LOG (GNUNET_ERROR_TYPE_DEBUG,
434 "Calculated port hash %s\n",
435 GNUNET_h2s (h_port));
440 * Get the channel's public ID.
444 * @return ID used to identify the channel with the remote peer.
446 struct GNUNET_CADET_ChannelTunnelNumber
447 GCCH_get_id (const struct CadetChannel *ch)
454 * Release memory associated with @a ccc
456 * @param ccc data structure to clean up
459 free_channel_client (struct CadetChannelClient *ccc)
461 struct CadetOutOfOrderMessage *com;
463 while (NULL != (com = ccc->head_recv))
465 GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
469 GNUNET_MQ_discard (com->env);
477 * Destroy the given channel.
479 * @param ch channel to destroy
482 channel_destroy (struct CadetChannel *ch)
484 struct CadetReliableMessage *crm;
486 while (NULL != (crm = ch->head_sent))
488 GNUNET_assert (ch == crm->ch);
491 GCT_send_cancel (crm->qe);
494 GNUNET_CONTAINER_DLL_remove (ch->head_sent,
497 GNUNET_free (crm->data_message);
500 if (NULL != ch->owner)
502 free_channel_client (ch->owner);
505 if (NULL != ch->dest)
507 free_channel_client (ch->dest);
510 if (NULL != ch->last_control_qe)
512 GCT_send_cancel (ch->last_control_qe);
513 ch->last_control_qe = NULL;
515 if (NULL != ch->retry_data_task)
517 GNUNET_SCHEDULER_cancel (ch->retry_data_task);
518 ch->retry_data_task = NULL;
520 if (NULL != ch->retry_control_task)
522 GNUNET_SCHEDULER_cancel (ch->retry_control_task);
523 ch->retry_control_task = NULL;
525 if (GNUNET_NO == ch->is_loopback)
527 GCT_remove_channel (ch->t,
537 * Send a channel create message.
539 * @param cls Channel for which to send.
542 send_channel_open (void *cls);
546 * Function called once the tunnel confirms that we sent the
547 * create message. Delays for a bit until we retry.
549 * @param cls our `struct CadetChannel`.
550 * @param cid identifier of the connection within the tunnel, NULL
551 * if transmission failed
554 channel_open_sent_cb (void *cls,
555 const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid)
557 struct CadetChannel *ch = cls;
559 GNUNET_assert (NULL != ch->last_control_qe);
560 ch->last_control_qe = NULL;
561 ch->retry_time = GNUNET_TIME_STD_BACKOFF (ch->retry_time);
562 LOG (GNUNET_ERROR_TYPE_DEBUG,
563 "Sent CADET_CHANNEL_OPEN on %s, retrying in %s\n",
565 GNUNET_STRINGS_relative_time_to_string (ch->retry_time,
567 ch->retry_control_task
568 = GNUNET_SCHEDULER_add_delayed (ch->retry_time,
575 * Send a channel open message.
577 * @param cls Channel for which to send.
580 send_channel_open (void *cls)
582 struct CadetChannel *ch = cls;
583 struct GNUNET_CADET_ChannelOpenMessage msgcc;
586 ch->retry_control_task = NULL;
587 LOG (GNUNET_ERROR_TYPE_DEBUG,
588 "Sending CHANNEL_OPEN message for %s\n",
592 options |= GNUNET_CADET_OPTION_NOBUFFER;
594 options |= GNUNET_CADET_OPTION_RELIABLE;
595 if (ch->out_of_order)
596 options |= GNUNET_CADET_OPTION_OUT_OF_ORDER;
597 msgcc.header.size = htons (sizeof (msgcc));
598 msgcc.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN);
599 msgcc.opt = htonl (options);
600 msgcc.h_port = ch->h_port;
602 ch->state = CADET_CHANNEL_OPEN_SENT;
603 if (NULL != ch->last_control_qe)
604 GCT_send_cancel (ch->last_control_qe);
605 ch->last_control_qe = GCT_send (ch->t,
607 &channel_open_sent_cb,
609 GNUNET_assert (NULL == ch->retry_control_task);
614 * Function called once and only once after a channel was bound
615 * to its tunnel via #GCT_add_channel() is ready for transmission.
616 * Note that this is only the case for channels that this peer
617 * initiates, as for incoming channels we assume that they are
618 * ready for transmission immediately upon receiving the open
619 * message. Used to bootstrap the #GCT_send() process.
621 * @param ch the channel for which the tunnel is now ready
624 GCCH_tunnel_up (struct CadetChannel *ch)
626 GNUNET_assert (NULL == ch->retry_control_task);
627 LOG (GNUNET_ERROR_TYPE_DEBUG,
628 "Tunnel up, sending CHANNEL_OPEN on %s now\n",
630 ch->retry_control_task
631 = GNUNET_SCHEDULER_add_now (&send_channel_open,
637 * Create a new channel.
639 * @param owner local client owning the channel
640 * @param ccn local number of this channel at the @a owner
641 * @param destination peer to which we should build the channel
642 * @param port desired port at @a destination
643 * @param options options for the channel
644 * @return handle to the new channel
646 struct CadetChannel *
647 GCCH_channel_local_new (struct CadetClient *owner,
648 struct GNUNET_CADET_ClientChannelNumber ccn,
649 struct CadetPeer *destination,
650 const struct GNUNET_HashCode *port,
653 struct CadetChannel *ch;
654 struct CadetChannelClient *ccco;
656 ccco = GNUNET_new (struct CadetChannelClient);
659 ccco->client_ready = GNUNET_YES;
661 ch = GNUNET_new (struct CadetChannel);
662 ch->mid_recv.mid = htonl (1); /* The OPEN_ACK counts as message 0! */
663 ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
664 ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
665 ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
666 ch->max_pending_messages = (ch->nobuffer) ? 1 : 4; /* FIXME: 4!? Do not hardcode! */
669 GCCH_hash_port (&ch->h_port,
671 GCP_get_id (destination));
672 if (0 == memcmp (&my_full_id,
673 GCP_get_id (destination),
674 sizeof (struct GNUNET_PeerIdentity)))
678 ch->is_loopback = GNUNET_YES;
679 op = GNUNET_CONTAINER_multihashmap_get (open_ports,
683 /* port closed, wait for it to possibly open */
684 ch->state = CADET_CHANNEL_LOOSE;
685 (void) GNUNET_CONTAINER_multihashmap_put (loose_channels,
688 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
689 LOG (GNUNET_ERROR_TYPE_DEBUG,
690 "Created loose incoming loopback channel to port %s\n",
691 GNUNET_h2s (&ch->port));
702 ch->t = GCP_get_tunnel (destination,
704 ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME;
705 ch->ctn = GCT_add_channel (ch->t,
708 GNUNET_STATISTICS_update (stats,
712 LOG (GNUNET_ERROR_TYPE_DEBUG,
713 "Created channel to port %s at peer %s for %s using %s\n",
715 GCP_2s (destination),
717 (GNUNET_YES == ch->is_loopback) ? "loopback" : GCT_2s (ch->t));
723 * We had an incoming channel to a port that is closed.
724 * It has not been opened for a while, drop it.
726 * @param cls the channel to drop
729 timeout_closed_cb (void *cls)
731 struct CadetChannel *ch = cls;
733 ch->retry_control_task = NULL;
734 LOG (GNUNET_ERROR_TYPE_DEBUG,
735 "Closing incoming channel to port %s from peer %s due to timeout\n",
736 GNUNET_h2s (&ch->port),
737 GCP_2s (GCT_get_destination (ch->t)));
738 channel_destroy (ch);
743 * Create a new channel based on a request coming in over the network.
745 * @param t tunnel to the remote peer
746 * @param ctn identifier of this channel in the tunnel
747 * @param h_port desired hash of local port
748 * @param options options for the channel
749 * @return handle to the new channel
751 struct CadetChannel *
752 GCCH_channel_incoming_new (struct CadetTunnel *t,
753 struct GNUNET_CADET_ChannelTunnelNumber ctn,
754 const struct GNUNET_HashCode *h_port,
757 struct CadetChannel *ch;
760 ch = GNUNET_new (struct CadetChannel);
761 ch->h_port = *h_port;
764 ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME;
765 ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
766 ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
767 ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
768 ch->max_pending_messages = (ch->nobuffer) ? 1 : 4; /* FIXME: 4!? Do not hardcode! */
769 GNUNET_STATISTICS_update (stats,
774 op = GNUNET_CONTAINER_multihashmap_get (open_ports,
778 /* port closed, wait for it to possibly open */
779 ch->state = CADET_CHANNEL_LOOSE;
780 (void) GNUNET_CONTAINER_multihashmap_put (loose_channels,
783 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
784 GNUNET_assert (NULL == ch->retry_control_task);
785 ch->retry_control_task
786 = GNUNET_SCHEDULER_add_delayed (TIMEOUT_CLOSED_PORT,
789 LOG (GNUNET_ERROR_TYPE_DEBUG,
790 "Created loose incoming channel to port %s from peer %s\n",
791 GNUNET_h2s (&ch->port),
792 GCP_2s (GCT_get_destination (ch->t)));
800 GNUNET_STATISTICS_update (stats,
809 * Function called once the tunnel confirms that we sent the
810 * ACK message. Just remembers it was sent, we do not expect
813 * @param cls our `struct CadetChannel`.
814 * @param cid identifier of the connection within the tunnel, NULL
815 * if transmission failed
818 send_ack_cb (void *cls,
819 const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid)
821 struct CadetChannel *ch = cls;
823 GNUNET_assert (NULL != ch->last_control_qe);
824 ch->last_control_qe = NULL;
829 * Compute and send the current #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK to the other peer.
831 * @param ch channel to send the #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK for
834 send_channel_data_ack (struct CadetChannel *ch)
836 struct GNUNET_CADET_ChannelDataAckMessage msg;
838 if (GNUNET_NO == ch->reliable)
839 return; /* no ACKs */
840 msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK);
841 msg.header.size = htons (sizeof (msg));
843 msg.mid.mid = htonl (ntohl (ch->mid_recv.mid));
844 msg.futures = GNUNET_htonll (ch->mid_futures);
845 LOG (GNUNET_ERROR_TYPE_DEBUG,
846 "Sending DATA_ACK %u:%llX via %s\n",
847 (unsigned int) ntohl (msg.mid.mid),
848 (unsigned long long) ch->mid_futures,
850 if (NULL != ch->last_control_qe)
851 GCT_send_cancel (ch->last_control_qe);
852 ch->last_control_qe = GCT_send (ch->t,
860 * Send our initial #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK to the client confirming that the
863 * @param cls the `struct CadetChannel`
866 send_open_ack (void *cls)
868 struct CadetChannel *ch = cls;
869 struct GNUNET_CADET_ChannelOpenAckMessage msg;
871 ch->retry_control_task = NULL;
872 LOG (GNUNET_ERROR_TYPE_DEBUG,
873 "Sending CHANNEL_OPEN_ACK on %s\n",
875 msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK);
876 msg.header.size = htons (sizeof (msg));
877 msg.reserved = htonl (0);
880 if (NULL != ch->last_control_qe)
881 GCT_send_cancel (ch->last_control_qe);
882 ch->last_control_qe = GCT_send (ch->t,
890 * We got a #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN message again for
891 * this channel. If the binding was successful, (re)transmit the
892 * #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK.
894 * @param ch channel that got the duplicate open
895 * @param cti identifier of the connection that delivered the message
898 GCCH_handle_duplicate_open (struct CadetChannel *ch,
899 const struct GNUNET_CADET_ConnectionTunnelIdentifier *cti)
901 if (NULL == ch->dest)
903 LOG (GNUNET_ERROR_TYPE_DEBUG,
904 "Ignoring duplicate CHANNEL_OPEN on %s: port is closed\n",
908 if (NULL != ch->retry_control_task)
910 LOG (GNUNET_ERROR_TYPE_DEBUG,
911 "Ignoring duplicate CHANNEL_OPEN on %s: control message is pending\n",
915 LOG (GNUNET_ERROR_TYPE_DEBUG,
916 "Retransmitting CHANNEL_OPEN_ACK on %s\n",
918 ch->retry_control_task
919 = GNUNET_SCHEDULER_add_now (&send_open_ack,
925 * Send a #GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK to the client to solicit more messages.
927 * @param ch channel the ack is for
928 * @param to_owner #GNUNET_YES to send to owner,
929 * #GNUNET_NO to send to dest
932 send_ack_to_client (struct CadetChannel *ch,
935 struct GNUNET_MQ_Envelope *env;
936 struct GNUNET_CADET_LocalAck *ack;
937 struct CadetChannelClient *ccc;
939 ccc = (GNUNET_YES == to_owner) ? ch->owner : ch->dest;
942 /* This can happen if we are just getting ACKs after
943 our local client already disconnected. */
944 GNUNET_assert (GNUNET_YES == ch->destroy);
947 env = GNUNET_MQ_msg (ack,
948 GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK);
950 LOG (GNUNET_ERROR_TYPE_DEBUG,
951 "Sending CADET_LOCAL_ACK to %s (%s) at ccn %X (%u/%u pending)\n",
953 (GNUNET_YES == to_owner) ? "owner" : "dest",
954 ntohl (ack->ccn.channel_of_client),
955 ch->pending_messages,
956 ch->max_pending_messages);
957 GSC_send_to_client (ccc->c,
963 * A client is bound to the port that we have a channel
964 * open to. Send the acknowledgement for the connection
965 * request and establish the link with the client.
967 * @param ch open incoming channel
968 * @param c client listening on the respective @a port
969 * @param port the port @a is listening on
972 GCCH_bind (struct CadetChannel *ch,
973 struct CadetClient *c,
974 const struct GNUNET_HashCode *port)
977 struct CadetChannelClient *cccd;
979 LOG (GNUNET_ERROR_TYPE_DEBUG,
980 "Binding %s from %s to port %s of %s\n",
983 GNUNET_h2s (&ch->port),
985 if (NULL != ch->retry_control_task)
987 /* there might be a timeout task here */
988 GNUNET_SCHEDULER_cancel (ch->retry_control_task);
989 ch->retry_control_task = NULL;
993 options |= GNUNET_CADET_OPTION_NOBUFFER;
995 options |= GNUNET_CADET_OPTION_RELIABLE;
996 if (ch->out_of_order)
997 options |= GNUNET_CADET_OPTION_OUT_OF_ORDER;
998 cccd = GNUNET_new (struct CadetChannelClient);
999 GNUNET_assert (NULL == ch->dest);
1003 cccd->client_ready = GNUNET_YES;
1004 cccd->ccn = GSC_bind (c,
1006 (GNUNET_YES == ch->is_loopback)
1007 ? GCP_get (&my_full_id,
1009 : GCT_get_destination (ch->t),
1012 GNUNET_assert (ntohl (cccd->ccn.channel_of_client) <
1013 GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
1014 ch->mid_recv.mid = htonl (1); /* The OPEN counts as message 0! */
1015 if (GNUNET_YES == ch->is_loopback)
1017 ch->state = CADET_CHANNEL_OPEN_SENT;
1018 GCCH_handle_channel_open_ack (ch,
1024 /* notify other peer that we accepted the connection */
1025 ch->state = CADET_CHANNEL_READY;
1026 ch->retry_control_task
1027 = GNUNET_SCHEDULER_add_now (&send_open_ack,
1030 /* give client it's initial supply of ACKs */
1031 GNUNET_assert (ntohl (cccd->ccn.channel_of_client) <
1032 GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
1033 for (unsigned int i=0;i<ch->max_pending_messages;i++)
1034 send_ack_to_client (ch,
1040 * One of our clients has disconnected, tell the other one that we
1041 * are finished. Done asynchronously to avoid concurrent modification
1042 * issues if this is the same client.
1044 * @param cls the `struct CadetChannel` where one of the ends is now dead
1047 signal_remote_destroy_cb (void *cls)
1049 struct CadetChannel *ch = cls;
1050 struct CadetChannelClient *ccc;
1052 /* Find which end is left... */
1053 ch->retry_control_task = NULL;
1054 ccc = (NULL != ch->owner) ? ch->owner : ch->dest;
1055 GSC_handle_remote_channel_destroy (ccc->c,
1058 channel_destroy (ch);
1063 * Destroy locally created channel. Called by the local client, so no
1064 * need to tell the client.
1066 * @param ch channel to destroy
1067 * @param c client that caused the destruction
1068 * @param ccn client number of the client @a c
1071 GCCH_channel_local_destroy (struct CadetChannel *ch,
1072 struct CadetClient *c,
1073 struct GNUNET_CADET_ClientChannelNumber ccn)
1075 LOG (GNUNET_ERROR_TYPE_DEBUG,
1076 "%s asks for destruction of %s\n",
1079 GNUNET_assert (NULL != c);
1080 if ( (NULL != ch->owner) &&
1081 (c == ch->owner->c) &&
1082 (ccn.channel_of_client == ch->owner->ccn.channel_of_client) )
1084 free_channel_client (ch->owner);
1087 else if ( (NULL != ch->dest) &&
1088 (c == ch->dest->c) &&
1089 (ccn.channel_of_client == ch->dest->ccn.channel_of_client) )
1091 free_channel_client (ch->dest);
1099 if (GNUNET_YES == ch->destroy)
1101 /* other end already destroyed, with the local client gone, no need
1102 to finish transmissions, just destroy immediately. */
1103 channel_destroy (ch);
1106 if ( (NULL != ch->head_sent) &&
1107 ( (NULL != ch->owner) ||
1108 (NULL != ch->dest) ) )
1110 /* Wait for other end to destroy us as well,
1111 and otherwise allow send queue to be transmitted first */
1112 ch->destroy = GNUNET_YES;
1115 if ( (GNUNET_YES == ch->is_loopback) &&
1116 ( (NULL != ch->owner) ||
1117 (NULL != ch->dest) ) )
1119 if (NULL != ch->retry_control_task)
1120 GNUNET_SCHEDULER_cancel (ch->retry_control_task);
1121 ch->retry_control_task
1122 = GNUNET_SCHEDULER_add_now (&signal_remote_destroy_cb,
1126 if (GNUNET_NO == ch->is_loopback)
1128 /* If the we ever sent the CHANNEL_CREATE, we need to send a destroy message. */
1131 case CADET_CHANNEL_NEW:
1132 /* We gave up on a channel that we created as a client to a remote
1133 target, but that never went anywhere. Nothing to do here. */
1135 case CADET_CHANNEL_LOOSE:
1136 GSC_drop_loose_channel (&ch->h_port,
1140 GCT_send_channel_destroy (ch->t,
1144 /* Nothing left to do, just finish destruction */
1145 channel_destroy (ch);
1150 * We got an acknowledgement for the creation of the channel
1151 * (the port is open on the other side). Verify that the
1152 * other end really has the right port, and begin transmissions.
1154 * @param ch channel to destroy
1155 * @param cti identifier of the connection that delivered the message
1156 * @param port port number (needed to verify receiver knows the port)
1159 GCCH_handle_channel_open_ack (struct CadetChannel *ch,
1160 const struct GNUNET_CADET_ConnectionTunnelIdentifier *cti,
1161 const struct GNUNET_HashCode *port)
1165 case CADET_CHANNEL_NEW:
1166 /* this should be impossible */
1169 case CADET_CHANNEL_LOOSE:
1170 /* This makes no sense. */
1171 GNUNET_break_op (0);
1173 case CADET_CHANNEL_OPEN_SENT:
1174 if (NULL == ch->owner)
1176 /* We're not the owner, wrong direction! */
1177 GNUNET_break_op (0);
1180 if (0 != memcmp (&ch->port,
1182 sizeof (struct GNUNET_HashCode)))
1184 /* Other peer failed to provide the right port,
1185 refuse connection. */
1186 GNUNET_break_op (0);
1189 LOG (GNUNET_ERROR_TYPE_DEBUG,
1190 "Received CHANNEL_OPEN_ACK for waiting %s, entering READY state\n",
1192 if (NULL != ch->retry_control_task) /* can be NULL if ch->is_loopback */
1194 GNUNET_SCHEDULER_cancel (ch->retry_control_task);
1195 ch->retry_control_task = NULL;
1197 ch->state = CADET_CHANNEL_READY;
1198 /* On first connect, send client as many ACKs as we allow messages
1200 for (unsigned int i=0;i<ch->max_pending_messages;i++)
1201 send_ack_to_client (ch,
1204 case CADET_CHANNEL_READY:
1205 /* duplicate ACK, maybe we retried the CREATE. Ignore. */
1206 LOG (GNUNET_ERROR_TYPE_DEBUG,
1207 "Received duplicate channel OPEN_ACK for %s\n",
1209 GNUNET_STATISTICS_update (stats,
1210 "# duplicate CREATE_ACKs",
1219 * Test if element @a e1 comes before element @a e2.
1221 * @param cls closure, to a flag where we indicate duplicate packets
1222 * @param m1 a message of to sort
1223 * @param m2 another message to sort
1224 * @return #GNUNET_YES if @e1 < @e2, otherwise #GNUNET_NO
1227 is_before (void *cls,
1228 struct CadetOutOfOrderMessage *m1,
1229 struct CadetOutOfOrderMessage *m2)
1231 int *duplicate = cls;
1232 uint32_t v1 = ntohl (m1->mid.mid);
1233 uint32_t v2 = ntohl (m2->mid.mid);
1238 *duplicate = GNUNET_YES;
1239 if (delta > (uint32_t) INT_MAX)
1241 /* in overflow range, we can safely assume we wrapped around */
1246 /* result is small, thus v2 > v1, thus m1 < m2 */
1253 * We got payload data for a channel. Pass it on to the client
1254 * and send an ACK to the other end (once flow control allows it!)
1256 * @param ch channel that got data
1257 * @param cti identifier of the connection that delivered the message
1258 * @param msg message that was received
1261 GCCH_handle_channel_plaintext_data (struct CadetChannel *ch,
1262 const struct GNUNET_CADET_ConnectionTunnelIdentifier *cti,
1263 const struct GNUNET_CADET_ChannelAppDataMessage *msg)
1265 struct GNUNET_MQ_Envelope *env;
1266 struct GNUNET_CADET_LocalData *ld;
1267 struct CadetChannelClient *ccc;
1268 size_t payload_size;
1269 struct CadetOutOfOrderMessage *com;
1276 GNUNET_assert (GNUNET_NO == ch->is_loopback);
1277 if ( (NULL == ch->owner) &&
1278 (NULL == ch->dest) )
1280 /* This client is gone, but we still have messages to send to
1281 the other end (which is why @a ch is not yet dead). However,
1282 we cannot pass messages to our client anymore. */
1283 LOG (GNUNET_ERROR_TYPE_DEBUG,
1284 "Dropping incoming payload on %s as this end is already closed\n",
1286 /* send back DESTROY notification to stop further retransmissions! */
1287 if (GNUNET_YES == ch->destroy)
1288 GCT_send_channel_destroy (ch->t,
1292 payload_size = ntohs (msg->header.size) - sizeof (*msg);
1293 env = GNUNET_MQ_msg_extra (ld,
1295 GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
1296 ld->ccn = (NULL == ch->dest) ? ch->owner->ccn : ch->dest->ccn;
1297 GNUNET_memcpy (&ld[1],
1300 ccc = (NULL != ch->owner) ? ch->owner : ch->dest;
1301 if ( (GNUNET_YES == ccc->client_ready) &&
1302 ( (GNUNET_YES == ch->out_of_order) ||
1303 (msg->mid.mid == ch->mid_recv.mid) ) )
1305 LOG (GNUNET_ERROR_TYPE_DEBUG,
1306 "Giving %u bytes of payload with MID %u from %s to client %s\n",
1307 (unsigned int) payload_size,
1308 ntohl (msg->mid.mid),
1311 ccc->client_ready = GNUNET_NO;
1312 GSC_send_to_client (ccc->c,
1314 ch->mid_recv.mid = htonl (1 + ntohl (ch->mid_recv.mid));
1315 ch->mid_futures >>= 1;
1316 send_channel_data_ack (ch);
1320 if (GNUNET_YES == ch->reliable)
1322 /* check if message ought to be dropped because it is ancient/too distant/duplicate */
1323 mid_min = ntohl (ch->mid_recv.mid);
1324 mid_max = mid_min + ch->max_pending_messages;
1325 mid_msg = ntohl (msg->mid.mid);
1326 if ( ( (uint32_t) (mid_msg - mid_min) > ch->max_pending_messages) ||
1327 ( (uint32_t) (mid_max - mid_msg) > ch->max_pending_messages) )
1329 LOG (GNUNET_ERROR_TYPE_DEBUG,
1330 "%s at %u drops ancient or far-future message %u\n",
1332 (unsigned int) mid_min,
1333 ntohl (msg->mid.mid));
1335 GNUNET_STATISTICS_update (stats,
1336 "# duplicate DATA (ancient or future)",
1339 GNUNET_MQ_discard (env);
1340 send_channel_data_ack (ch);
1343 /* mark bit for future ACKs */
1344 delta = mid_msg - mid_min - 1; /* overflow/underflow are OK here */
1347 if (0 != (ch->mid_futures & (1LLU << delta)))
1349 /* Duplicate within the queue, drop also */
1350 LOG (GNUNET_ERROR_TYPE_DEBUG,
1351 "Duplicate payload of %u bytes on %s (mid %u) dropped\n",
1352 (unsigned int) payload_size,
1354 ntohl (msg->mid.mid));
1355 GNUNET_STATISTICS_update (stats,
1359 GNUNET_MQ_discard (env);
1360 send_channel_data_ack (ch);
1363 ch->mid_futures |= (1LLU << delta);
1364 LOG (GNUNET_ERROR_TYPE_DEBUG,
1365 "Marked bit %llX for mid %u (base: %u); now: %llX\n",
1372 else /* ! ch->reliable */
1374 /* Channel is unreliable, so we do not ACK. But we also cannot
1375 allow buffering everything, so check if we have space... */
1376 if (ccc->num_recv >= ch->max_pending_messages)
1378 struct CadetOutOfOrderMessage *drop;
1380 /* Yep, need to drop. Drop the oldest message in
1382 LOG (GNUNET_ERROR_TYPE_DEBUG,
1383 "Queue full due slow client on %s, dropping oldest message\n",
1385 GNUNET_STATISTICS_update (stats,
1386 "# messages dropped due to slow client",
1389 drop = ccc->head_recv;
1390 GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
1394 GNUNET_MQ_discard (drop->env);
1399 /* Insert message into sorted out-of-order queue */
1400 com = GNUNET_new (struct CadetOutOfOrderMessage);
1401 com->mid = msg->mid;
1403 duplicate = GNUNET_NO;
1404 GNUNET_CONTAINER_DLL_insert_sorted (struct CadetOutOfOrderMessage,
1411 if (GNUNET_YES == duplicate)
1413 /* Duplicate within the queue, drop also (this is not covered by
1414 the case above if "delta" >= 64, which could be the case if
1415 max_pending_messages is also >= 64 or if our client is unready
1416 and we are seeing retransmissions of the message our client is
1418 LOG (GNUNET_ERROR_TYPE_DEBUG,
1419 "Duplicate payload of %u bytes on %s (mid %u) dropped\n",
1420 (unsigned int) payload_size,
1422 ntohl (msg->mid.mid));
1423 GNUNET_STATISTICS_update (stats,
1427 GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
1431 GNUNET_MQ_discard (com->env);
1433 send_channel_data_ack (ch);
1436 LOG (GNUNET_ERROR_TYPE_DEBUG,
1437 "Queued %s payload of %u bytes on %s-%X(%p) (mid %u, need %u first)\n",
1438 (GNUNET_YES == ccc->client_ready)
1440 : "client-not-ready",
1441 (unsigned int) payload_size,
1443 ntohl (ccc->ccn.channel_of_client),
1445 ntohl (msg->mid.mid),
1446 ntohl (ch->mid_recv.mid));
1447 /* NOTE: this ACK we _could_ skip, as the packet is out-of-order and
1448 the sender may already be transmitting the previous one. Needs
1449 experimental evaluation to see if/when this ACK helps or
1450 hurts. (We might even want another option.) */
1451 send_channel_data_ack (ch);
1456 * Function called once the tunnel has sent one of our messages.
1457 * If the message is unreliable, simply frees the `crm`. If the
1458 * message was reliable, calculate retransmission time and
1459 * wait for ACK (or retransmit).
1461 * @param cls the `struct CadetReliableMessage` that was sent
1462 * @param cid identifier of the connection within the tunnel, NULL
1463 * if transmission failed
1466 data_sent_cb (void *cls,
1467 const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid);
1471 * We need to retry a transmission, the last one took too long to
1474 * @param cls the `struct CadetChannel` where we need to retransmit
1477 retry_transmission (void *cls)
1479 struct CadetChannel *ch = cls;
1480 struct CadetReliableMessage *crm = ch->head_sent;
1482 ch->retry_data_task = NULL;
1483 GNUNET_assert (NULL == crm->qe);
1484 LOG (GNUNET_ERROR_TYPE_DEBUG,
1485 "Retrying transmission on %s of message %u\n",
1487 (unsigned int) ntohl (crm->data_message->mid.mid));
1488 crm->qe = GCT_send (ch->t,
1489 &crm->data_message->header,
1492 GNUNET_assert (NULL == ch->retry_data_task);
1497 * We got an PLAINTEXT_DATA_ACK for a message in our queue, remove it from
1498 * the queue and tell our client that it can send more.
1500 * @param ch the channel that got the PLAINTEXT_DATA_ACK
1501 * @param cti identifier of the connection that delivered the message
1502 * @param crm the message that got acknowledged
1505 handle_matching_ack (struct CadetChannel *ch,
1506 const struct GNUNET_CADET_ConnectionTunnelIdentifier *cti,
1507 struct CadetReliableMessage *crm)
1509 GNUNET_CONTAINER_DLL_remove (ch->head_sent,
1512 ch->pending_messages--;
1513 GNUNET_assert (ch->pending_messages < ch->max_pending_messages);
1514 LOG (GNUNET_ERROR_TYPE_DEBUG,
1515 "Received DATA_ACK on %s for message %u (%u ACKs pending)\n",
1517 (unsigned int) ntohl (crm->data_message->mid.mid),
1518 ch->pending_messages);
1519 if (NULL != crm->qe)
1521 GCT_send_cancel (crm->qe);
1524 if ( (1 == crm->num_transmissions) &&
1527 GCC_ack_observed (cti);
1528 if (0 == memcmp (cti,
1529 &crm->connection_taken,
1530 sizeof (struct GNUNET_CADET_ConnectionTunnelIdentifier)))
1532 GCC_latency_observed (cti,
1533 GNUNET_TIME_absolute_get_duration (crm->first_transmission_time));
1536 GNUNET_free (crm->data_message);
1538 send_ack_to_client (ch,
1546 * We got an acknowledgement for payload data for a channel.
1547 * Possibly resume transmissions.
1549 * @param ch channel that got the ack
1550 * @param cti identifier of the connection that delivered the message
1551 * @param ack details about what was received
1554 GCCH_handle_channel_plaintext_data_ack (struct CadetChannel *ch,
1555 const struct GNUNET_CADET_ConnectionTunnelIdentifier *cti,
1556 const struct GNUNET_CADET_ChannelDataAckMessage *ack)
1558 struct CadetReliableMessage *crm;
1559 struct CadetReliableMessage *crmn;
1565 GNUNET_break (GNUNET_NO == ch->is_loopback);
1566 if (GNUNET_NO == ch->reliable)
1568 /* not expecting ACKs on unreliable channel, odd */
1569 GNUNET_break_op (0);
1572 /* mid_base is the MID of the next message that the
1573 other peer expects (i.e. that is missing!), everything
1574 LOWER (but excluding mid_base itself) was received. */
1575 mid_base = ntohl (ack->mid.mid);
1576 mid_mask = GNUNET_htonll (ack->futures);
1578 for (crm = ch->head_sent;
1583 delta = (unsigned int) (ntohl (crm->data_message->mid.mid) - mid_base);
1584 if (delta >= UINT_MAX - ch->max_pending_messages)
1586 /* overflow, means crm was a bit in the past, so this ACK counts for it. */
1587 LOG (GNUNET_ERROR_TYPE_DEBUG,
1588 "Got DATA_ACK with base %u satisfying past message %u on %s\n",
1589 (unsigned int) mid_base,
1590 ntohl (crm->data_message->mid.mid),
1592 handle_matching_ack (ch,
1601 LOG (GNUNET_ERROR_TYPE_DEBUG,
1602 "Testing bit %llX for mid %u (base: %u)\n",
1604 ntohl (crm->data_message->mid.mid),
1606 if (0 != (mid_mask & (1LLU << delta)))
1608 LOG (GNUNET_ERROR_TYPE_DEBUG,
1609 "Got DATA_ACK with mask for %u on %s\n",
1610 ntohl (crm->data_message->mid.mid),
1612 handle_matching_ack (ch,
1618 if (GNUNET_NO == found)
1620 /* ACK for message we already dropped, might have been a
1621 duplicate ACK? Ignore. */
1622 LOG (GNUNET_ERROR_TYPE_DEBUG,
1623 "Duplicate DATA_ACK on %s, ignoring\n",
1625 GNUNET_STATISTICS_update (stats,
1626 "# duplicate DATA_ACKs",
1631 if (NULL != ch->retry_data_task)
1633 GNUNET_SCHEDULER_cancel (ch->retry_data_task);
1634 ch->retry_data_task = NULL;
1636 if ( (NULL != ch->head_sent) &&
1637 (NULL == ch->head_sent->qe) )
1639 = GNUNET_SCHEDULER_add_at (ch->head_sent->next_retry,
1640 &retry_transmission,
1646 * Destroy channel, based on the other peer closing the
1647 * connection. Also needs to remove this channel from
1650 * @param ch channel to destroy
1651 * @param cti identifier of the connection that delivered the message,
1652 * NULL if we are simulating receiving a destroy due to shutdown
1655 GCCH_handle_remote_destroy (struct CadetChannel *ch,
1656 const struct GNUNET_CADET_ConnectionTunnelIdentifier *cti)
1658 struct CadetChannelClient *ccc;
1660 GNUNET_assert (GNUNET_NO == ch->is_loopback);
1661 LOG (GNUNET_ERROR_TYPE_DEBUG,
1662 "Received remote channel DESTROY for %s\n",
1664 if (GNUNET_YES == ch->destroy)
1666 /* Local client already gone, this is instant-death. */
1667 channel_destroy (ch);
1670 ccc = (NULL != ch->owner) ? ch->owner : ch->dest;
1671 if ( (NULL != ccc) &&
1672 (NULL != ccc->head_recv) )
1674 LOG (GNUNET_ERROR_TYPE_WARNING,
1675 "Lost end of transmission due to remote shutdown on %s\n",
1677 /* FIXME: change API to notify client about truncated transmission! */
1679 ch->destroy = GNUNET_YES;
1681 GSC_handle_remote_channel_destroy (ccc->c,
1684 channel_destroy (ch);
1689 * Test if element @a e1 comes before element @a e2.
1691 * @param cls closure, to a flag where we indicate duplicate packets
1692 * @param crm1 an element of to sort
1693 * @param crm2 another element to sort
1694 * @return #GNUNET_YES if @e1 < @e2, otherwise #GNUNET_NO
1697 cmp_crm_by_next_retry (void *cls,
1698 struct CadetReliableMessage *crm1,
1699 struct CadetReliableMessage *crm2)
1701 if (crm1->next_retry.abs_value_us <
1702 crm2->next_retry.abs_value_us)
1709 * Function called once the tunnel has sent one of our messages.
1710 * If the message is unreliable, simply frees the `crm`. If the
1711 * message was reliable, calculate retransmission time and
1712 * wait for ACK (or retransmit).
1714 * @param cls the `struct CadetReliableMessage` that was sent
1715 * @param cid identifier of the connection within the tunnel, NULL
1716 * if transmission failed
1719 data_sent_cb (void *cls,
1720 const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid)
1722 struct CadetReliableMessage *crm = cls;
1723 struct CadetChannel *ch = crm->ch;
1725 GNUNET_assert (GNUNET_NO == ch->is_loopback);
1726 GNUNET_assert (NULL != crm->qe);
1728 GNUNET_CONTAINER_DLL_remove (ch->head_sent,
1731 if (GNUNET_NO == ch->reliable)
1733 GNUNET_free (crm->data_message);
1735 ch->pending_messages--;
1736 send_ack_to_client (ch,
1744 /* There was an error sending. */
1745 crm->num_transmissions = GNUNET_SYSERR;
1747 else if (GNUNET_SYSERR != crm->num_transmissions)
1749 /* Increment transmission counter, and possibly store @a cid
1750 if this was the first transmission. */
1751 crm->num_transmissions++;
1752 if (1 == crm->num_transmissions)
1754 crm->first_transmission_time = GNUNET_TIME_absolute_get ();
1755 crm->connection_taken = *cid;
1756 GCC_ack_expected (cid);
1759 if ( (0 == crm->retry_delay.rel_value_us) &&
1762 struct CadetConnection *cc = GCC_lookup (cid);
1765 crm->retry_delay = GCC_get_metrics (cc)->aged_latency;
1767 crm->retry_delay = ch->retry_time;
1769 crm->retry_delay = GNUNET_TIME_STD_BACKOFF (crm->retry_delay);
1770 crm->retry_delay = GNUNET_TIME_relative_max (crm->retry_delay,
1772 crm->next_retry = GNUNET_TIME_relative_to_absolute (crm->retry_delay);
1774 GNUNET_CONTAINER_DLL_insert_sorted (struct CadetReliableMessage,
1775 cmp_crm_by_next_retry,
1780 LOG (GNUNET_ERROR_TYPE_DEBUG,
1781 "Message %u sent, next transmission on %s in %s\n",
1782 (unsigned int) ntohl (crm->data_message->mid.mid),
1784 GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (ch->head_sent->next_retry),
1786 if (NULL == ch->head_sent->qe)
1788 if (NULL != ch->retry_data_task)
1789 GNUNET_SCHEDULER_cancel (ch->retry_data_task);
1791 = GNUNET_SCHEDULER_add_at (ch->head_sent->next_retry,
1792 &retry_transmission,
1799 * Handle data given by a client.
1801 * Check whether the client is allowed to send in this tunnel, save if
1802 * channel is reliable and send an ACK to the client if there is still
1803 * buffer space in the tunnel.
1805 * @param ch Channel.
1806 * @param sender_ccn ccn of the sender
1807 * @param buf payload to transmit.
1808 * @param buf_len number of bytes in @a buf
1809 * @return #GNUNET_OK if everything goes well,
1810 * #GNUNET_SYSERR in case of an error.
1813 GCCH_handle_local_data (struct CadetChannel *ch,
1814 struct GNUNET_CADET_ClientChannelNumber sender_ccn,
1818 struct CadetReliableMessage *crm;
1820 if (ch->pending_messages >= ch->max_pending_messages)
1823 return GNUNET_SYSERR;
1825 if (GNUNET_YES == ch->destroy)
1827 /* we are going down, drop messages */
1830 ch->pending_messages++;
1832 if (GNUNET_YES == ch->is_loopback)
1834 struct CadetChannelClient *receiver;
1835 struct GNUNET_MQ_Envelope *env;
1836 struct GNUNET_CADET_LocalData *ld;
1839 env = GNUNET_MQ_msg_extra (ld,
1841 GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
1842 if ( (NULL != ch->owner) &&
1843 (sender_ccn.channel_of_client ==
1844 ch->owner->ccn.channel_of_client) )
1846 receiver = ch->dest;
1847 ack_to_owner = GNUNET_YES;
1849 else if ( (NULL != ch->dest) &&
1850 (sender_ccn.channel_of_client ==
1851 ch->dest->ccn.channel_of_client) )
1853 receiver = ch->owner;
1854 ack_to_owner = GNUNET_NO;
1859 return GNUNET_SYSERR;
1861 GNUNET_assert (NULL != receiver);
1862 ld->ccn = receiver->ccn;
1863 GNUNET_memcpy (&ld[1],
1866 if (GNUNET_YES == receiver->client_ready)
1868 ch->pending_messages--;
1869 GSC_send_to_client (receiver->c,
1871 send_ack_to_client (ch,
1876 struct CadetOutOfOrderMessage *oom;
1878 oom = GNUNET_new (struct CadetOutOfOrderMessage);
1880 GNUNET_CONTAINER_DLL_insert_tail (receiver->head_recv,
1881 receiver->tail_recv,
1883 receiver->num_recv++;
1888 /* Everything is correct, send the message. */
1889 crm = GNUNET_malloc (sizeof (*crm));
1891 crm->data_message = GNUNET_malloc (sizeof (struct GNUNET_CADET_ChannelAppDataMessage)
1893 crm->data_message->header.size = htons (sizeof (struct GNUNET_CADET_ChannelAppDataMessage) + buf_len);
1894 crm->data_message->header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA);
1895 ch->mid_send.mid = htonl (ntohl (ch->mid_send.mid) + 1);
1896 crm->data_message->mid = ch->mid_send;
1897 crm->data_message->ctn = ch->ctn;
1898 GNUNET_memcpy (&crm->data_message[1],
1901 GNUNET_CONTAINER_DLL_insert_tail (ch->head_sent,
1904 LOG (GNUNET_ERROR_TYPE_DEBUG,
1905 "Sending message %u from local client to %s with %u bytes\n",
1906 ntohl (crm->data_message->mid.mid),
1909 if (NULL != ch->retry_data_task)
1911 GNUNET_SCHEDULER_cancel (ch->retry_data_task);
1912 ch->retry_data_task = NULL;
1914 crm->qe = GCT_send (ch->t,
1915 &crm->data_message->header,
1918 GNUNET_assert (NULL == ch->retry_data_task);
1924 * Handle ACK from client on local channel. Means the client is ready
1925 * for more data, see if we have any for it.
1927 * @param ch channel to destroy
1928 * @param client_ccn ccn of the client sending the ack
1931 GCCH_handle_local_ack (struct CadetChannel *ch,
1932 struct GNUNET_CADET_ClientChannelNumber client_ccn)
1934 struct CadetChannelClient *ccc;
1935 struct CadetOutOfOrderMessage *com;
1937 if ( (NULL != ch->owner) &&
1938 (ch->owner->ccn.channel_of_client == client_ccn.channel_of_client) )
1940 else if ( (NULL != ch->dest) &&
1941 (ch->dest->ccn.channel_of_client == client_ccn.channel_of_client) )
1945 ccc->client_ready = GNUNET_YES;
1946 com = ccc->head_recv;
1949 LOG (GNUNET_ERROR_TYPE_DEBUG,
1950 "Got LOCAL_ACK, %s-%X ready to receive more data, but none pending on %s-%X(%p)!\n",
1952 ntohl (client_ccn.channel_of_client),
1954 ntohl (ccc->ccn.channel_of_client),
1956 return; /* none pending */
1958 if (GNUNET_YES == ch->is_loopback)
1962 /* Messages are always in-order, just send */
1963 GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
1967 GSC_send_to_client (ccc->c,
1969 /* Notify sender that we can receive more */
1970 if ( (NULL != ch->owner) &&
1971 (ccc->ccn.channel_of_client ==
1972 ch->owner->ccn.channel_of_client) )
1974 to_owner = GNUNET_NO;
1978 GNUNET_assert ( (NULL != ch->dest) &&
1979 (ccc->ccn.channel_of_client ==
1980 ch->dest->ccn.channel_of_client) );
1981 to_owner = GNUNET_YES;
1983 send_ack_to_client (ch,
1989 if ( (com->mid.mid != ch->mid_recv.mid) &&
1990 (GNUNET_NO == ch->out_of_order) &&
1991 (GNUNET_YES == ch->reliable) )
1993 LOG (GNUNET_ERROR_TYPE_DEBUG,
1994 "Got LOCAL_ACK, %s-%X ready to receive more data (but next one is out-of-order %u vs. %u)!\n",
1996 ntohl (ccc->ccn.channel_of_client),
1997 ntohl (com->mid.mid),
1998 ntohl (ch->mid_recv.mid));
1999 return; /* missing next one in-order */
2002 LOG (GNUNET_ERROR_TYPE_DEBUG,
2003 "Got LOCAL_ACK, giving payload message %u to %s-%X on %s\n",
2004 ntohl (com->mid.mid),
2006 ntohl (ccc->ccn.channel_of_client),
2009 /* all good, pass next message to client */
2010 GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
2014 /* FIXME: if unreliable, this is not aggressive
2015 enough, as it would be OK to have lost some! */
2017 ch->mid_recv.mid = htonl (1 + ntohl (com->mid.mid));
2018 ch->mid_futures >>= 1; /* equivalent to division by 2 */
2019 ccc->client_ready = GNUNET_NO;
2020 GSC_send_to_client (ccc->c,
2023 send_channel_data_ack (ch);
2024 if (NULL != ccc->head_recv)
2026 if (GNUNET_NO == ch->destroy)
2028 GCT_send_channel_destroy (ch->t,
2030 channel_destroy (ch);
2034 #define LOG2(level, ...) GNUNET_log_from_nocheck(level,"cadet-chn",__VA_ARGS__)
2040 * @param ch Channel.
2041 * @param level Debug level to use.
2044 GCCH_debug (struct CadetChannel *ch,
2045 enum GNUNET_ErrorType level)
2047 #if !defined(GNUNET_CULL_LOGGING)
2050 do_log = GNUNET_get_log_call_status (level & (~GNUNET_ERROR_TYPE_BULK),
2052 __FILE__, __FUNCTION__, __LINE__);
2058 LOG2 (level, "CHN *** DEBUG NULL CHANNEL ***\n");
2066 if (NULL != ch->owner)
2069 "CHN origin %s ready %s local-id: %u\n",
2070 GSC_2s (ch->owner->c),
2071 ch->owner->client_ready ? "YES" : "NO",
2072 ntohl (ch->owner->ccn.channel_of_client));
2074 if (NULL != ch->dest)
2077 "CHN destination %s ready %s local-id: %u\n",
2078 GSC_2s (ch->dest->c),
2079 ch->dest->client_ready ? "YES" : "NO",
2080 ntohl (ch->dest->ccn.channel_of_client));
2083 "CHN Message IDs recv: %d (%LLX), send: %d\n",
2084 ntohl (ch->mid_recv.mid),
2085 (unsigned long long) ch->mid_futures,
2086 ntohl (ch->mid_send.mid));
2092 /* end of gnunet-service-cadet-new_channel.c */