implement random packet drop option, fix retransmission logic
[oweals/gnunet.git] / src / cadet / gnunet-service-cadet-new_channel.c
1
2 /*
3      This file is part of GNUnet.
4      Copyright (C) 2001-2017 GNUnet e.V.
5
6      GNUnet is free software; you can redistribute it and/or modify
7      it under the terms of the GNU General Public License as published
8      by the Free Software Foundation; either version 3, or (at your
9      option) any later version.
10
11      GNUnet is distributed in the hope that it will be useful, but
12      WITHOUT ANY WARRANTY; without even the implied warranty of
13      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14      General Public License for more details.
15
16      You should have received a copy of the GNU General Public License
17      along with GNUnet; see the file COPYING.  If not, write to the
18      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
19      Boston, MA 02110-1301, USA.
20 */
21 /**
22  * @file cadet/gnunet-service-cadet-new_channel.c
23  * @brief logical links between CADET clients
24  * @author Bartlomiej Polot
25  * @author Christian Grothoff
26  *
27  * TODO:
28  * - Optimize ACKs by using 'mid_futures' properly!
29  * - calculate current RTT if possible, use that for initial retransmissions
30  *   (NOTE: needs us to learn which connection the tunnel uses for the message!)
31  * - introduce shutdown so we can have half-closed channels, modify
32  *   destroy to include MID to have FIN-ACK equivalents, etc.
33  * - estimate max bandwidth using bursts and use to for CONGESTION CONTROL!
34  *   (and figure out how/where to use this!)
35  * - check that '0xFFULL' really is sufficient for flow control!
36  *   (this is right now a big HACK!)
37  * - revisit handling of 'unreliable' traffic!
38  *   (has not seen enough review)
39  * - revisit handling of 'unbuffered' traffic!
40  *   (has not seen enough review)
41  * - revisit handling of 'out-of-order' option, especially in combination with/without 'reliable'.
42  * - figure out flow control without ACKs (unreliable traffic!)
43  */
44 #include "platform.h"
45 #include "gnunet_util_lib.h"
46 #include "cadet.h"
47 #include "gnunet_statistics_service.h"
48 #include "gnunet-service-cadet-new.h"
49 #include "gnunet-service-cadet-new_channel.h"
50 #include "gnunet-service-cadet-new_connection.h"
51 #include "gnunet-service-cadet-new_tunnels.h"
52 #include "gnunet-service-cadet-new_peer.h"
53 #include "gnunet-service-cadet-new_paths.h"
54
55 #define LOG(level,...) GNUNET_log_from (level,"cadet-chn",__VA_ARGS__)
56
57 /**
58  * How long do we initially wait before retransmitting?
59  */
60 #define CADET_INITIAL_RETRANSMIT_TIME GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 250)
61
62 /**
63  * How long do we wait before dropping state about incoming
64  * connection to closed port?
65  */
66 #define TIMEOUT_CLOSED_PORT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30)
67
68 /**
69  * How long do we wait at least before retransmitting ever?
70  */
71 #define MIN_RTT_DELAY GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 75)
72
73 /**
74  * Maximum message ID into the future we accept for out-of-order messages.
75  * If the message is more than this into the future, we drop it.  This is
76  * important both to detect values that are actually in the past, as well
77  * as to limit adversarially triggerable memory consumption.
78  *
79  * Note that right now we have "max_pending_messages = 4" hard-coded in
80  * the logic below, so a value of 4 would suffice here. But we plan to
81  * allow larger windows in the future...
82  */
83 #define MAX_OUT_OF_ORDER_DISTANCE 1024
84
85
86 /**
87  * All the states a connection can be in.
88  */
89 enum CadetChannelState
90 {
91   /**
92    * Uninitialized status, should never appear in operation.
93    */
94   CADET_CHANNEL_NEW,
95
96   /**
97    * Connection create message sent, waiting for ACK.
98    */
99   CADET_CHANNEL_OPEN_SENT,
100
101   /**
102    * Connection confirmed, ready to carry traffic.
103    */
104   CADET_CHANNEL_READY
105 };
106
107
108 /**
109  * Info needed to retry a message in case it gets lost.
110  * Note that we DO use this structure also for unreliable
111  * messages.
112  */
113 struct CadetReliableMessage
114 {
115   /**
116    * Double linked list, FIFO style
117    */
118   struct CadetReliableMessage *next;
119
120   /**
121    * Double linked list, FIFO style
122    */
123   struct CadetReliableMessage *prev;
124
125   /**
126    * Which channel is this message in?
127    */
128   struct CadetChannel *ch;
129
130   /**
131    * Entry in the tunnels queue for this message, NULL if it has left
132    * the tunnel.  Used to cancel transmission in case we receive an
133    * ACK in time.
134    */
135   struct CadetTunnelQueueEntry *qe;
136
137   /**
138    * How soon should we retry if we fail to get an ACK?
139    * Messages in the queue are sorted by this value.
140    */
141   struct GNUNET_TIME_Absolute next_retry;
142
143   /**
144    * How long do we wait for an ACK after transmission?
145    * Use for the back-off calculation.
146    */
147   struct GNUNET_TIME_Relative retry_delay;
148
149   /**
150    * Data message we are trying to send.
151    */
152   struct GNUNET_CADET_ChannelAppDataMessage *data_message;
153
154 };
155
156
157 /**
158  * List of received out-of-order data messages.
159  */
160 struct CadetOutOfOrderMessage
161 {
162   /**
163    * Double linked list, FIFO style
164    */
165   struct CadetOutOfOrderMessage *next;
166
167   /**
168    * Double linked list, FIFO style
169    */
170   struct CadetOutOfOrderMessage *prev;
171
172   /**
173    * ID of the message (messages up to this point needed
174    * before we give this one to the client).
175    */
176   struct ChannelMessageIdentifier mid;
177
178   /**
179    * The envelope with the payload of the out-of-order message
180    */
181   struct GNUNET_MQ_Envelope *env;
182
183 };
184
185
186 /**
187  * Client endpoint of a `struct CadetChannel`.  A channel may be a
188  * loopback channel, in which case it has two of these endpoints.
189  * Note that flow control also is required in both directions.
190  */
191 struct CadetChannelClient
192 {
193   /**
194    * Client handle.  Not by itself sufficient to designate
195    * the client endpoint, as the same client handle may
196    * be used for both the owner and the destination, and
197    * we thus also need the channel ID to identify the client.
198    */
199   struct CadetClient *c;
200
201   /**
202    * Head of DLL of messages received out of order or while client was unready.
203    */
204   struct CadetOutOfOrderMessage *head_recv;
205
206   /**
207    * Tail DLL of messages received out of order or while client was unready.
208    */
209   struct CadetOutOfOrderMessage *tail_recv;
210
211   /**
212    * Local tunnel number for this client.
213    * (if owner >= #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI,
214    *  otherwise < #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI)
215    */
216   struct GNUNET_CADET_ClientChannelNumber ccn;
217
218   /**
219    * Can we send data to the client?
220    */
221   int client_ready;
222
223 };
224
225
226 /**
227  * Struct containing all information regarding a channel to a remote client.
228  */
229 struct CadetChannel
230 {
231   /**
232    * Tunnel this channel is in.
233    */
234   struct CadetTunnel *t;
235
236   /**
237    * Client owner of the tunnel, if any.
238    * (Used if this channel represends the initiating end of the tunnel.)
239    */
240   struct CadetChannelClient *owner;
241
242   /**
243    * Client destination of the tunnel, if any.
244    * (Used if this channel represents the listening end of the tunnel.)
245    */
246   struct CadetChannelClient *dest;
247
248   /**
249    * Last entry in the tunnel's queue relating to control messages
250    * (#GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN or
251    * #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK).  Used to cancel
252    * transmission in case we receive updated information.
253    */
254   struct CadetTunnelQueueEntry *last_control_qe;
255
256   /**
257    * Head of DLL of messages sent and not yet ACK'd.
258    */
259   struct CadetReliableMessage *head_sent;
260
261   /**
262    * Tail of DLL of messages sent and not yet ACK'd.
263    */
264   struct CadetReliableMessage *tail_sent;
265
266   /**
267    * Task to resend/poll in case no ACK is received.
268    */
269   struct GNUNET_SCHEDULER_Task *retry_control_task;
270
271   /**
272    * Task to resend/poll in case no ACK is received.
273    */
274   struct GNUNET_SCHEDULER_Task *retry_data_task;
275
276   /**
277    * Last time the channel was used
278    */
279   struct GNUNET_TIME_Absolute timestamp;
280
281   /**
282    * Destination port of the channel.
283    */
284   struct GNUNET_HashCode port;
285
286   /**
287    * Counter for exponential backoff.
288    */
289   struct GNUNET_TIME_Relative retry_time;
290
291   /**
292    * How long does it usually take to get an ACK.
293    */
294   struct GNUNET_TIME_Relative expected_delay;
295
296   /**
297    * Bitfield of already-received messages past @e mid_recv.
298    *
299    * FIXME: not yet properly used (bits here are never set!)
300    */
301   uint64_t mid_futures;
302
303   /**
304    * Next MID expected for incoming traffic.
305    */
306   struct ChannelMessageIdentifier mid_recv;
307
308   /**
309    * Next MID to use for outgoing traffic.
310    */
311   struct ChannelMessageIdentifier mid_send;
312
313   /**
314    * Total (reliable) messages pending ACK for this channel.
315    */
316   unsigned int pending_messages;
317
318   /**
319    * Maximum (reliable) messages pending ACK for this channel
320    * before we throttle the client.
321    */
322   unsigned int max_pending_messages;
323
324   /**
325    * Number identifying this channel in its tunnel.
326    */
327   struct GNUNET_CADET_ChannelTunnelNumber ctn;
328
329   /**
330    * Channel state.
331    */
332   enum CadetChannelState state;
333
334   /**
335    * Is the tunnel bufferless (minimum latency)?
336    */
337   int nobuffer;
338
339   /**
340    * Is the tunnel reliable?
341    */
342   int reliable;
343
344   /**
345    * Is the tunnel out-of-order?
346    */
347   int out_of_order;
348
349   /**
350    * Is this channel a loopback channel, where the destination is us again?
351    */
352   int is_loopback;
353
354   /**
355    * Flag to signal the destruction of the channel.  If this is set to
356    * #GNUNET_YES the channel will be destroyed once the queue is
357    * empty.
358    */
359   int destroy;
360
361 };
362
363
364 /**
365  * Get the static string for identification of the channel.
366  *
367  * @param ch Channel.
368  *
369  * @return Static string with the channel IDs.
370  */
371 const char *
372 GCCH_2s (const struct CadetChannel *ch)
373 {
374   static char buf[128];
375
376   GNUNET_snprintf (buf,
377                    sizeof (buf),
378                    "Channel %s:%s ctn:%X(%X/%X)",
379                    (GNUNET_YES == ch->is_loopback)
380                    ? "loopback"
381                    : GNUNET_i2s (GCP_get_id (GCT_get_destination (ch->t))),
382                    GNUNET_h2s (&ch->port),
383                    ch->ctn,
384                    (NULL == ch->owner) ? 0 : ntohl (ch->owner->ccn.channel_of_client),
385                    (NULL == ch->dest) ? 0 : ntohl (ch->dest->ccn.channel_of_client));
386   return buf;
387 }
388
389
390 /**
391  * Get the channel's public ID.
392  *
393  * @param ch Channel.
394  *
395  * @return ID used to identify the channel with the remote peer.
396  */
397 struct GNUNET_CADET_ChannelTunnelNumber
398 GCCH_get_id (const struct CadetChannel *ch)
399 {
400   return ch->ctn;
401 }
402
403
404 /**
405  * Release memory associated with @a ccc
406  *
407  * @param ccc data structure to clean up
408  */
409 static void
410 free_channel_client (struct CadetChannelClient *ccc)
411 {
412   struct CadetOutOfOrderMessage *com;
413
414   while (NULL != (com = ccc->head_recv))
415   {
416     GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
417                                  ccc->tail_recv,
418                                  com);
419     GNUNET_MQ_discard (com->env);
420     GNUNET_free (com);
421   }
422   GNUNET_free (ccc);
423 }
424
425
426 /**
427  * Destroy the given channel.
428  *
429  * @param ch channel to destroy
430  */
431 static void
432 channel_destroy (struct CadetChannel *ch)
433 {
434   struct CadetReliableMessage *crm;
435
436   while (NULL != (crm = ch->head_sent))
437   {
438     GNUNET_assert (ch == crm->ch);
439     if (NULL != crm->qe)
440     {
441       GCT_send_cancel (crm->qe);
442       crm->qe = NULL;
443     }
444     GNUNET_CONTAINER_DLL_remove (ch->head_sent,
445                                  ch->tail_sent,
446                                  crm);
447     GNUNET_free (crm->data_message);
448     GNUNET_free (crm);
449   }
450   if (NULL != ch->owner)
451   {
452     free_channel_client (ch->owner);
453     ch->owner = NULL;
454   }
455   if (NULL != ch->dest)
456   {
457     free_channel_client (ch->dest);
458     ch->dest = NULL;
459   }
460   if (NULL != ch->last_control_qe)
461   {
462     GCT_send_cancel (ch->last_control_qe);
463     ch->last_control_qe = NULL;
464   }
465   if (NULL != ch->retry_data_task)
466   {
467     GNUNET_SCHEDULER_cancel (ch->retry_data_task);
468     ch->retry_data_task = NULL;
469   }
470   if (NULL != ch->retry_control_task)
471   {
472     GNUNET_SCHEDULER_cancel (ch->retry_control_task);
473     ch->retry_control_task = NULL;
474   }
475   if (GNUNET_NO == ch->is_loopback)
476   {
477     GCT_remove_channel (ch->t,
478                         ch,
479                         ch->ctn);
480     ch->t = NULL;
481   }
482   GNUNET_free (ch);
483 }
484
485
486 /**
487  * Send a channel create message.
488  *
489  * @param cls Channel for which to send.
490  */
491 static void
492 send_channel_open (void *cls);
493
494
495 /**
496  * Function called once the tunnel confirms that we sent the
497  * create message.  Delays for a bit until we retry.
498  *
499  * @param cls our `struct CadetChannel`.
500  */
501 static void
502 channel_open_sent_cb (void *cls)
503 {
504   struct CadetChannel *ch = cls;
505
506   GNUNET_assert (NULL != ch->last_control_qe);
507   ch->last_control_qe = NULL;
508   ch->retry_time = GNUNET_TIME_STD_BACKOFF (ch->retry_time);
509   LOG (GNUNET_ERROR_TYPE_DEBUG,
510        "Sent CADET_CHANNEL_OPEN on %s, retrying in %s\n",
511        GCCH_2s (ch),
512        GNUNET_STRINGS_relative_time_to_string (ch->retry_time,
513                                                GNUNET_YES));
514   ch->retry_control_task
515     = GNUNET_SCHEDULER_add_delayed (ch->retry_time,
516                                     &send_channel_open,
517                                     ch);
518 }
519
520
521 /**
522  * Send a channel open message.
523  *
524  * @param cls Channel for which to send.
525  */
526 static void
527 send_channel_open (void *cls)
528 {
529   struct CadetChannel *ch = cls;
530   struct GNUNET_CADET_ChannelOpenMessage msgcc;
531   uint32_t options;
532
533   ch->retry_control_task = NULL;
534   LOG (GNUNET_ERROR_TYPE_DEBUG,
535        "Sending CHANNEL_OPEN message for %s\n",
536        GCCH_2s (ch));
537   options = 0;
538   if (ch->nobuffer)
539     options |= GNUNET_CADET_OPTION_NOBUFFER;
540   if (ch->reliable)
541     options |= GNUNET_CADET_OPTION_RELIABLE;
542   if (ch->out_of_order)
543     options |= GNUNET_CADET_OPTION_OUT_OF_ORDER;
544   msgcc.header.size = htons (sizeof (msgcc));
545   msgcc.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN);
546   msgcc.opt = htonl (options);
547   msgcc.port = ch->port;
548   msgcc.ctn = ch->ctn;
549   ch->state = CADET_CHANNEL_OPEN_SENT;
550   ch->last_control_qe = GCT_send (ch->t,
551                                   &msgcc.header,
552                                   &channel_open_sent_cb,
553                                   ch);
554   GNUNET_assert (NULL == ch->retry_control_task);
555 }
556
557
558 /**
559  * Function called once and only once after a channel was bound
560  * to its tunnel via #GCT_add_channel() is ready for transmission.
561  * Note that this is only the case for channels that this peer
562  * initiates, as for incoming channels we assume that they are
563  * ready for transmission immediately upon receiving the open
564  * message.  Used to bootstrap the #GCT_send() process.
565  *
566  * @param ch the channel for which the tunnel is now ready
567  */
568 void
569 GCCH_tunnel_up (struct CadetChannel *ch)
570 {
571   GNUNET_assert (NULL == ch->retry_control_task);
572   LOG (GNUNET_ERROR_TYPE_DEBUG,
573        "Tunnel up, sending CHANNEL_OPEN on %s now\n",
574        GCCH_2s (ch));
575   ch->retry_control_task
576     = GNUNET_SCHEDULER_add_now (&send_channel_open,
577                                 ch);
578 }
579
580
581 /**
582  * Create a new channel.
583  *
584  * @param owner local client owning the channel
585  * @param ccn local number of this channel at the @a owner
586  * @param destination peer to which we should build the channel
587  * @param port desired port at @a destination
588  * @param options options for the channel
589  * @return handle to the new channel
590  */
591 struct CadetChannel *
592 GCCH_channel_local_new (struct CadetClient *owner,
593                         struct GNUNET_CADET_ClientChannelNumber ccn,
594                         struct CadetPeer *destination,
595                         const struct GNUNET_HashCode *port,
596                         uint32_t options)
597 {
598   struct CadetChannel *ch;
599   struct CadetChannelClient *ccco;
600
601   ccco = GNUNET_new (struct CadetChannelClient);
602   ccco->c = owner;
603   ccco->ccn = ccn;
604   ccco->client_ready = GNUNET_YES;
605
606   ch = GNUNET_new (struct CadetChannel);
607   ch->mid_recv.mid = htonl (1); /* The OPEN_ACK counts as message 0! */
608   ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
609   ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
610   ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
611   ch->max_pending_messages = (ch->nobuffer) ? 1 : 4; /* FIXME: 4!? Do not hardcode! */
612   ch->owner = ccco;
613   ch->port = *port;
614   if (0 == memcmp (&my_full_id,
615                    GCP_get_id (destination),
616                    sizeof (struct GNUNET_PeerIdentity)))
617   {
618     struct CadetClient *c;
619
620     ch->is_loopback = GNUNET_YES;
621     c = GNUNET_CONTAINER_multihashmap_get (open_ports,
622                                            port);
623     if (NULL == c)
624     {
625       /* port closed, wait for it to possibly open */
626       (void) GNUNET_CONTAINER_multihashmap_put (loose_channels,
627                                                 port,
628                                                 ch,
629                                                 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
630       LOG (GNUNET_ERROR_TYPE_DEBUG,
631            "Created loose incoming loopback channel to port %s\n",
632            GNUNET_h2s (&ch->port));
633     }
634     else
635     {
636       ch->dest = GNUNET_new (struct CadetChannelClient);
637       ch->dest->c = c;
638       ch->dest->client_ready = GNUNET_YES;
639       GCCH_bind (ch,
640                  ch->dest->c);
641     }
642   }
643   else
644   {
645     ch->t = GCP_get_tunnel (destination,
646                             GNUNET_YES);
647     ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME;
648     ch->ctn = GCT_add_channel (ch->t,
649                                ch);
650   }
651   GNUNET_STATISTICS_update (stats,
652                             "# channels",
653                             1,
654                             GNUNET_NO);
655   LOG (GNUNET_ERROR_TYPE_DEBUG,
656        "Created channel to port %s at peer %s for %s using %s\n",
657        GNUNET_h2s (port),
658        GCP_2s (destination),
659        GSC_2s (owner),
660        (GNUNET_YES == ch->is_loopback) ? "loopback" : GCT_2s (ch->t));
661   return ch;
662 }
663
664
665 /**
666  * We had an incoming channel to a port that is closed.
667  * It has not been opened for a while, drop it.
668  *
669  * @param cls the channel to drop
670  */
671 static void
672 timeout_closed_cb (void *cls)
673 {
674   struct CadetChannel *ch = cls;
675
676   ch->retry_control_task = NULL;
677   LOG (GNUNET_ERROR_TYPE_DEBUG,
678        "Closing incoming channel to port %s from peer %s due to timeout\n",
679        GNUNET_h2s (&ch->port),
680        GCP_2s (GCT_get_destination (ch->t)));
681   channel_destroy (ch);
682 }
683
684
685 /**
686  * Create a new channel based on a request coming in over the network.
687  *
688  * @param t tunnel to the remote peer
689  * @param ctn identifier of this channel in the tunnel
690  * @param port desired local port
691  * @param options options for the channel
692  * @return handle to the new channel
693  */
694 struct CadetChannel *
695 GCCH_channel_incoming_new (struct CadetTunnel *t,
696                            struct GNUNET_CADET_ChannelTunnelNumber ctn,
697                            const struct GNUNET_HashCode *port,
698                            uint32_t options)
699 {
700   struct CadetChannel *ch;
701   struct CadetClient *c;
702
703   ch = GNUNET_new (struct CadetChannel);
704   ch->port = *port;
705   ch->t = t;
706   ch->ctn = ctn;
707   ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME;
708   ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
709   ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
710   ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
711   ch->max_pending_messages = (ch->nobuffer) ? 1 : 4; /* FIXME: 4!? Do not hardcode! */
712   GNUNET_STATISTICS_update (stats,
713                             "# channels",
714                             1,
715                             GNUNET_NO);
716
717   c = GNUNET_CONTAINER_multihashmap_get (open_ports,
718                                          port);
719   if (NULL == c)
720   {
721     /* port closed, wait for it to possibly open */
722     (void) GNUNET_CONTAINER_multihashmap_put (loose_channels,
723                                               port,
724                                               ch,
725                                               GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
726     ch->retry_control_task
727       = GNUNET_SCHEDULER_add_delayed (TIMEOUT_CLOSED_PORT,
728                                       &timeout_closed_cb,
729                                       ch);
730     LOG (GNUNET_ERROR_TYPE_DEBUG,
731          "Created loose incoming channel to port %s from peer %s\n",
732          GNUNET_h2s (&ch->port),
733          GCP_2s (GCT_get_destination (ch->t)));
734   }
735   else
736   {
737     GCCH_bind (ch,
738                c);
739   }
740   GNUNET_STATISTICS_update (stats,
741                             "# channels",
742                             1,
743                             GNUNET_NO);
744   return ch;
745 }
746
747
748 /**
749  * Function called once the tunnel confirms that we sent the
750  * ACK message.  Just remembers it was sent, we do not expect
751  * ACKs for ACKs ;-).
752  *
753  * @param cls our `struct CadetChannel`.
754  */
755 static void
756 send_ack_cb (void *cls)
757 {
758   struct CadetChannel *ch = cls;
759
760   GNUNET_assert (NULL != ch->last_control_qe);
761   ch->last_control_qe = NULL;
762 }
763
764
765 /**
766  * Compute and send the current #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK to the other peer.
767  *
768  * @param ch channel to send the #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK for
769  */
770 static void
771 send_channel_data_ack (struct CadetChannel *ch)
772 {
773   struct GNUNET_CADET_ChannelDataAckMessage msg;
774
775   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK);
776   msg.header.size = htons (sizeof (msg));
777   msg.ctn = ch->ctn;
778   msg.mid.mid = htonl (ntohl (ch->mid_recv.mid) - 1);
779   msg.futures = GNUNET_htonll (ch->mid_futures);
780   if (NULL != ch->last_control_qe)
781     GCT_send_cancel (ch->last_control_qe);
782   LOG (GNUNET_ERROR_TYPE_DEBUG,
783        "Sending DATA_ACK %u:%llX via %s\n",
784        (unsigned int) ntohl (msg.mid.mid),
785        (unsigned long long) ch->mid_futures,
786        GCCH_2s (ch));
787   ch->last_control_qe = GCT_send (ch->t,
788                                   &msg.header,
789                                   &send_ack_cb,
790                                   ch);
791 }
792
793
794 /**
795  * Send our initial #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK to the client confirming that the
796  * connection is up.
797  *
798  * @param cls the `struct CadetChannel`
799  */
800 static void
801 send_open_ack (void *cls)
802 {
803   struct CadetChannel *ch = cls;
804   struct GNUNET_CADET_ChannelManageMessage msg;
805
806   LOG (GNUNET_ERROR_TYPE_DEBUG,
807        "Sending CHANNEL_OPEN_ACK on %s\n",
808        GCCH_2s (ch));
809   ch->retry_control_task = NULL;
810   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK);
811   msg.header.size = htons (sizeof (msg));
812   msg.reserved = htonl (0);
813   msg.ctn = ch->ctn;
814   if (NULL != ch->last_control_qe)
815     GCT_send_cancel (ch->last_control_qe);
816   ch->last_control_qe = GCT_send (ch->t,
817                                   &msg.header,
818                                   &send_ack_cb,
819                                   ch);
820 }
821
822
823 /**
824  * We got a #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN message again for
825  * this channel.  If the binding was successful, (re)transmit the
826  * #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK.
827  *
828  * @param ch channel that got the duplicate open
829  */
830 void
831 GCCH_handle_duplicate_open (struct CadetChannel *ch)
832 {
833   if (NULL == ch->dest)
834   {
835     LOG (GNUNET_ERROR_TYPE_DEBUG,
836          "Ignoring duplicate channel OPEN on %s: port is closed\n",
837          GCCH_2s (ch));
838     return;
839   }
840   if (NULL != ch->retry_control_task)
841   {
842     LOG (GNUNET_ERROR_TYPE_DEBUG,
843          "Ignoring duplicate channel OPEN on %s: control message is pending\n",
844          GCCH_2s (ch));
845     return;
846   }
847   LOG (GNUNET_ERROR_TYPE_DEBUG,
848        "Retransmitting OPEN_ACK on %s\n",
849        GCCH_2s (ch));
850   ch->retry_control_task
851     = GNUNET_SCHEDULER_add_now (&send_open_ack,
852                                 ch);
853 }
854
855
856 /**
857  * Send a #GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK to the client to solicit more messages.
858  *
859  * @param ch channel the ack is for
860  * @param to_owner #GNUNET_YES to send to owner,
861  *                 #GNUNET_NO to send to dest
862  */
863 static void
864 send_ack_to_client (struct CadetChannel *ch,
865                     int to_owner)
866 {
867   struct GNUNET_MQ_Envelope *env;
868   struct GNUNET_CADET_LocalAck *ack;
869   struct CadetChannelClient *ccc;
870
871   ccc = (GNUNET_YES == to_owner) ? ch->owner : ch->dest;
872   if (NULL == ccc)
873   {
874     /* This can happen if we are just getting ACKs after
875        our local client already disconnected. */
876     GNUNET_assert (GNUNET_YES == ch->destroy);
877     return;
878   }
879   env = GNUNET_MQ_msg (ack,
880                        GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK);
881   ack->ccn = ccc->ccn;
882   LOG (GNUNET_ERROR_TYPE_DEBUG,
883        "Sending CADET_LOCAL_ACK to %s (%s) at ccn %X (%u/%u pending)\n",
884        GSC_2s (ccc->c),
885        (GNUNET_YES == to_owner) ? "owner" : "dest",
886        ntohl (ack->ccn.channel_of_client),
887        ch->pending_messages,
888        ch->max_pending_messages);
889   GSC_send_to_client (ccc->c,
890                       env);
891 }
892
893
894 /**
895  * A client is bound to the port that we have a channel
896  * open to.  Send the acknowledgement for the connection
897  * request and establish the link with the client.
898  *
899  * @param ch open incoming channel
900  * @param c client listening on the respective port
901  */
902 void
903 GCCH_bind (struct CadetChannel *ch,
904            struct CadetClient *c)
905 {
906   uint32_t options;
907   struct CadetChannelClient *cccd;
908
909   LOG (GNUNET_ERROR_TYPE_DEBUG,
910        "Binding %s from %s to port %s of %s\n",
911        GCCH_2s (ch),
912        GCT_2s (ch->t),
913        GNUNET_h2s (&ch->port),
914        GSC_2s (c));
915   if (NULL != ch->retry_control_task)
916   {
917     /* there might be a timeout task here */
918     GNUNET_SCHEDULER_cancel (ch->retry_control_task);
919     ch->retry_control_task = NULL;
920   }
921   options = 0;
922   if (ch->nobuffer)
923     options |= GNUNET_CADET_OPTION_NOBUFFER;
924   if (ch->reliable)
925     options |= GNUNET_CADET_OPTION_RELIABLE;
926   if (ch->out_of_order)
927     options |= GNUNET_CADET_OPTION_OUT_OF_ORDER;
928   cccd = GNUNET_new (struct CadetChannelClient);
929   ch->dest = cccd;
930   cccd->c = c;
931   cccd->client_ready = GNUNET_YES;
932   cccd->ccn = GSC_bind (c,
933                         ch,
934                         (GNUNET_YES == ch->is_loopback)
935                         ? GCP_get (&my_full_id,
936                                    GNUNET_YES)
937                         : GCT_get_destination (ch->t),
938                         &ch->port,
939                         options);
940   GNUNET_assert (ntohl (cccd->ccn.channel_of_client) <
941                  GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
942   ch->mid_recv.mid = htonl (1); /* The OPEN counts as message 0! */
943   if (GNUNET_YES == ch->is_loopback)
944   {
945     ch->state = CADET_CHANNEL_OPEN_SENT;
946     GCCH_handle_channel_open_ack (ch);
947   }
948   else
949   {
950     /* notify other peer that we accepted the connection */
951     ch->retry_control_task
952       = GNUNET_SCHEDULER_add_now (&send_open_ack,
953                                   ch);
954   }
955   /* give client it's initial supply of ACKs */
956   GNUNET_assert (ntohl (cccd->ccn.channel_of_client) <
957                  GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
958   for (unsigned int i=0;i<ch->max_pending_messages;i++)
959     send_ack_to_client (ch,
960                         GNUNET_NO);
961 }
962
963
964 /**
965  * Destroy locally created channel.  Called by the local client, so no
966  * need to tell the client.
967  *
968  * @param ch channel to destroy
969  * @param c client that caused the destruction
970  * @param ccn client number of the client @a c
971  */
972 void
973 GCCH_channel_local_destroy (struct CadetChannel *ch,
974                             struct CadetClient *c,
975                             struct GNUNET_CADET_ClientChannelNumber ccn)
976 {
977   LOG (GNUNET_ERROR_TYPE_DEBUG,
978        "%s asks for destruction of %s\n",
979        GSC_2s (c),
980        GCCH_2s (ch));
981   GNUNET_assert (NULL != c);
982   if ( (NULL != ch->owner) &&
983        (c == ch->owner->c) &&
984        (ccn.channel_of_client == ch->owner->ccn.channel_of_client) )
985   {
986     free_channel_client (ch->owner);
987     ch->owner = NULL;
988   }
989   else if ( (NULL != ch->dest) &&
990             (c == ch->dest->c) &&
991             (ccn.channel_of_client == ch->dest->ccn.channel_of_client) )
992   {
993     free_channel_client (ch->dest);
994     ch->dest = NULL;
995   }
996   else
997   {
998     GNUNET_assert (0);
999   }
1000
1001   if (GNUNET_YES == ch->destroy)
1002   {
1003     /* other end already destroyed, with the local client gone, no need
1004        to finish transmissions, just destroy immediately. */
1005     channel_destroy (ch);
1006     return;
1007   }
1008   if ( (NULL != ch->head_sent) ||
1009        (NULL != ch->owner) ||
1010        (NULL != ch->dest) )
1011   {
1012     /* Wait for other end to destroy us as well,
1013        and otherwise allow send queue to be transmitted first */
1014     ch->destroy = GNUNET_YES;
1015     return;
1016   }
1017   /* If the we ever sent the CHANNEL_CREATE, we need to send a destroy message. */
1018   if (CADET_CHANNEL_NEW != ch->state)
1019     GCT_send_channel_destroy (ch->t,
1020                               ch->ctn);
1021   /* Nothing left to do, just finish destruction */
1022   channel_destroy (ch);
1023 }
1024
1025
1026 /**
1027  * We got an acknowledgement for the creation of the channel
1028  * (the port is open on the other side). Begin transmissions.
1029  *
1030  * @param ch channel to destroy
1031  */
1032 void
1033 GCCH_handle_channel_open_ack (struct CadetChannel *ch)
1034 {
1035   switch (ch->state)
1036   {
1037   case CADET_CHANNEL_NEW:
1038     /* this should be impossible */
1039     GNUNET_break (0);
1040     break;
1041   case CADET_CHANNEL_OPEN_SENT:
1042     if (NULL == ch->owner)
1043     {
1044       /* We're not the owner, wrong direction! */
1045       GNUNET_break_op (0);
1046       return;
1047     }
1048     LOG (GNUNET_ERROR_TYPE_DEBUG,
1049          "Received CHANNEL_OPEN_ACK for waiting %s, entering READY state\n",
1050          GCCH_2s (ch));
1051     if (NULL != ch->retry_control_task) /* can be NULL if ch->is_loopback */
1052     {
1053       GNUNET_SCHEDULER_cancel (ch->retry_control_task);
1054       ch->retry_control_task = NULL;
1055     }
1056     ch->state = CADET_CHANNEL_READY;
1057     /* On first connect, send client as many ACKs as we allow messages
1058        to be buffered! */
1059     for (unsigned int i=0;i<ch->max_pending_messages;i++)
1060       send_ack_to_client (ch,
1061                           GNUNET_YES);
1062     break;
1063   case CADET_CHANNEL_READY:
1064     /* duplicate ACK, maybe we retried the CREATE. Ignore. */
1065     LOG (GNUNET_ERROR_TYPE_DEBUG,
1066          "Received duplicate channel OPEN_ACK for %s\n",
1067          GCCH_2s (ch));
1068     GNUNET_STATISTICS_update (stats,
1069                               "# duplicate CREATE_ACKs",
1070                               1,
1071                               GNUNET_NO);
1072     break;
1073   }
1074 }
1075
1076
1077 /**
1078  * Test if element @a e1 comes before element @a e2.
1079  *
1080  * @param cls closure, to a flag where we indicate duplicate packets
1081  * @param e1 an element of to sort
1082  * @param e2 another element to sort
1083  * @return #GNUNET_YES if @e1 < @e2, otherwise #GNUNET_NO
1084  */
1085 static int
1086 is_before (void *cls,
1087            struct CadetOutOfOrderMessage *m1,
1088            struct CadetOutOfOrderMessage *m2)
1089 {
1090   int *duplicate = cls;
1091   uint32_t v1 = ntohl (m1->mid.mid);
1092   uint32_t v2 = ntohl (m2->mid.mid);
1093   uint32_t delta;
1094
1095   delta = v2 - v1;
1096   if (0 == delta)
1097     *duplicate = GNUNET_YES;
1098   if (delta > (uint32_t) INT_MAX)
1099   {
1100     /* in overflow range, we can safely assume we wrapped around */
1101     return GNUNET_NO;
1102   }
1103   else
1104   {
1105     /* result is small, thus v2 > v1, thus e1 < e2 */
1106     return GNUNET_YES;
1107   }
1108 }
1109
1110
1111 /**
1112  * We got payload data for a channel.  Pass it on to the client
1113  * and send an ACK to the other end (once flow control allows it!)
1114  *
1115  * @param ch channel that got data
1116  * @param msg message that was received
1117  */
1118 void
1119 GCCH_handle_channel_plaintext_data (struct CadetChannel *ch,
1120                                     const struct GNUNET_CADET_ChannelAppDataMessage *msg)
1121 {
1122   struct GNUNET_MQ_Envelope *env;
1123   struct GNUNET_CADET_LocalData *ld;
1124   struct CadetChannelClient *ccc;
1125   size_t payload_size;
1126   struct CadetOutOfOrderMessage *com;
1127   int duplicate;
1128   uint32_t mid_min;
1129   uint32_t mid_max;
1130   uint32_t mid_msg;
1131
1132   GNUNET_assert (GNUNET_NO == ch->is_loopback);
1133   if ( (GNUNET_YES == ch->destroy) &&
1134        (NULL == ch->owner) &&
1135        (NULL == ch->dest) )
1136   {
1137     /* This client is gone, but we still have messages to send to
1138        the other end (which is why @a ch is not yet dead).  However,
1139        we cannot pass messages to our client anymore. */
1140     LOG (GNUNET_ERROR_TYPE_DEBUG,
1141          "Dropping incoming payload on %s as this end is already closed\n",
1142          GCCH_2s (ch));
1143     /* FIXME: send back ACK/NACK/Closed notification
1144        to stop retransmissions! */
1145     return;
1146   }
1147   payload_size = ntohs (msg->header.size) - sizeof (*msg);
1148   env = GNUNET_MQ_msg_extra (ld,
1149                              payload_size,
1150                              GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
1151   ld->ccn = (NULL == ch->dest) ? ch->owner->ccn : ch->dest->ccn;
1152   GNUNET_memcpy (&ld[1],
1153                  &msg[1],
1154                  payload_size);
1155   ccc = (NULL != ch->owner) ? ch->owner : ch->dest;
1156   if ( (GNUNET_YES == ccc->client_ready) &&
1157        ( (GNUNET_YES == ch->out_of_order) ||
1158          (msg->mid.mid == ch->mid_recv.mid) ) )
1159   {
1160     LOG (GNUNET_ERROR_TYPE_DEBUG,
1161          "Giving %u bytes of payload with MID %u from %s to client %s\n",
1162          (unsigned int) payload_size,
1163          ntohl (msg->mid.mid),
1164          GCCH_2s (ch),
1165          GSC_2s (ccc->c));
1166     ccc->client_ready = GNUNET_NO;
1167     GSC_send_to_client (ccc->c,
1168                         env);
1169     ch->mid_recv.mid = htonl (1 + ntohl (ch->mid_recv.mid));
1170     ch->mid_futures >>= 1;
1171     if (GNUNET_YES == ch->reliable)
1172       send_channel_data_ack (ch);
1173     return;
1174   }
1175
1176   /* check if message ought to be dropped because it is anicent/too distant/duplicate */
1177   mid_min = ntohl (ch->mid_recv.mid);
1178   mid_max = mid_min + MAX_OUT_OF_ORDER_DISTANCE;
1179   mid_msg = ntohl (msg->mid.mid);
1180   if ( ( (uint32_t) (mid_msg - mid_min) > MAX_OUT_OF_ORDER_DISTANCE) ||
1181        ( (uint32_t) (mid_max - mid_msg) > MAX_OUT_OF_ORDER_DISTANCE) )
1182   {
1183     LOG (GNUNET_ERROR_TYPE_DEBUG,
1184          "Duplicate ancient or future payload of %u bytes on %s (mid %u) dropped\n",
1185          (unsigned int) payload_size,
1186          GCCH_2s (ch),
1187          ntohl (msg->mid.mid));
1188     GNUNET_STATISTICS_update (stats,
1189                               "# duplicate DATA (ancient or future)",
1190                               1,
1191                               GNUNET_NO);
1192     GNUNET_MQ_discard (env);
1193     if (GNUNET_YES == ch->reliable)
1194       send_channel_data_ack (ch);
1195     return;
1196   }
1197
1198   /* Insert message into sorted out-of-order queue */
1199   com = GNUNET_new (struct CadetOutOfOrderMessage);
1200   com->mid = msg->mid;
1201   com->env = env;
1202   duplicate = GNUNET_NO;
1203   GNUNET_CONTAINER_DLL_insert_sorted (struct CadetOutOfOrderMessage,
1204                                       is_before,
1205                                       &duplicate,
1206                                       ccc->head_recv,
1207                                       ccc->tail_recv,
1208                                       com);
1209   if (GNUNET_YES == duplicate)
1210   {
1211     /* Duplicate within the queue, drop also */
1212     LOG (GNUNET_ERROR_TYPE_DEBUG,
1213          "Duplicate payload of %u bytes on %s (mid %u) dropped\n",
1214          (unsigned int) payload_size,
1215          GCCH_2s (ch),
1216          ntohl (msg->mid.mid));
1217     GNUNET_STATISTICS_update (stats,
1218                               "# duplicate DATA",
1219                               1,
1220                               GNUNET_NO);
1221     GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
1222                                  ccc->tail_recv,
1223                                  com);
1224     GNUNET_MQ_discard (com->env);
1225     GNUNET_free (com);
1226     if (GNUNET_YES == ch->reliable)
1227       send_channel_data_ack (ch);
1228     return;
1229   }
1230   LOG (GNUNET_ERROR_TYPE_DEBUG,
1231        "Queued %s payload of %u bytes on %s-%X(%p) (mid %u, need %u first)\n",
1232        (GNUNET_YES == ccc->client_ready)
1233        ? "out-of-order"
1234        : "client-not-ready",
1235        (unsigned int) payload_size,
1236        GCCH_2s (ch),
1237        ntohl (ccc->ccn.channel_of_client),
1238        ccc,
1239        ntohl (msg->mid.mid),
1240        ntohl (ch->mid_recv.mid));
1241   send_channel_data_ack (ch);
1242 }
1243
1244
1245 /**
1246  * Function called once the tunnel has sent one of our messages.
1247  * If the message is unreliable, simply frees the `crm`. If the
1248  * message was reliable, calculate retransmission time and
1249  * wait for ACK (or retransmit).
1250  *
1251  * @param cls the `struct CadetReliableMessage` that was sent
1252  */
1253 static void
1254 data_sent_cb (void *cls);
1255
1256
1257 /**
1258  * We need to retry a transmission, the last one took too long to
1259  * be acknowledged.
1260  *
1261  * @param cls the `struct CadetChannel` where we need to retransmit
1262  */
1263 static void
1264 retry_transmission (void *cls)
1265 {
1266   struct CadetChannel *ch = cls;
1267   struct CadetReliableMessage *crm = ch->head_sent;
1268
1269   ch->retry_data_task = NULL;
1270   GNUNET_assert (NULL == crm->qe);
1271   LOG (GNUNET_ERROR_TYPE_DEBUG,
1272        "Retrying transmission on %s of message %u\n",
1273        GCCH_2s (ch),
1274        (unsigned int) ntohl (crm->data_message->mid.mid));
1275   crm->qe = GCT_send (ch->t,
1276                       &crm->data_message->header,
1277                       &data_sent_cb,
1278                       crm);
1279   GNUNET_assert (NULL == ch->retry_data_task);
1280 }
1281
1282
1283 /**
1284  * We got an PLAINTEXT_DATA_ACK for a message in our queue, remove it from
1285  * the queue and tell our client that it can send more.
1286  *
1287  * @param ch the channel that got the PLAINTEXT_DATA_ACK
1288  * @param crm the message that got acknowledged
1289  */
1290 static void
1291 handle_matching_ack (struct CadetChannel *ch,
1292                      struct CadetReliableMessage *crm)
1293 {
1294   GNUNET_CONTAINER_DLL_remove (ch->head_sent,
1295                                ch->tail_sent,
1296                                crm);
1297   ch->pending_messages--;
1298   GNUNET_assert (ch->pending_messages < ch->max_pending_messages);
1299   LOG (GNUNET_ERROR_TYPE_DEBUG,
1300        "Received DATA_ACK on %s for message %u (%u ACKs pending)\n",
1301        GCCH_2s (ch),
1302        (unsigned int) ntohl (crm->data_message->mid.mid),
1303        ch->pending_messages);
1304   if (NULL != crm->qe)
1305   {
1306     GCT_send_cancel (crm->qe);
1307     crm->qe = NULL;
1308   }
1309   GNUNET_free (crm->data_message);
1310   GNUNET_free (crm);
1311   send_ack_to_client (ch,
1312                       (NULL == ch->owner)
1313                       ? GNUNET_NO
1314                       : GNUNET_YES);
1315 }
1316
1317
1318 /**
1319  * We got an acknowledgement for payload data for a channel.
1320  * Possibly resume transmissions.
1321  *
1322  * @param ch channel that got the ack
1323  * @param ack details about what was received
1324  */
1325 void
1326 GCCH_handle_channel_plaintext_data_ack (struct CadetChannel *ch,
1327                                         const struct GNUNET_CADET_ChannelDataAckMessage *ack)
1328 {
1329   struct CadetReliableMessage *crm;
1330   struct CadetReliableMessage *crmn;
1331   int found;
1332   uint32_t mid_base;
1333   uint64_t mid_mask;
1334   unsigned int delta;
1335
1336   GNUNET_break (GNUNET_NO == ch->is_loopback);
1337   if (GNUNET_NO == ch->reliable)
1338   {
1339     /* not expecting ACKs on unreliable channel, odd */
1340     GNUNET_break_op (0);
1341     return;
1342   }
1343   mid_base = ntohl (ack->mid.mid);
1344   mid_mask = GNUNET_htonll (ack->futures);
1345   found = GNUNET_NO;
1346   for (crm = ch->head_sent;
1347         NULL != crm;
1348        crm = crmn)
1349   {
1350     crmn = crm->next;
1351     if (ack->mid.mid == crm->data_message->mid.mid)
1352     {
1353       LOG (GNUNET_ERROR_TYPE_DEBUG,
1354            "Got DATA_ACK with base %u matching message %u on %s\n",
1355            (unsigned int) mid_base,
1356            ntohl (crm->data_message->mid.mid),
1357            GCCH_2s (ch));
1358       handle_matching_ack (ch,
1359                            crm);
1360       found = GNUNET_YES;
1361       continue;
1362     }
1363     delta = (unsigned int) (ntohl (crm->data_message->mid.mid) - mid_base) - 1;
1364     if (delta >= UINT_MAX - ch->max_pending_messages)
1365     {
1366       /* overflow, means crm was way in the past, so this ACK counts for it. */
1367       LOG (GNUNET_ERROR_TYPE_DEBUG,
1368            "Got DATA_ACK with base %u past %u on %s\n",
1369            (unsigned int) mid_base,
1370            ntohl (crm->data_message->mid.mid),
1371            GCCH_2s (ch));
1372       handle_matching_ack (ch,
1373                            crm);
1374       found = GNUNET_YES;
1375       continue;
1376     }
1377     if (delta >= 64)
1378       continue;
1379     if (0 != (mid_mask & (1LLU << delta)))
1380     {
1381       LOG (GNUNET_ERROR_TYPE_DEBUG,
1382            "Got DATA_ACK with mask for %u on %s\n",
1383            ntohl (crm->data_message->mid.mid),
1384            GCCH_2s (ch));
1385       handle_matching_ack (ch,
1386                            crm);
1387       found = GNUNET_YES;
1388     }
1389   }
1390   if (GNUNET_NO == found)
1391   {
1392     /* ACK for message we already dropped, might have been a
1393        duplicate ACK? Ignore. */
1394     LOG (GNUNET_ERROR_TYPE_DEBUG,
1395          "Duplicate DATA_ACK on %s, ignoring\n",
1396          GCCH_2s (ch));
1397     GNUNET_STATISTICS_update (stats,
1398                               "# duplicate DATA_ACKs",
1399                               1,
1400                               GNUNET_NO);
1401     return;
1402   }
1403   if (NULL != ch->retry_data_task)
1404   {
1405     GNUNET_SCHEDULER_cancel (ch->retry_data_task);
1406     ch->retry_data_task = NULL;
1407   }
1408   if (NULL != ch->head_sent)
1409     ch->retry_data_task
1410       = GNUNET_SCHEDULER_add_at (ch->head_sent->next_retry,
1411                                  &retry_transmission,
1412                                  ch);
1413 }
1414
1415
1416 /**
1417  * Destroy channel, based on the other peer closing the
1418  * connection.  Also needs to remove this channel from
1419  * the tunnel.
1420  *
1421  * @param ch channel to destroy
1422  */
1423 void
1424 GCCH_handle_remote_destroy (struct CadetChannel *ch)
1425 {
1426   struct CadetChannelClient *ccc;
1427
1428   GNUNET_assert (GNUNET_NO == ch->is_loopback);
1429   LOG (GNUNET_ERROR_TYPE_DEBUG,
1430        "Received remote channel DESTROY for %s\n",
1431        GCCH_2s (ch));
1432   if (GNUNET_YES == ch->destroy)
1433   {
1434     /* Local client already gone, this is instant-death. */
1435     channel_destroy (ch);
1436     return;
1437   }
1438   ccc = (NULL != ch->owner) ? ch->owner : ch->dest;
1439   if (NULL != ccc->head_recv)
1440   {
1441     LOG (GNUNET_ERROR_TYPE_WARNING,
1442          "Lost end of transmission due to remote shutdown on %s\n",
1443          GCCH_2s (ch));
1444     /* FIXME: change API to notify client about truncated transmission! */
1445   }
1446   ch->destroy = GNUNET_YES;
1447   GSC_handle_remote_channel_destroy (ccc->c,
1448                                      ccc->ccn,
1449                                      ch);
1450   channel_destroy (ch);
1451 }
1452
1453
1454 /**
1455  * Test if element @a e1 comes before element @a e2.
1456  *
1457  * @param cls closure, to a flag where we indicate duplicate packets
1458  * @param crm1 an element of to sort
1459  * @param crm2 another element to sort
1460  * @return #GNUNET_YES if @e1 < @e2, otherwise #GNUNET_NO
1461  */
1462 static int
1463 cmp_crm_by_next_retry (void *cls,
1464                        struct CadetReliableMessage *crm1,
1465                        struct CadetReliableMessage *crm2)
1466 {
1467   if (crm1->next_retry.abs_value_us <
1468       crm2->next_retry.abs_value_us)
1469     return GNUNET_YES;
1470   return GNUNET_NO;
1471 }
1472
1473
1474 /**
1475  * Function called once the tunnel has sent one of our messages.
1476  * If the message is unreliable, simply frees the `crm`. If the
1477  * message was reliable, calculate retransmission time and
1478  * wait for ACK (or retransmit).
1479  *
1480  * @param cls the `struct CadetReliableMessage` that was sent
1481  */
1482 static void
1483 data_sent_cb (void *cls)
1484 {
1485   struct CadetReliableMessage *crm = cls;
1486   struct CadetChannel *ch = crm->ch;
1487
1488   GNUNET_assert (GNUNET_NO == ch->is_loopback);
1489   GNUNET_assert (NULL != crm->qe);
1490   crm->qe = NULL;
1491   GNUNET_CONTAINER_DLL_remove (ch->head_sent,
1492                                ch->tail_sent,
1493                                crm);
1494   if (GNUNET_NO == ch->reliable)
1495   {
1496     GNUNET_free (crm->data_message);
1497     GNUNET_free (crm);
1498     ch->pending_messages--;
1499     send_ack_to_client (ch,
1500                         (NULL == ch->owner)
1501                         ? GNUNET_NO
1502                         : GNUNET_YES);
1503     return;
1504   }
1505   if (0 == crm->retry_delay.rel_value_us)
1506     crm->retry_delay = ch->expected_delay;
1507   else
1508     crm->retry_delay = GNUNET_TIME_STD_BACKOFF (crm->retry_delay);
1509   crm->retry_delay = GNUNET_TIME_relative_max (crm->retry_delay,
1510                                                MIN_RTT_DELAY);
1511   crm->next_retry = GNUNET_TIME_relative_to_absolute (crm->retry_delay);
1512
1513   GNUNET_CONTAINER_DLL_insert_sorted (struct CadetReliableMessage,
1514                                       cmp_crm_by_next_retry,
1515                                       NULL,
1516                                       ch->head_sent,
1517                                       ch->tail_sent,
1518                                       crm);
1519   LOG (GNUNET_ERROR_TYPE_DEBUG,
1520        "Message %u sent, next transmission on %s in %s\n",
1521        (unsigned int) ntohl (crm->data_message->mid.mid),
1522        GCCH_2s (ch),
1523        GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (ch->head_sent->next_retry),
1524                                                GNUNET_YES));
1525   if (NULL == ch->head_sent->qe)
1526   {
1527     if (NULL != ch->retry_data_task)
1528       GNUNET_SCHEDULER_cancel (ch->retry_data_task);
1529     ch->retry_data_task
1530       = GNUNET_SCHEDULER_add_at (ch->head_sent->next_retry,
1531                                  &retry_transmission,
1532                                  ch);
1533   }
1534 }
1535
1536
1537 /**
1538  * Handle data given by a client.
1539  *
1540  * Check whether the client is allowed to send in this tunnel, save if
1541  * channel is reliable and send an ACK to the client if there is still
1542  * buffer space in the tunnel.
1543  *
1544  * @param ch Channel.
1545  * @param sender_ccn ccn of the sender
1546  * @param buf payload to transmit.
1547  * @param buf_len number of bytes in @a buf
1548  * @return #GNUNET_OK if everything goes well,
1549  *         #GNUNET_SYSERR in case of an error.
1550  */
1551 int
1552 GCCH_handle_local_data (struct CadetChannel *ch,
1553                         struct GNUNET_CADET_ClientChannelNumber sender_ccn,
1554                         const char *buf,
1555                         size_t buf_len)
1556 {
1557   struct CadetReliableMessage *crm;
1558
1559   if (ch->pending_messages > ch->max_pending_messages)
1560   {
1561     GNUNET_break (0);
1562     return GNUNET_SYSERR;
1563   }
1564   ch->pending_messages++;
1565
1566   if (GNUNET_YES == ch->is_loopback)
1567   {
1568     struct CadetChannelClient *receiver;
1569     struct GNUNET_MQ_Envelope *env;
1570     struct GNUNET_CADET_LocalData *ld;
1571     int to_owner;
1572
1573     env = GNUNET_MQ_msg_extra (ld,
1574                                buf_len,
1575                                GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
1576     if (sender_ccn.channel_of_client ==
1577         ch->owner->ccn.channel_of_client)
1578     {
1579       receiver = ch->dest;
1580       to_owner = GNUNET_NO;
1581     }
1582     else
1583     {
1584       GNUNET_assert (sender_ccn.channel_of_client ==
1585                      ch->dest->ccn.channel_of_client);
1586       receiver = ch->owner;
1587       to_owner = GNUNET_YES;
1588     }
1589     ld->ccn = receiver->ccn;
1590     GNUNET_memcpy (&ld[1],
1591                    buf,
1592                    buf_len);
1593     if (GNUNET_YES == receiver->client_ready)
1594     {
1595       GSC_send_to_client (receiver->c,
1596                           env);
1597       send_ack_to_client (ch,
1598                           to_owner);
1599     }
1600     else
1601     {
1602       struct CadetOutOfOrderMessage *oom;
1603
1604       oom = GNUNET_new (struct CadetOutOfOrderMessage);
1605       oom->env = env;
1606       GNUNET_CONTAINER_DLL_insert_tail (receiver->head_recv,
1607                                         receiver->tail_recv,
1608                                         oom);
1609     }
1610     return GNUNET_OK;
1611   }
1612
1613   /* Everything is correct, send the message. */
1614   crm = GNUNET_malloc (sizeof (*crm));
1615   crm->ch = ch;
1616   crm->data_message = GNUNET_malloc (sizeof (struct GNUNET_CADET_ChannelAppDataMessage)
1617                                      + buf_len);
1618   crm->data_message->header.size = htons (sizeof (struct GNUNET_CADET_ChannelAppDataMessage) + buf_len);
1619   crm->data_message->header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA);
1620   ch->mid_send.mid = htonl (ntohl (ch->mid_send.mid) + 1);
1621   crm->data_message->mid = ch->mid_send;
1622   crm->data_message->ctn = ch->ctn;
1623   GNUNET_memcpy (&crm->data_message[1],
1624                  buf,
1625                  buf_len);
1626   GNUNET_CONTAINER_DLL_insert (ch->head_sent,
1627                                ch->tail_sent,
1628                                crm);
1629   LOG (GNUNET_ERROR_TYPE_DEBUG,
1630        "Sending %u bytes from local client to %s with MID %u\n",
1631        buf_len,
1632        GCCH_2s (ch),
1633        ntohl (crm->data_message->mid.mid));
1634   if (NULL != ch->retry_data_task)
1635   {
1636     GNUNET_SCHEDULER_cancel (ch->retry_data_task);
1637     ch->retry_data_task = NULL;
1638   }
1639   crm->qe = GCT_send (ch->t,
1640                       &crm->data_message->header,
1641                       &data_sent_cb,
1642                       crm);
1643   GNUNET_assert (NULL == ch->retry_data_task);
1644   return GNUNET_OK;
1645 }
1646
1647
1648 /**
1649  * Handle ACK from client on local channel.  Means the client is ready
1650  * for more data, see if we have any for it.
1651  *
1652  * @param ch channel to destroy
1653  * @param client_ccn ccn of the client sending the ack
1654  */
1655 void
1656 GCCH_handle_local_ack (struct CadetChannel *ch,
1657                        struct GNUNET_CADET_ClientChannelNumber client_ccn)
1658 {
1659   struct CadetChannelClient *ccc;
1660   struct CadetOutOfOrderMessage *com;
1661
1662   if ( (NULL != ch->owner) &&
1663        (ch->owner->ccn.channel_of_client == client_ccn.channel_of_client) )
1664     ccc = ch->owner;
1665   else if ( (NULL != ch->dest) &&
1666             (ch->dest->ccn.channel_of_client == client_ccn.channel_of_client) )
1667     ccc = ch->dest;
1668   else
1669     GNUNET_assert (0);
1670   ccc->client_ready = GNUNET_YES;
1671   com = ccc->head_recv;
1672   if (NULL == com)
1673   {
1674     LOG (GNUNET_ERROR_TYPE_DEBUG,
1675          "Got LOCAL_ACK, %s-%X ready to receive more data, but none pending on %s-%X(%p)!\n",
1676          GSC_2s (ccc->c),
1677          ntohl (client_ccn.channel_of_client),
1678          GCCH_2s (ch),
1679          ntohl (ccc->ccn.channel_of_client),
1680          ccc);
1681     return; /* none pending */
1682   }
1683   if (GNUNET_YES == ch->is_loopback)
1684   {
1685     int to_owner;
1686
1687     /* Messages are always in-order, just send */
1688     GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
1689                                  ccc->tail_recv,
1690                                  com);
1691     GSC_send_to_client (ccc->c,
1692                         com->env);
1693     /* Notify sender that we can receive more */
1694     if (ccc->ccn.channel_of_client ==
1695         ch->owner->ccn.channel_of_client)
1696     {
1697       to_owner = GNUNET_NO;
1698     }
1699     else
1700     {
1701       GNUNET_assert (ccc->ccn.channel_of_client ==
1702                      ch->dest->ccn.channel_of_client);
1703       to_owner = GNUNET_YES;
1704     }
1705     send_ack_to_client (ch,
1706                         to_owner);
1707     GNUNET_free (com);
1708     return;
1709   }
1710
1711   if ( (com->mid.mid != ch->mid_recv.mid) &&
1712        (GNUNET_NO == ch->out_of_order) )
1713   {
1714     LOG (GNUNET_ERROR_TYPE_DEBUG,
1715          "Got LOCAL_ACK, %s-%X ready to receive more data (but next one is out-of-order %u vs. %u)!\n",
1716          GSC_2s (ccc->c),
1717          ntohl (ccc->ccn.channel_of_client),
1718          ntohl (com->mid.mid),
1719          ntohl (ch->mid_recv.mid));
1720     return; /* missing next one in-order */
1721   }
1722
1723   LOG (GNUNET_ERROR_TYPE_DEBUG,
1724        "Got LOCAL_ACK, passing payload message %u to %s-%X on %s\n",
1725        ntohl (com->mid.mid),
1726        GSC_2s (ccc->c),
1727        ntohl (ccc->ccn.channel_of_client),
1728        GCCH_2s (ch));
1729
1730   /* all good, pass next message to client */
1731   GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
1732                                ccc->tail_recv,
1733                                com);
1734   /* FIXME: if unreliable, this is not aggressive
1735      enough, as it would be OK to have lost some! */
1736   ch->mid_recv.mid = htonl (1 + ntohl (com->mid.mid));
1737   ch->mid_futures >>= 1; /* equivalent to division by 2 */
1738   ccc->client_ready = GNUNET_NO;
1739   GSC_send_to_client (ccc->c,
1740                       com->env);
1741   GNUNET_free (com);
1742   if ( (0xFFULL == (ch->mid_futures & 0xFFULL)) &&
1743        (GNUNET_YES == ch->reliable) )
1744   {
1745     /* The next 15 messages were also already received (0xFF), this
1746        suggests that the sender may be blocked on flow control
1747        urgently waiting for an ACK from us. (As we have an inherent
1748        maximum of 64 bits, and 15 is getting too close for comfort.)
1749        So we should send one now. */
1750     LOG (GNUNET_ERROR_TYPE_DEBUG,
1751          "Sender on %s likely blocked on flow-control, sending ACK now.\n",
1752          GCCH_2s (ch));
1753     if (GNUNET_YES == ch->reliable)
1754       send_channel_data_ack (ch);
1755   }
1756
1757   if (NULL != ccc->head_recv)
1758     return;
1759   if (GNUNET_NO == ch->destroy)
1760     return;
1761   GCT_send_channel_destroy (ch->t,
1762                             ch->ctn);
1763   channel_destroy (ch);
1764 }
1765
1766
1767 #define LOG2(level, ...) GNUNET_log_from_nocheck(level,"cadet-chn",__VA_ARGS__)
1768
1769
1770 /**
1771  * Log channel info.
1772  *
1773  * @param ch Channel.
1774  * @param level Debug level to use.
1775  */
1776 void
1777 GCCH_debug (struct CadetChannel *ch,
1778             enum GNUNET_ErrorType level)
1779 {
1780   int do_log;
1781
1782   do_log = GNUNET_get_log_call_status (level & (~GNUNET_ERROR_TYPE_BULK),
1783                                        "cadet-chn",
1784                                        __FILE__, __FUNCTION__, __LINE__);
1785   if (0 == do_log)
1786     return;
1787
1788   if (NULL == ch)
1789   {
1790     LOG2 (level, "CHN *** DEBUG NULL CHANNEL ***\n");
1791     return;
1792   }
1793   LOG2 (level,
1794         "CHN %s:%X (%p)\n",
1795         GCT_2s (ch->t),
1796         ch->ctn,
1797         ch);
1798   if (NULL != ch->owner)
1799   {
1800     LOG2 (level,
1801           "CHN origin %s ready %s local-id: %u\n",
1802           GSC_2s (ch->owner->c),
1803           ch->owner->client_ready ? "YES" : "NO",
1804           ntohl (ch->owner->ccn.channel_of_client));
1805   }
1806   if (NULL != ch->dest)
1807   {
1808     LOG2 (level,
1809           "CHN destination %s ready %s local-id: %u\n",
1810           GSC_2s (ch->dest->c),
1811           ch->dest->client_ready ? "YES" : "NO",
1812           ntohl (ch->dest->ccn.channel_of_client));
1813   }
1814   LOG2 (level,
1815         "CHN  Message IDs recv: %d (%LLX), send: %d\n",
1816         ntohl (ch->mid_recv.mid),
1817         (unsigned long long) ch->mid_futures,
1818         ntohl (ch->mid_send.mid));
1819 }
1820
1821
1822
1823 /* end of gnunet-service-cadet-new_channel.c */