properly clean up unbound channels
[oweals/gnunet.git] / src / cadet / gnunet-service-cadet-new_channel.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2001-2017 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20 /**
21  * @file cadet/gnunet-service-cadet-new_channel.c
22  * @brief logical links between CADET clients
23  * @author Bartlomiej Polot
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - Congestion/flow control:
28  *   + estimate max bandwidth using bursts and use to for CONGESTION CONTROL!
29  *     (and figure out how/where to use this!)
30  *   + figure out flow control without ACKs (unreliable traffic!)
31  * - revisit handling of 'unbuffered' traffic!
32  *   (need to push down through tunnel into connection selection)
33  * - revisit handling of 'buffered' traffic: 4 is a rather small buffer; maybe
34  *   reserve more bits in 'options' to allow for buffer size control?
35  */
36 #include "platform.h"
37 #include "gnunet_util_lib.h"
38 #include "cadet.h"
39 #include "gnunet_statistics_service.h"
40 #include "gnunet-service-cadet-new.h"
41 #include "gnunet-service-cadet-new_channel.h"
42 #include "gnunet-service-cadet-new_connection.h"
43 #include "gnunet-service-cadet-new_tunnels.h"
44 #include "gnunet-service-cadet-new_peer.h"
45 #include "gnunet-service-cadet-new_paths.h"
46
47 #define LOG(level,...) GNUNET_log_from (level,"cadet-chn",__VA_ARGS__)
48
49 /**
50  * How long do we initially wait before retransmitting?
51  */
52 #define CADET_INITIAL_RETRANSMIT_TIME GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 250)
53
54 /**
55  * How long do we wait before dropping state about incoming
56  * connection to closed port?
57  */
58 #define TIMEOUT_CLOSED_PORT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30)
59
60 /**
61  * How long do we wait at least before retransmitting ever?
62  */
63 #define MIN_RTT_DELAY GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 75)
64
65 /**
66  * Maximum message ID into the future we accept for out-of-order messages.
67  * If the message is more than this into the future, we drop it.  This is
68  * important both to detect values that are actually in the past, as well
69  * as to limit adversarially triggerable memory consumption.
70  *
71  * Note that right now we have "max_pending_messages = 4" hard-coded in
72  * the logic below, so a value of 4 would suffice here. But we plan to
73  * allow larger windows in the future...
74  */
75 #define MAX_OUT_OF_ORDER_DISTANCE 1024
76
77
78 /**
79  * All the states a connection can be in.
80  */
81 enum CadetChannelState
82 {
83   /**
84    * Uninitialized status, should never appear in operation.
85    */
86   CADET_CHANNEL_NEW,
87
88   /**
89    * Connection create message sent, waiting for ACK.
90    */
91   CADET_CHANNEL_OPEN_SENT,
92
93   /**
94    * Connection confirmed, ready to carry traffic.
95    */
96   CADET_CHANNEL_READY
97 };
98
99
100 /**
101  * Info needed to retry a message in case it gets lost.
102  * Note that we DO use this structure also for unreliable
103  * messages.
104  */
105 struct CadetReliableMessage
106 {
107   /**
108    * Double linked list, FIFO style
109    */
110   struct CadetReliableMessage *next;
111
112   /**
113    * Double linked list, FIFO style
114    */
115   struct CadetReliableMessage *prev;
116
117   /**
118    * Which channel is this message in?
119    */
120   struct CadetChannel *ch;
121
122   /**
123    * Entry in the tunnels queue for this message, NULL if it has left
124    * the tunnel.  Used to cancel transmission in case we receive an
125    * ACK in time.
126    */
127   struct CadetTunnelQueueEntry *qe;
128
129   /**
130    * Data message we are trying to send.
131    */
132   struct GNUNET_CADET_ChannelAppDataMessage *data_message;
133
134   /**
135    * How soon should we retry if we fail to get an ACK?
136    * Messages in the queue are sorted by this value.
137    */
138   struct GNUNET_TIME_Absolute next_retry;
139
140   /**
141    * How long do we wait for an ACK after transmission?
142    * Use for the back-off calculation.
143    */
144   struct GNUNET_TIME_Relative retry_delay;
145
146   /**
147    * Time when we first successfully transmitted the message
148    * (that is, set @e num_transmissions to 1).
149    */
150   struct GNUNET_TIME_Absolute first_transmission_time;
151
152   /**
153    * Identifier of the connection that this message took when it
154    * was first transmitted.  Only useful if @e num_transmissions is 1.
155    */
156   struct GNUNET_CADET_ConnectionTunnelIdentifier connection_taken;
157
158   /**
159    * How often was this message transmitted?  #GNUNET_SYSERR if there
160    * was an error transmitting the message, #GNUNET_NO if it was not
161    * yet transmitted ever, otherwise the number of (re) transmissions.
162    */
163   int num_transmissions;
164
165 };
166
167
168 /**
169  * List of received out-of-order data messages.
170  */
171 struct CadetOutOfOrderMessage
172 {
173   /**
174    * Double linked list, FIFO style
175    */
176   struct CadetOutOfOrderMessage *next;
177
178   /**
179    * Double linked list, FIFO style
180    */
181   struct CadetOutOfOrderMessage *prev;
182
183   /**
184    * ID of the message (messages up to this point needed
185    * before we give this one to the client).
186    */
187   struct ChannelMessageIdentifier mid;
188
189   /**
190    * The envelope with the payload of the out-of-order message
191    */
192   struct GNUNET_MQ_Envelope *env;
193
194 };
195
196
197 /**
198  * Client endpoint of a `struct CadetChannel`.  A channel may be a
199  * loopback channel, in which case it has two of these endpoints.
200  * Note that flow control also is required in both directions.
201  */
202 struct CadetChannelClient
203 {
204   /**
205    * Client handle.  Not by itself sufficient to designate
206    * the client endpoint, as the same client handle may
207    * be used for both the owner and the destination, and
208    * we thus also need the channel ID to identify the client.
209    */
210   struct CadetClient *c;
211
212   /**
213    * Head of DLL of messages received out of order or while client was unready.
214    */
215   struct CadetOutOfOrderMessage *head_recv;
216
217   /**
218    * Tail DLL of messages received out of order or while client was unready.
219    */
220   struct CadetOutOfOrderMessage *tail_recv;
221
222   /**
223    * Local tunnel number for this client.
224    * (if owner >= #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI,
225    *  otherwise < #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI)
226    */
227   struct GNUNET_CADET_ClientChannelNumber ccn;
228
229   /**
230    * Number of entries currently in @a head_recv DLL.
231    */
232   unsigned int num_recv;
233
234   /**
235    * Can we send data to the client?
236    */
237   int client_ready;
238
239 };
240
241
242 /**
243  * Struct containing all information regarding a channel to a remote client.
244  */
245 struct CadetChannel
246 {
247   /**
248    * Tunnel this channel is in.
249    */
250   struct CadetTunnel *t;
251
252   /**
253    * Client owner of the tunnel, if any.
254    * (Used if this channel represends the initiating end of the tunnel.)
255    */
256   struct CadetChannelClient *owner;
257
258   /**
259    * Client destination of the tunnel, if any.
260    * (Used if this channel represents the listening end of the tunnel.)
261    */
262   struct CadetChannelClient *dest;
263
264   /**
265    * Last entry in the tunnel's queue relating to control messages
266    * (#GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN or
267    * #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK).  Used to cancel
268    * transmission in case we receive updated information.
269    */
270   struct CadetTunnelQueueEntry *last_control_qe;
271
272   /**
273    * Head of DLL of messages sent and not yet ACK'd.
274    */
275   struct CadetReliableMessage *head_sent;
276
277   /**
278    * Tail of DLL of messages sent and not yet ACK'd.
279    */
280   struct CadetReliableMessage *tail_sent;
281
282   /**
283    * Task to resend/poll in case no ACK is received.
284    */
285   struct GNUNET_SCHEDULER_Task *retry_control_task;
286
287   /**
288    * Task to resend/poll in case no ACK is received.
289    */
290   struct GNUNET_SCHEDULER_Task *retry_data_task;
291
292   /**
293    * Last time the channel was used
294    */
295   struct GNUNET_TIME_Absolute timestamp;
296
297   /**
298    * Destination port of the channel.
299    */
300   struct GNUNET_HashCode port;
301
302   /**
303    * Counter for exponential backoff.
304    */
305   struct GNUNET_TIME_Relative retry_time;
306
307   /**
308    * Bitfield of already-received messages past @e mid_recv.
309    */
310   uint64_t mid_futures;
311
312   /**
313    * Next MID expected for incoming traffic.
314    */
315   struct ChannelMessageIdentifier mid_recv;
316
317   /**
318    * Next MID to use for outgoing traffic.
319    */
320   struct ChannelMessageIdentifier mid_send;
321
322   /**
323    * Total (reliable) messages pending ACK for this channel.
324    */
325   unsigned int pending_messages;
326
327   /**
328    * Maximum (reliable) messages pending ACK for this channel
329    * before we throttle the client.
330    */
331   unsigned int max_pending_messages;
332
333   /**
334    * Number identifying this channel in its tunnel.
335    */
336   struct GNUNET_CADET_ChannelTunnelNumber ctn;
337
338   /**
339    * Channel state.
340    */
341   enum CadetChannelState state;
342
343   /**
344    * Count how many ACKs we skipped, used to prevent long
345    * sequences of ACK skipping.
346    */
347   unsigned int skip_ack_series;
348
349   /**
350    * Is the tunnel bufferless (minimum latency)?
351    */
352   int nobuffer;
353
354   /**
355    * Is the tunnel reliable?
356    */
357   int reliable;
358
359   /**
360    * Is the tunnel out-of-order?
361    */
362   int out_of_order;
363
364   /**
365    * Is this channel a loopback channel, where the destination is us again?
366    */
367   int is_loopback;
368
369   /**
370    * Flag to signal the destruction of the channel.  If this is set to
371    * #GNUNET_YES the channel will be destroyed once the queue is
372    * empty.
373    */
374   int destroy;
375
376 };
377
378
379 /**
380  * Get the static string for identification of the channel.
381  *
382  * @param ch Channel.
383  *
384  * @return Static string with the channel IDs.
385  */
386 const char *
387 GCCH_2s (const struct CadetChannel *ch)
388 {
389   static char buf[128];
390
391   GNUNET_snprintf (buf,
392                    sizeof (buf),
393                    "Channel %s:%s ctn:%X(%X/%X)",
394                    (GNUNET_YES == ch->is_loopback)
395                    ? "loopback"
396                    : GNUNET_i2s (GCP_get_id (GCT_get_destination (ch->t))),
397                    GNUNET_h2s (&ch->port),
398                    ch->ctn,
399                    (NULL == ch->owner) ? 0 : ntohl (ch->owner->ccn.channel_of_client),
400                    (NULL == ch->dest) ? 0 : ntohl (ch->dest->ccn.channel_of_client));
401   return buf;
402 }
403
404
405 /**
406  * Get the channel's public ID.
407  *
408  * @param ch Channel.
409  *
410  * @return ID used to identify the channel with the remote peer.
411  */
412 struct GNUNET_CADET_ChannelTunnelNumber
413 GCCH_get_id (const struct CadetChannel *ch)
414 {
415   return ch->ctn;
416 }
417
418
419 /**
420  * Release memory associated with @a ccc
421  *
422  * @param ccc data structure to clean up
423  */
424 static void
425 free_channel_client (struct CadetChannelClient *ccc)
426 {
427   struct CadetOutOfOrderMessage *com;
428
429   while (NULL != (com = ccc->head_recv))
430   {
431     GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
432                                  ccc->tail_recv,
433                                  com);
434     ccc->num_recv--;
435     GNUNET_MQ_discard (com->env);
436     GNUNET_free (com);
437   }
438   GNUNET_free (ccc);
439 }
440
441
442 /**
443  * Destroy the given channel.
444  *
445  * @param ch channel to destroy
446  */
447 static void
448 channel_destroy (struct CadetChannel *ch)
449 {
450   struct CadetReliableMessage *crm;
451
452   while (NULL != (crm = ch->head_sent))
453   {
454     GNUNET_assert (ch == crm->ch);
455     if (NULL != crm->qe)
456     {
457       GCT_send_cancel (crm->qe);
458       crm->qe = NULL;
459     }
460     GNUNET_CONTAINER_DLL_remove (ch->head_sent,
461                                  ch->tail_sent,
462                                  crm);
463     GNUNET_free (crm->data_message);
464     GNUNET_free (crm);
465   }
466   if (NULL != ch->owner)
467   {
468     free_channel_client (ch->owner);
469     ch->owner = NULL;
470   }
471   if (NULL != ch->dest)
472   {
473     free_channel_client (ch->dest);
474     ch->dest = NULL;
475   }
476   if (NULL != ch->last_control_qe)
477   {
478     GCT_send_cancel (ch->last_control_qe);
479     ch->last_control_qe = NULL;
480   }
481   if (NULL != ch->retry_data_task)
482   {
483     GNUNET_SCHEDULER_cancel (ch->retry_data_task);
484     ch->retry_data_task = NULL;
485   }
486   if (NULL != ch->retry_control_task)
487   {
488     GNUNET_SCHEDULER_cancel (ch->retry_control_task);
489     ch->retry_control_task = NULL;
490   }
491   if (GNUNET_NO == ch->is_loopback)
492   {
493     GCT_remove_channel (ch->t,
494                         ch,
495                         ch->ctn);
496     ch->t = NULL;
497   }
498   GNUNET_free (ch);
499 }
500
501
502 /**
503  * Send a channel create message.
504  *
505  * @param cls Channel for which to send.
506  */
507 static void
508 send_channel_open (void *cls);
509
510
511 /**
512  * Function called once the tunnel confirms that we sent the
513  * create message.  Delays for a bit until we retry.
514  *
515  * @param cls our `struct CadetChannel`.
516  * @param cid identifier of the connection within the tunnel, NULL
517  *            if transmission failed
518  */
519 static void
520 channel_open_sent_cb (void *cls,
521                       const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid)
522 {
523   struct CadetChannel *ch = cls;
524
525   GNUNET_assert (NULL != ch->last_control_qe);
526   ch->last_control_qe = NULL;
527   ch->retry_time = GNUNET_TIME_STD_BACKOFF (ch->retry_time);
528   LOG (GNUNET_ERROR_TYPE_DEBUG,
529        "Sent CADET_CHANNEL_OPEN on %s, retrying in %s\n",
530        GCCH_2s (ch),
531        GNUNET_STRINGS_relative_time_to_string (ch->retry_time,
532                                                GNUNET_YES));
533   ch->retry_control_task
534     = GNUNET_SCHEDULER_add_delayed (ch->retry_time,
535                                     &send_channel_open,
536                                     ch);
537 }
538
539
540 /**
541  * Send a channel open message.
542  *
543  * @param cls Channel for which to send.
544  */
545 static void
546 send_channel_open (void *cls)
547 {
548   struct CadetChannel *ch = cls;
549   struct GNUNET_CADET_ChannelOpenMessage msgcc;
550   uint32_t options;
551
552   ch->retry_control_task = NULL;
553   LOG (GNUNET_ERROR_TYPE_DEBUG,
554        "Sending CHANNEL_OPEN message for %s\n",
555        GCCH_2s (ch));
556   options = 0;
557   if (ch->nobuffer)
558     options |= GNUNET_CADET_OPTION_NOBUFFER;
559   if (ch->reliable)
560     options |= GNUNET_CADET_OPTION_RELIABLE;
561   if (ch->out_of_order)
562     options |= GNUNET_CADET_OPTION_OUT_OF_ORDER;
563   msgcc.header.size = htons (sizeof (msgcc));
564   msgcc.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN);
565   msgcc.opt = htonl (options);
566   msgcc.port = ch->port;
567   msgcc.ctn = ch->ctn;
568   ch->state = CADET_CHANNEL_OPEN_SENT;
569   if (NULL != ch->last_control_qe)
570     GCT_send_cancel (ch->last_control_qe);
571   ch->last_control_qe = GCT_send (ch->t,
572                                   &msgcc.header,
573                                   &channel_open_sent_cb,
574                                   ch);
575   GNUNET_assert (NULL == ch->retry_control_task);
576 }
577
578
579 /**
580  * Function called once and only once after a channel was bound
581  * to its tunnel via #GCT_add_channel() is ready for transmission.
582  * Note that this is only the case for channels that this peer
583  * initiates, as for incoming channels we assume that they are
584  * ready for transmission immediately upon receiving the open
585  * message.  Used to bootstrap the #GCT_send() process.
586  *
587  * @param ch the channel for which the tunnel is now ready
588  */
589 void
590 GCCH_tunnel_up (struct CadetChannel *ch)
591 {
592   GNUNET_assert (NULL == ch->retry_control_task);
593   LOG (GNUNET_ERROR_TYPE_DEBUG,
594        "Tunnel up, sending CHANNEL_OPEN on %s now\n",
595        GCCH_2s (ch));
596   ch->retry_control_task
597     = GNUNET_SCHEDULER_add_now (&send_channel_open,
598                                 ch);
599 }
600
601
602 /**
603  * Create a new channel.
604  *
605  * @param owner local client owning the channel
606  * @param ccn local number of this channel at the @a owner
607  * @param destination peer to which we should build the channel
608  * @param port desired port at @a destination
609  * @param options options for the channel
610  * @return handle to the new channel
611  */
612 struct CadetChannel *
613 GCCH_channel_local_new (struct CadetClient *owner,
614                         struct GNUNET_CADET_ClientChannelNumber ccn,
615                         struct CadetPeer *destination,
616                         const struct GNUNET_HashCode *port,
617                         uint32_t options)
618 {
619   struct CadetChannel *ch;
620   struct CadetChannelClient *ccco;
621
622   ccco = GNUNET_new (struct CadetChannelClient);
623   ccco->c = owner;
624   ccco->ccn = ccn;
625   ccco->client_ready = GNUNET_YES;
626
627   ch = GNUNET_new (struct CadetChannel);
628   ch->mid_recv.mid = htonl (1); /* The OPEN_ACK counts as message 0! */
629   ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
630   ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
631   ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
632   ch->max_pending_messages = (ch->nobuffer) ? 1 : 4; /* FIXME: 4!? Do not hardcode! */
633   ch->owner = ccco;
634   ch->port = *port;
635   if (0 == memcmp (&my_full_id,
636                    GCP_get_id (destination),
637                    sizeof (struct GNUNET_PeerIdentity)))
638   {
639     struct CadetClient *c;
640
641     ch->is_loopback = GNUNET_YES;
642     c = GNUNET_CONTAINER_multihashmap_get (open_ports,
643                                            port);
644     if (NULL == c)
645     {
646       /* port closed, wait for it to possibly open */
647       (void) GNUNET_CONTAINER_multihashmap_put (loose_channels,
648                                                 port,
649                                                 ch,
650                                                 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
651       LOG (GNUNET_ERROR_TYPE_DEBUG,
652            "Created loose incoming loopback channel to port %s\n",
653            GNUNET_h2s (&ch->port));
654     }
655     else
656     {
657       ch->dest = GNUNET_new (struct CadetChannelClient);
658       ch->dest->c = c;
659       ch->dest->client_ready = GNUNET_YES;
660       GCCH_bind (ch,
661                  ch->dest->c);
662     }
663   }
664   else
665   {
666     ch->t = GCP_get_tunnel (destination,
667                             GNUNET_YES);
668     ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME;
669     ch->ctn = GCT_add_channel (ch->t,
670                                ch);
671   }
672   GNUNET_STATISTICS_update (stats,
673                             "# channels",
674                             1,
675                             GNUNET_NO);
676   LOG (GNUNET_ERROR_TYPE_DEBUG,
677        "Created channel to port %s at peer %s for %s using %s\n",
678        GNUNET_h2s (port),
679        GCP_2s (destination),
680        GSC_2s (owner),
681        (GNUNET_YES == ch->is_loopback) ? "loopback" : GCT_2s (ch->t));
682   return ch;
683 }
684
685
686 /**
687  * We had an incoming channel to a port that is closed.
688  * It has not been opened for a while, drop it.
689  *
690  * @param cls the channel to drop
691  */
692 static void
693 timeout_closed_cb (void *cls)
694 {
695   struct CadetChannel *ch = cls;
696
697   ch->retry_control_task = NULL;
698   LOG (GNUNET_ERROR_TYPE_DEBUG,
699        "Closing incoming channel to port %s from peer %s due to timeout\n",
700        GNUNET_h2s (&ch->port),
701        GCP_2s (GCT_get_destination (ch->t)));
702   channel_destroy (ch);
703 }
704
705
706 /**
707  * Create a new channel based on a request coming in over the network.
708  *
709  * @param t tunnel to the remote peer
710  * @param ctn identifier of this channel in the tunnel
711  * @param port desired local port
712  * @param options options for the channel
713  * @return handle to the new channel
714  */
715 struct CadetChannel *
716 GCCH_channel_incoming_new (struct CadetTunnel *t,
717                            struct GNUNET_CADET_ChannelTunnelNumber ctn,
718                            const struct GNUNET_HashCode *port,
719                            uint32_t options)
720 {
721   struct CadetChannel *ch;
722   struct CadetClient *c;
723
724   ch = GNUNET_new (struct CadetChannel);
725   ch->port = *port;
726   ch->t = t;
727   ch->ctn = ctn;
728   ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME;
729   ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
730   ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
731   ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
732   ch->max_pending_messages = (ch->nobuffer) ? 1 : 4; /* FIXME: 4!? Do not hardcode! */
733   GNUNET_STATISTICS_update (stats,
734                             "# channels",
735                             1,
736                             GNUNET_NO);
737
738   c = GNUNET_CONTAINER_multihashmap_get (open_ports,
739                                          port);
740   if (NULL == c)
741   {
742     /* port closed, wait for it to possibly open */
743     (void) GNUNET_CONTAINER_multihashmap_put (loose_channels,
744                                               port,
745                                               ch,
746                                               GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
747     ch->retry_control_task
748       = GNUNET_SCHEDULER_add_delayed (TIMEOUT_CLOSED_PORT,
749                                       &timeout_closed_cb,
750                                       ch);
751     LOG (GNUNET_ERROR_TYPE_DEBUG,
752          "Created loose incoming channel to port %s from peer %s\n",
753          GNUNET_h2s (&ch->port),
754          GCP_2s (GCT_get_destination (ch->t)));
755   }
756   else
757   {
758     GCCH_bind (ch,
759                c);
760   }
761   GNUNET_STATISTICS_update (stats,
762                             "# channels",
763                             1,
764                             GNUNET_NO);
765   return ch;
766 }
767
768
769 /**
770  * Function called once the tunnel confirms that we sent the
771  * ACK message.  Just remembers it was sent, we do not expect
772  * ACKs for ACKs ;-).
773  *
774  * @param cls our `struct CadetChannel`.
775  * @param cid identifier of the connection within the tunnel, NULL
776  *            if transmission failed
777  */
778 static void
779 send_ack_cb (void *cls,
780              const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid)
781 {
782   struct CadetChannel *ch = cls;
783
784   GNUNET_assert (NULL != ch->last_control_qe);
785   ch->last_control_qe = NULL;
786 }
787
788
789 /**
790  * Compute and send the current #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK to the other peer.
791  *
792  * @param ch channel to send the #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK for
793  */
794 static void
795 send_channel_data_ack (struct CadetChannel *ch)
796 {
797   struct GNUNET_CADET_ChannelDataAckMessage msg;
798
799   if (GNUNET_NO == ch->reliable)
800     return; /* no ACKs */
801   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK);
802   msg.header.size = htons (sizeof (msg));
803   msg.ctn = ch->ctn;
804   msg.mid.mid = htonl (ntohl (ch->mid_recv.mid));
805   msg.futures = GNUNET_htonll (ch->mid_futures);
806   LOG (GNUNET_ERROR_TYPE_DEBUG,
807        "Sending DATA_ACK %u:%llX via %s\n",
808        (unsigned int) ntohl (msg.mid.mid),
809        (unsigned long long) ch->mid_futures,
810        GCCH_2s (ch));
811   if (NULL != ch->last_control_qe)
812     GCT_send_cancel (ch->last_control_qe);
813   ch->last_control_qe = GCT_send (ch->t,
814                                   &msg.header,
815                                   &send_ack_cb,
816                                   ch);
817 }
818
819
820 /**
821  * Send our initial #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK to the client confirming that the
822  * connection is up.
823  *
824  * @param cls the `struct CadetChannel`
825  */
826 static void
827 send_open_ack (void *cls)
828 {
829   struct CadetChannel *ch = cls;
830   struct GNUNET_CADET_ChannelManageMessage msg;
831
832   LOG (GNUNET_ERROR_TYPE_DEBUG,
833        "Sending CHANNEL_OPEN_ACK on %s\n",
834        GCCH_2s (ch));
835   ch->retry_control_task = NULL;
836   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK);
837   msg.header.size = htons (sizeof (msg));
838   msg.reserved = htonl (0);
839   msg.ctn = ch->ctn;
840   if (NULL != ch->last_control_qe)
841     GCT_send_cancel (ch->last_control_qe);
842   ch->last_control_qe = GCT_send (ch->t,
843                                   &msg.header,
844                                   &send_ack_cb,
845                                   ch);
846 }
847
848
849 /**
850  * We got a #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN message again for
851  * this channel.  If the binding was successful, (re)transmit the
852  * #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK.
853  *
854  * @param ch channel that got the duplicate open
855  * @param cti identifier of the connection that delivered the message
856  */
857 void
858 GCCH_handle_duplicate_open (struct CadetChannel *ch,
859                             const struct GNUNET_CADET_ConnectionTunnelIdentifier *cti)
860 {
861   if (NULL == ch->dest)
862   {
863     LOG (GNUNET_ERROR_TYPE_DEBUG,
864          "Ignoring duplicate CHANNEL_OPEN on %s: port is closed\n",
865          GCCH_2s (ch));
866     return;
867   }
868   if (NULL != ch->retry_control_task)
869   {
870     LOG (GNUNET_ERROR_TYPE_DEBUG,
871          "Ignoring duplicate CHANNEL_OPEN on %s: control message is pending\n",
872          GCCH_2s (ch));
873     return;
874   }
875   LOG (GNUNET_ERROR_TYPE_DEBUG,
876        "Retransmitting CHANNEL_OPEN_ACK on %s\n",
877        GCCH_2s (ch));
878   ch->retry_control_task
879     = GNUNET_SCHEDULER_add_now (&send_open_ack,
880                                 ch);
881 }
882
883
884 /**
885  * Send a #GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK to the client to solicit more messages.
886  *
887  * @param ch channel the ack is for
888  * @param to_owner #GNUNET_YES to send to owner,
889  *                 #GNUNET_NO to send to dest
890  */
891 static void
892 send_ack_to_client (struct CadetChannel *ch,
893                     int to_owner)
894 {
895   struct GNUNET_MQ_Envelope *env;
896   struct GNUNET_CADET_LocalAck *ack;
897   struct CadetChannelClient *ccc;
898
899   ccc = (GNUNET_YES == to_owner) ? ch->owner : ch->dest;
900   if (NULL == ccc)
901   {
902     /* This can happen if we are just getting ACKs after
903        our local client already disconnected. */
904     GNUNET_assert (GNUNET_YES == ch->destroy);
905     return;
906   }
907   env = GNUNET_MQ_msg (ack,
908                        GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK);
909   ack->ccn = ccc->ccn;
910   LOG (GNUNET_ERROR_TYPE_DEBUG,
911        "Sending CADET_LOCAL_ACK to %s (%s) at ccn %X (%u/%u pending)\n",
912        GSC_2s (ccc->c),
913        (GNUNET_YES == to_owner) ? "owner" : "dest",
914        ntohl (ack->ccn.channel_of_client),
915        ch->pending_messages,
916        ch->max_pending_messages);
917   GSC_send_to_client (ccc->c,
918                       env);
919 }
920
921
922 /**
923  * A client is bound to the port that we have a channel
924  * open to.  Send the acknowledgement for the connection
925  * request and establish the link with the client.
926  *
927  * @param ch open incoming channel
928  * @param c client listening on the respective port
929  */
930 void
931 GCCH_bind (struct CadetChannel *ch,
932            struct CadetClient *c)
933 {
934   uint32_t options;
935   struct CadetChannelClient *cccd;
936
937   LOG (GNUNET_ERROR_TYPE_DEBUG,
938        "Binding %s from %s to port %s of %s\n",
939        GCCH_2s (ch),
940        GCT_2s (ch->t),
941        GNUNET_h2s (&ch->port),
942        GSC_2s (c));
943   if (NULL != ch->retry_control_task)
944   {
945     /* there might be a timeout task here */
946     GNUNET_SCHEDULER_cancel (ch->retry_control_task);
947     ch->retry_control_task = NULL;
948   }
949   options = 0;
950   if (ch->nobuffer)
951     options |= GNUNET_CADET_OPTION_NOBUFFER;
952   if (ch->reliable)
953     options |= GNUNET_CADET_OPTION_RELIABLE;
954   if (ch->out_of_order)
955     options |= GNUNET_CADET_OPTION_OUT_OF_ORDER;
956   cccd = GNUNET_new (struct CadetChannelClient);
957   ch->dest = cccd;
958   cccd->c = c;
959   cccd->client_ready = GNUNET_YES;
960   cccd->ccn = GSC_bind (c,
961                         ch,
962                         (GNUNET_YES == ch->is_loopback)
963                         ? GCP_get (&my_full_id,
964                                    GNUNET_YES)
965                         : GCT_get_destination (ch->t),
966                         &ch->port,
967                         options);
968   GNUNET_assert (ntohl (cccd->ccn.channel_of_client) <
969                  GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
970   ch->mid_recv.mid = htonl (1); /* The OPEN counts as message 0! */
971   if (GNUNET_YES == ch->is_loopback)
972   {
973     ch->state = CADET_CHANNEL_OPEN_SENT;
974     GCCH_handle_channel_open_ack (ch,
975                                   NULL);
976   }
977   else
978   {
979     /* notify other peer that we accepted the connection */
980     ch->retry_control_task
981       = GNUNET_SCHEDULER_add_now (&send_open_ack,
982                                   ch);
983   }
984   /* give client it's initial supply of ACKs */
985   GNUNET_assert (ntohl (cccd->ccn.channel_of_client) <
986                  GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
987   for (unsigned int i=0;i<ch->max_pending_messages;i++)
988     send_ack_to_client (ch,
989                         GNUNET_NO);
990 }
991
992
993 /**
994  * Destroy locally created channel.  Called by the local client, so no
995  * need to tell the client.
996  *
997  * @param ch channel to destroy
998  * @param c client that caused the destruction
999  * @param ccn client number of the client @a c
1000  */
1001 void
1002 GCCH_channel_local_destroy (struct CadetChannel *ch,
1003                             struct CadetClient *c,
1004                             struct GNUNET_CADET_ClientChannelNumber ccn)
1005 {
1006   LOG (GNUNET_ERROR_TYPE_DEBUG,
1007        "%s asks for destruction of %s\n",
1008        GSC_2s (c),
1009        GCCH_2s (ch));
1010   GNUNET_assert (NULL != c);
1011   if ( (NULL != ch->owner) &&
1012        (c == ch->owner->c) &&
1013        (ccn.channel_of_client == ch->owner->ccn.channel_of_client) )
1014   {
1015     free_channel_client (ch->owner);
1016     ch->owner = NULL;
1017   }
1018   else if ( (NULL != ch->dest) &&
1019             (c == ch->dest->c) &&
1020             (ccn.channel_of_client == ch->dest->ccn.channel_of_client) )
1021   {
1022     free_channel_client (ch->dest);
1023     ch->dest = NULL;
1024   }
1025   else
1026   {
1027     GNUNET_assert (0);
1028   }
1029
1030   if (GNUNET_YES == ch->destroy)
1031   {
1032     /* other end already destroyed, with the local client gone, no need
1033        to finish transmissions, just destroy immediately. */
1034     channel_destroy (ch);
1035     return;
1036   }
1037   if ( (NULL != ch->head_sent) ||
1038        (NULL != ch->owner) ||
1039        (NULL != ch->dest) )
1040   {
1041     /* Wait for other end to destroy us as well,
1042        and otherwise allow send queue to be transmitted first */
1043     ch->destroy = GNUNET_YES;
1044     return;
1045   }
1046   /* If the we ever sent the CHANNEL_CREATE, we need to send a destroy message. */
1047   if (CADET_CHANNEL_NEW == ch->state)
1048     GSC_drop_loose_channel (&ch->port,
1049                             ch);
1050   else
1051     GCT_send_channel_destroy (ch->t,
1052                               ch->ctn);
1053   /* Nothing left to do, just finish destruction */
1054   channel_destroy (ch);
1055 }
1056
1057
1058 /**
1059  * We got an acknowledgement for the creation of the channel
1060  * (the port is open on the other side). Begin transmissions.
1061  *
1062  * @param ch channel to destroy
1063  * @param cti identifier of the connection that delivered the message
1064  */
1065 void
1066 GCCH_handle_channel_open_ack (struct CadetChannel *ch,
1067                               const struct GNUNET_CADET_ConnectionTunnelIdentifier *cti)
1068 {
1069   switch (ch->state)
1070   {
1071   case CADET_CHANNEL_NEW:
1072     /* this should be impossible */
1073     GNUNET_break (0);
1074     break;
1075   case CADET_CHANNEL_OPEN_SENT:
1076     if (NULL == ch->owner)
1077     {
1078       /* We're not the owner, wrong direction! */
1079       GNUNET_break_op (0);
1080       return;
1081     }
1082     LOG (GNUNET_ERROR_TYPE_DEBUG,
1083          "Received CHANNEL_OPEN_ACK for waiting %s, entering READY state\n",
1084          GCCH_2s (ch));
1085     if (NULL != ch->retry_control_task) /* can be NULL if ch->is_loopback */
1086     {
1087       GNUNET_SCHEDULER_cancel (ch->retry_control_task);
1088       ch->retry_control_task = NULL;
1089     }
1090     ch->state = CADET_CHANNEL_READY;
1091     /* On first connect, send client as many ACKs as we allow messages
1092        to be buffered! */
1093     for (unsigned int i=0;i<ch->max_pending_messages;i++)
1094       send_ack_to_client (ch,
1095                           GNUNET_YES);
1096     break;
1097   case CADET_CHANNEL_READY:
1098     /* duplicate ACK, maybe we retried the CREATE. Ignore. */
1099     LOG (GNUNET_ERROR_TYPE_DEBUG,
1100          "Received duplicate channel OPEN_ACK for %s\n",
1101          GCCH_2s (ch));
1102     GNUNET_STATISTICS_update (stats,
1103                               "# duplicate CREATE_ACKs",
1104                               1,
1105                               GNUNET_NO);
1106     break;
1107   }
1108 }
1109
1110
1111 /**
1112  * Test if element @a e1 comes before element @a e2.
1113  *
1114  * @param cls closure, to a flag where we indicate duplicate packets
1115  * @param m1 a message of to sort
1116  * @param m2 another message to sort
1117  * @return #GNUNET_YES if @e1 < @e2, otherwise #GNUNET_NO
1118  */
1119 static int
1120 is_before (void *cls,
1121            struct CadetOutOfOrderMessage *m1,
1122            struct CadetOutOfOrderMessage *m2)
1123 {
1124   int *duplicate = cls;
1125   uint32_t v1 = ntohl (m1->mid.mid);
1126   uint32_t v2 = ntohl (m2->mid.mid);
1127   uint32_t delta;
1128
1129   delta = v2 - v1;
1130   if (0 == delta)
1131     *duplicate = GNUNET_YES;
1132   if (delta > (uint32_t) INT_MAX)
1133   {
1134     /* in overflow range, we can safely assume we wrapped around */
1135     return GNUNET_NO;
1136   }
1137   else
1138   {
1139     /* result is small, thus v2 > v1, thus m1 < m2 */
1140     return GNUNET_YES;
1141   }
1142 }
1143
1144
1145 /**
1146  * We got payload data for a channel.  Pass it on to the client
1147  * and send an ACK to the other end (once flow control allows it!)
1148  *
1149  * @param ch channel that got data
1150  * @param cti identifier of the connection that delivered the message
1151  * @param msg message that was received
1152  */
1153 void
1154 GCCH_handle_channel_plaintext_data (struct CadetChannel *ch,
1155                                     const struct GNUNET_CADET_ConnectionTunnelIdentifier *cti,
1156                                     const struct GNUNET_CADET_ChannelAppDataMessage *msg)
1157 {
1158   struct GNUNET_MQ_Envelope *env;
1159   struct GNUNET_CADET_LocalData *ld;
1160   struct CadetChannelClient *ccc;
1161   size_t payload_size;
1162   struct CadetOutOfOrderMessage *com;
1163   int duplicate;
1164   uint32_t mid_min;
1165   uint32_t mid_max;
1166   uint32_t mid_msg;
1167   uint32_t delta;
1168
1169   GNUNET_assert (GNUNET_NO == ch->is_loopback);
1170   if ( (GNUNET_YES == ch->destroy) &&
1171        (NULL == ch->owner) &&
1172        (NULL == ch->dest) )
1173   {
1174     /* This client is gone, but we still have messages to send to
1175        the other end (which is why @a ch is not yet dead).  However,
1176        we cannot pass messages to our client anymore. */
1177     LOG (GNUNET_ERROR_TYPE_DEBUG,
1178          "Dropping incoming payload on %s as this end is already closed\n",
1179          GCCH_2s (ch));
1180     /* send back DESTROY notification to stop further retransmissions! */
1181     GCT_send_channel_destroy (ch->t,
1182                               ch->ctn);
1183     return;
1184   }
1185   payload_size = ntohs (msg->header.size) - sizeof (*msg);
1186   env = GNUNET_MQ_msg_extra (ld,
1187                              payload_size,
1188                              GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
1189   ld->ccn = (NULL == ch->dest) ? ch->owner->ccn : ch->dest->ccn;
1190   GNUNET_memcpy (&ld[1],
1191                  &msg[1],
1192                  payload_size);
1193   ccc = (NULL != ch->owner) ? ch->owner : ch->dest;
1194   if ( (GNUNET_YES == ccc->client_ready) &&
1195        ( (GNUNET_YES == ch->out_of_order) ||
1196          (msg->mid.mid == ch->mid_recv.mid) ) )
1197   {
1198     LOG (GNUNET_ERROR_TYPE_DEBUG,
1199          "Giving %u bytes of payload with MID %u from %s to client %s\n",
1200          (unsigned int) payload_size,
1201          ntohl (msg->mid.mid),
1202          GCCH_2s (ch),
1203          GSC_2s (ccc->c));
1204     ccc->client_ready = GNUNET_NO;
1205     GSC_send_to_client (ccc->c,
1206                         env);
1207     ch->mid_recv.mid = htonl (1 + ntohl (ch->mid_recv.mid));
1208     ch->mid_futures >>= 1;
1209     send_channel_data_ack (ch);
1210     return;
1211   }
1212
1213   if (GNUNET_YES == ch->reliable)
1214   {
1215     /* check if message ought to be dropped because it is ancient/too distant/duplicate */
1216     mid_min = ntohl (ch->mid_recv.mid);
1217     mid_max = mid_min + ch->max_pending_messages;
1218     mid_msg = ntohl (msg->mid.mid);
1219     if ( ( (uint32_t) (mid_msg - mid_min) > ch->max_pending_messages) ||
1220          ( (uint32_t) (mid_max - mid_msg) > ch->max_pending_messages) )
1221     {
1222       LOG (GNUNET_ERROR_TYPE_DEBUG,
1223            "%s at %u drops ancient or far-future message %u\n",
1224            GCCH_2s (ch),
1225            (unsigned int) mid_min,
1226            ntohl (msg->mid.mid));
1227
1228       GNUNET_STATISTICS_update (stats,
1229                                 "# duplicate DATA (ancient or future)",
1230                                 1,
1231                                 GNUNET_NO);
1232       GNUNET_MQ_discard (env);
1233       send_channel_data_ack (ch);
1234       return;
1235     }
1236     /* mark bit for future ACKs */
1237     delta = mid_msg - mid_min - 1; /* overflow/underflow are OK here */
1238     if (delta < 64)
1239     {
1240       if (0 != (ch->mid_futures & (1LLU << delta)))
1241       {
1242         /* Duplicate within the queue, drop also */
1243         LOG (GNUNET_ERROR_TYPE_DEBUG,
1244              "Duplicate payload of %u bytes on %s (mid %u) dropped\n",
1245              (unsigned int) payload_size,
1246              GCCH_2s (ch),
1247              ntohl (msg->mid.mid));
1248         GNUNET_STATISTICS_update (stats,
1249                                   "# duplicate DATA",
1250                                   1,
1251                                   GNUNET_NO);
1252         GNUNET_MQ_discard (env);
1253         send_channel_data_ack (ch);
1254         return;
1255       }
1256       ch->mid_futures |= (1LLU << delta);
1257       LOG (GNUNET_ERROR_TYPE_DEBUG,
1258            "Marked bit %llX for mid %u (base: %u); now: %llX\n",
1259            (1LLU << delta),
1260            mid_msg,
1261            mid_min,
1262            ch->mid_futures);
1263     }
1264   }
1265   else /* ! ch->reliable */
1266   {
1267     /* Channel is unreliable, so we do not ACK. But we also cannot
1268        allow buffering everything, so check if we have space... */
1269     if (ccc->num_recv >= ch->max_pending_messages)
1270     {
1271       struct CadetOutOfOrderMessage *drop;
1272
1273       /* Yep, need to drop. Drop the oldest message in
1274          the buffer. */
1275       LOG (GNUNET_ERROR_TYPE_DEBUG,
1276            "Queue full due slow client on %s, dropping oldest message\n",
1277            GCCH_2s (ch));
1278       GNUNET_STATISTICS_update (stats,
1279                                 "# messages dropped due to slow client",
1280                                 1,
1281                                 GNUNET_NO);
1282       drop = ccc->head_recv;
1283       GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
1284                                    ccc->tail_recv,
1285                                    drop);
1286       ccc->num_recv--;
1287       GNUNET_MQ_discard (drop->env);
1288       GNUNET_free (drop);
1289     }
1290   }
1291
1292   /* Insert message into sorted out-of-order queue */
1293   com = GNUNET_new (struct CadetOutOfOrderMessage);
1294   com->mid = msg->mid;
1295   com->env = env;
1296   duplicate = GNUNET_NO;
1297   GNUNET_CONTAINER_DLL_insert_sorted (struct CadetOutOfOrderMessage,
1298                                       is_before,
1299                                       &duplicate,
1300                                       ccc->head_recv,
1301                                       ccc->tail_recv,
1302                                       com);
1303   ccc->num_recv++;
1304   if (GNUNET_YES == duplicate)
1305   {
1306     /* Duplicate within the queue, drop also (this is not covered by
1307        the case above if "delta" >= 64, which could be the case if
1308        max_pending_messages is also >= 64 or if our client is unready
1309        and we are seeing retransmissions of the message our client is
1310        blocked on. */
1311     LOG (GNUNET_ERROR_TYPE_DEBUG,
1312          "Duplicate payload of %u bytes on %s (mid %u) dropped\n",
1313          (unsigned int) payload_size,
1314          GCCH_2s (ch),
1315          ntohl (msg->mid.mid));
1316     GNUNET_STATISTICS_update (stats,
1317                               "# duplicate DATA",
1318                               1,
1319                               GNUNET_NO);
1320     GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
1321                                  ccc->tail_recv,
1322                                  com);
1323     ccc->num_recv--;
1324     GNUNET_MQ_discard (com->env);
1325     GNUNET_free (com);
1326     send_channel_data_ack (ch);
1327     return;
1328   }
1329   LOG (GNUNET_ERROR_TYPE_DEBUG,
1330        "Queued %s payload of %u bytes on %s-%X(%p) (mid %u, need %u first)\n",
1331        (GNUNET_YES == ccc->client_ready)
1332        ? "out-of-order"
1333        : "client-not-ready",
1334        (unsigned int) payload_size,
1335        GCCH_2s (ch),
1336        ntohl (ccc->ccn.channel_of_client),
1337        ccc,
1338        ntohl (msg->mid.mid),
1339        ntohl (ch->mid_recv.mid));
1340   /* NOTE: this ACK we _could_ skip, as the packet is out-of-order and
1341      the sender may already be transmitting the previous one.  Needs
1342      experimental evaluation to see if/when this ACK helps or
1343      hurts. (We might even want another option.) */
1344   send_channel_data_ack (ch);
1345 }
1346
1347
1348 /**
1349  * Function called once the tunnel has sent one of our messages.
1350  * If the message is unreliable, simply frees the `crm`. If the
1351  * message was reliable, calculate retransmission time and
1352  * wait for ACK (or retransmit).
1353  *
1354  * @param cls the `struct CadetReliableMessage` that was sent
1355  * @param cid identifier of the connection within the tunnel, NULL
1356  *            if transmission failed
1357  */
1358 static void
1359 data_sent_cb (void *cls,
1360               const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid);
1361
1362
1363 /**
1364  * We need to retry a transmission, the last one took too long to
1365  * be acknowledged.
1366  *
1367  * @param cls the `struct CadetChannel` where we need to retransmit
1368  */
1369 static void
1370 retry_transmission (void *cls)
1371 {
1372   struct CadetChannel *ch = cls;
1373   struct CadetReliableMessage *crm = ch->head_sent;
1374
1375   ch->retry_data_task = NULL;
1376   GNUNET_assert (NULL == crm->qe);
1377   LOG (GNUNET_ERROR_TYPE_DEBUG,
1378        "Retrying transmission on %s of message %u\n",
1379        GCCH_2s (ch),
1380        (unsigned int) ntohl (crm->data_message->mid.mid));
1381   crm->qe = GCT_send (ch->t,
1382                       &crm->data_message->header,
1383                       &data_sent_cb,
1384                       crm);
1385   GNUNET_assert (NULL == ch->retry_data_task);
1386 }
1387
1388
1389 /**
1390  * We got an PLAINTEXT_DATA_ACK for a message in our queue, remove it from
1391  * the queue and tell our client that it can send more.
1392  *
1393  * @param ch the channel that got the PLAINTEXT_DATA_ACK
1394  * @param cti identifier of the connection that delivered the message
1395  * @param crm the message that got acknowledged
1396  */
1397 static void
1398 handle_matching_ack (struct CadetChannel *ch,
1399                      const struct GNUNET_CADET_ConnectionTunnelIdentifier *cti,
1400                      struct CadetReliableMessage *crm)
1401 {
1402   GNUNET_CONTAINER_DLL_remove (ch->head_sent,
1403                                ch->tail_sent,
1404                                crm);
1405   ch->pending_messages--;
1406   GNUNET_assert (ch->pending_messages < ch->max_pending_messages);
1407   LOG (GNUNET_ERROR_TYPE_DEBUG,
1408        "Received DATA_ACK on %s for message %u (%u ACKs pending)\n",
1409        GCCH_2s (ch),
1410        (unsigned int) ntohl (crm->data_message->mid.mid),
1411        ch->pending_messages);
1412   if (NULL != crm->qe)
1413   {
1414     GCT_send_cancel (crm->qe);
1415     crm->qe = NULL;
1416   }
1417   if ( (1 == crm->num_transmissions) &&
1418        (NULL != cti) )
1419   {
1420     GCC_ack_observed (cti);
1421     if (0 == memcmp (cti,
1422                      &crm->connection_taken,
1423                      sizeof (struct GNUNET_CADET_ConnectionTunnelIdentifier)))
1424     {
1425       GCC_latency_observed (cti,
1426                             GNUNET_TIME_absolute_get_duration (crm->first_transmission_time));
1427     }
1428   }
1429   GNUNET_free (crm->data_message);
1430   GNUNET_free (crm);
1431   send_ack_to_client (ch,
1432                       (NULL == ch->owner)
1433                       ? GNUNET_NO
1434                       : GNUNET_YES);
1435 }
1436
1437
1438 /**
1439  * We got an acknowledgement for payload data for a channel.
1440  * Possibly resume transmissions.
1441  *
1442  * @param ch channel that got the ack
1443  * @param cti identifier of the connection that delivered the message
1444  * @param ack details about what was received
1445  */
1446 void
1447 GCCH_handle_channel_plaintext_data_ack (struct CadetChannel *ch,
1448                                         const struct GNUNET_CADET_ConnectionTunnelIdentifier *cti,
1449                                         const struct GNUNET_CADET_ChannelDataAckMessage *ack)
1450 {
1451   struct CadetReliableMessage *crm;
1452   struct CadetReliableMessage *crmn;
1453   int found;
1454   uint32_t mid_base;
1455   uint64_t mid_mask;
1456   unsigned int delta;
1457
1458   GNUNET_break (GNUNET_NO == ch->is_loopback);
1459   if (GNUNET_NO == ch->reliable)
1460   {
1461     /* not expecting ACKs on unreliable channel, odd */
1462     GNUNET_break_op (0);
1463     return;
1464   }
1465   /* mid_base is the MID of the next message that the
1466      other peer expects (i.e. that is missing!), everything
1467      LOWER (but excluding mid_base itself) was received. */
1468   mid_base = ntohl (ack->mid.mid);
1469   mid_mask = GNUNET_htonll (ack->futures);
1470   found = GNUNET_NO;
1471   for (crm = ch->head_sent;
1472         NULL != crm;
1473        crm = crmn)
1474   {
1475     crmn = crm->next;
1476     delta = (unsigned int) (ntohl (crm->data_message->mid.mid) - mid_base);
1477     if (delta >= UINT_MAX - ch->max_pending_messages)
1478     {
1479       /* overflow, means crm was a bit in the past, so this ACK counts for it. */
1480       LOG (GNUNET_ERROR_TYPE_DEBUG,
1481            "Got DATA_ACK with base %u satisfying past message %u on %s\n",
1482            (unsigned int) mid_base,
1483            ntohl (crm->data_message->mid.mid),
1484            GCCH_2s (ch));
1485       handle_matching_ack (ch,
1486                            cti,
1487                            crm);
1488       found = GNUNET_YES;
1489       continue;
1490     }
1491     delta--;
1492     if (delta >= 64)
1493       continue;
1494     LOG (GNUNET_ERROR_TYPE_DEBUG,
1495          "Testing bit %llX for mid %u (base: %u)\n",
1496          (1LLU << delta),
1497          ntohl (crm->data_message->mid.mid),
1498          mid_base);
1499     if (0 != (mid_mask & (1LLU << delta)))
1500     {
1501       LOG (GNUNET_ERROR_TYPE_DEBUG,
1502            "Got DATA_ACK with mask for %u on %s\n",
1503            ntohl (crm->data_message->mid.mid),
1504            GCCH_2s (ch));
1505       handle_matching_ack (ch,
1506                            cti,
1507                            crm);
1508       found = GNUNET_YES;
1509     }
1510   }
1511   if (GNUNET_NO == found)
1512   {
1513     /* ACK for message we already dropped, might have been a
1514        duplicate ACK? Ignore. */
1515     LOG (GNUNET_ERROR_TYPE_DEBUG,
1516          "Duplicate DATA_ACK on %s, ignoring\n",
1517          GCCH_2s (ch));
1518     GNUNET_STATISTICS_update (stats,
1519                               "# duplicate DATA_ACKs",
1520                               1,
1521                               GNUNET_NO);
1522     return;
1523   }
1524   if (NULL != ch->retry_data_task)
1525   {
1526     GNUNET_SCHEDULER_cancel (ch->retry_data_task);
1527     ch->retry_data_task = NULL;
1528   }
1529   if ( (NULL != ch->head_sent) &&
1530        (NULL == ch->head_sent->qe) )
1531     ch->retry_data_task
1532       = GNUNET_SCHEDULER_add_at (ch->head_sent->next_retry,
1533                                  &retry_transmission,
1534                                  ch);
1535 }
1536
1537
1538 /**
1539  * Destroy channel, based on the other peer closing the
1540  * connection.  Also needs to remove this channel from
1541  * the tunnel.
1542  *
1543  * @param ch channel to destroy
1544  * @param cti identifier of the connection that delivered the message,
1545  *            NULL if we are simulating receiving a destroy due to shutdown
1546  */
1547 void
1548 GCCH_handle_remote_destroy (struct CadetChannel *ch,
1549                             const struct GNUNET_CADET_ConnectionTunnelIdentifier *cti)
1550 {
1551   struct CadetChannelClient *ccc;
1552
1553   GNUNET_assert (GNUNET_NO == ch->is_loopback);
1554   LOG (GNUNET_ERROR_TYPE_DEBUG,
1555        "Received remote channel DESTROY for %s\n",
1556        GCCH_2s (ch));
1557   if (GNUNET_YES == ch->destroy)
1558   {
1559     /* Local client already gone, this is instant-death. */
1560     channel_destroy (ch);
1561     return;
1562   }
1563   ccc = (NULL != ch->owner) ? ch->owner : ch->dest;
1564   if (NULL != ccc->head_recv)
1565   {
1566     LOG (GNUNET_ERROR_TYPE_WARNING,
1567          "Lost end of transmission due to remote shutdown on %s\n",
1568          GCCH_2s (ch));
1569     /* FIXME: change API to notify client about truncated transmission! */
1570   }
1571   ch->destroy = GNUNET_YES;
1572   GSC_handle_remote_channel_destroy (ccc->c,
1573                                      ccc->ccn,
1574                                      ch);
1575   channel_destroy (ch);
1576 }
1577
1578
1579 /**
1580  * Test if element @a e1 comes before element @a e2.
1581  *
1582  * @param cls closure, to a flag where we indicate duplicate packets
1583  * @param crm1 an element of to sort
1584  * @param crm2 another element to sort
1585  * @return #GNUNET_YES if @e1 < @e2, otherwise #GNUNET_NO
1586  */
1587 static int
1588 cmp_crm_by_next_retry (void *cls,
1589                        struct CadetReliableMessage *crm1,
1590                        struct CadetReliableMessage *crm2)
1591 {
1592   if (crm1->next_retry.abs_value_us <
1593       crm2->next_retry.abs_value_us)
1594     return GNUNET_YES;
1595   return GNUNET_NO;
1596 }
1597
1598
1599 /**
1600  * Function called once the tunnel has sent one of our messages.
1601  * If the message is unreliable, simply frees the `crm`. If the
1602  * message was reliable, calculate retransmission time and
1603  * wait for ACK (or retransmit).
1604  *
1605  * @param cls the `struct CadetReliableMessage` that was sent
1606  * @param cid identifier of the connection within the tunnel, NULL
1607  *            if transmission failed
1608  */
1609 static void
1610 data_sent_cb (void *cls,
1611               const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid)
1612 {
1613   struct CadetReliableMessage *crm = cls;
1614   struct CadetChannel *ch = crm->ch;
1615
1616   GNUNET_assert (GNUNET_NO == ch->is_loopback);
1617   GNUNET_assert (NULL != crm->qe);
1618   crm->qe = NULL;
1619   GNUNET_CONTAINER_DLL_remove (ch->head_sent,
1620                                ch->tail_sent,
1621                                crm);
1622   if (GNUNET_NO == ch->reliable)
1623   {
1624     GNUNET_free (crm->data_message);
1625     GNUNET_free (crm);
1626     ch->pending_messages--;
1627     send_ack_to_client (ch,
1628                         (NULL == ch->owner)
1629                         ? GNUNET_NO
1630                         : GNUNET_YES);
1631     return;
1632   }
1633   if (NULL == cid)
1634   {
1635     /* There was an error sending. */
1636     crm->num_transmissions = GNUNET_SYSERR;
1637   }
1638   else if (GNUNET_SYSERR != crm->num_transmissions)
1639   {
1640     /* Increment transmission counter, and possibly store @a cid
1641        if this was the first transmission. */
1642     crm->num_transmissions++;
1643     if (1 == crm->num_transmissions)
1644     {
1645       crm->first_transmission_time = GNUNET_TIME_absolute_get ();
1646       crm->connection_taken = *cid;
1647       GCC_ack_expected (cid);
1648     }
1649   }
1650   if ( (0 == crm->retry_delay.rel_value_us) &&
1651        (NULL != cid) )
1652   {
1653     struct CadetConnection *cc = GCC_lookup (cid);
1654
1655     if (NULL != cc)
1656       crm->retry_delay = GCC_get_metrics (cc)->aged_latency;
1657     else
1658       crm->retry_delay = ch->retry_time;
1659   }
1660   crm->retry_delay = GNUNET_TIME_STD_BACKOFF (crm->retry_delay);
1661   crm->retry_delay = GNUNET_TIME_relative_max (crm->retry_delay,
1662                                                MIN_RTT_DELAY);
1663   crm->next_retry = GNUNET_TIME_relative_to_absolute (crm->retry_delay);
1664
1665   GNUNET_CONTAINER_DLL_insert_sorted (struct CadetReliableMessage,
1666                                       cmp_crm_by_next_retry,
1667                                       NULL,
1668                                       ch->head_sent,
1669                                       ch->tail_sent,
1670                                       crm);
1671   LOG (GNUNET_ERROR_TYPE_DEBUG,
1672        "Message %u sent, next transmission on %s in %s\n",
1673        (unsigned int) ntohl (crm->data_message->mid.mid),
1674        GCCH_2s (ch),
1675        GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (ch->head_sent->next_retry),
1676                                                GNUNET_YES));
1677   if (NULL == ch->head_sent->qe)
1678   {
1679     if (NULL != ch->retry_data_task)
1680       GNUNET_SCHEDULER_cancel (ch->retry_data_task);
1681     ch->retry_data_task
1682       = GNUNET_SCHEDULER_add_at (ch->head_sent->next_retry,
1683                                  &retry_transmission,
1684                                  ch);
1685   }
1686 }
1687
1688
1689 /**
1690  * Handle data given by a client.
1691  *
1692  * Check whether the client is allowed to send in this tunnel, save if
1693  * channel is reliable and send an ACK to the client if there is still
1694  * buffer space in the tunnel.
1695  *
1696  * @param ch Channel.
1697  * @param sender_ccn ccn of the sender
1698  * @param buf payload to transmit.
1699  * @param buf_len number of bytes in @a buf
1700  * @return #GNUNET_OK if everything goes well,
1701  *         #GNUNET_SYSERR in case of an error.
1702  */
1703 int
1704 GCCH_handle_local_data (struct CadetChannel *ch,
1705                         struct GNUNET_CADET_ClientChannelNumber sender_ccn,
1706                         const char *buf,
1707                         size_t buf_len)
1708 {
1709   struct CadetReliableMessage *crm;
1710
1711   if (ch->pending_messages > ch->max_pending_messages)
1712   {
1713     GNUNET_break (0);
1714     return GNUNET_SYSERR;
1715   }
1716   if (GNUNET_YES == ch->destroy)
1717   {
1718     /* we are going down, drop messages */
1719     return GNUNET_OK;
1720   }
1721   ch->pending_messages++;
1722
1723   if (GNUNET_YES == ch->is_loopback)
1724   {
1725     struct CadetChannelClient *receiver;
1726     struct GNUNET_MQ_Envelope *env;
1727     struct GNUNET_CADET_LocalData *ld;
1728     int to_owner;
1729
1730     env = GNUNET_MQ_msg_extra (ld,
1731                                buf_len,
1732                                GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
1733     if ( (NULL != ch->owner) &&
1734          (sender_ccn.channel_of_client ==
1735           ch->owner->ccn.channel_of_client) )
1736     {
1737       receiver = ch->dest;
1738       to_owner = GNUNET_NO;
1739     }
1740     else if ( (NULL != ch->dest) &&
1741               (sender_ccn.channel_of_client ==
1742                ch->dest->ccn.channel_of_client) )
1743     {
1744       receiver = ch->owner;
1745       to_owner = GNUNET_YES;
1746     }
1747     else
1748     {
1749       GNUNET_break (0);
1750       return GNUNET_SYSERR;
1751     }
1752     ld->ccn = receiver->ccn;
1753     GNUNET_memcpy (&ld[1],
1754                    buf,
1755                    buf_len);
1756     if (GNUNET_YES == receiver->client_ready)
1757     {
1758       GSC_send_to_client (receiver->c,
1759                           env);
1760       send_ack_to_client (ch,
1761                           to_owner);
1762     }
1763     else
1764     {
1765       struct CadetOutOfOrderMessage *oom;
1766
1767       oom = GNUNET_new (struct CadetOutOfOrderMessage);
1768       oom->env = env;
1769       GNUNET_CONTAINER_DLL_insert_tail (receiver->head_recv,
1770                                         receiver->tail_recv,
1771                                         oom);
1772       receiver->num_recv++;
1773     }
1774     return GNUNET_OK;
1775   }
1776
1777   /* Everything is correct, send the message. */
1778   crm = GNUNET_malloc (sizeof (*crm));
1779   crm->ch = ch;
1780   crm->data_message = GNUNET_malloc (sizeof (struct GNUNET_CADET_ChannelAppDataMessage)
1781                                      + buf_len);
1782   crm->data_message->header.size = htons (sizeof (struct GNUNET_CADET_ChannelAppDataMessage) + buf_len);
1783   crm->data_message->header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA);
1784   ch->mid_send.mid = htonl (ntohl (ch->mid_send.mid) + 1);
1785   crm->data_message->mid = ch->mid_send;
1786   crm->data_message->ctn = ch->ctn;
1787   GNUNET_memcpy (&crm->data_message[1],
1788                  buf,
1789                  buf_len);
1790   GNUNET_CONTAINER_DLL_insert_tail (ch->head_sent,
1791                                     ch->tail_sent,
1792                                     crm);
1793   LOG (GNUNET_ERROR_TYPE_DEBUG,
1794        "Sending message %u from local client to %s with %u bytes\n",
1795        ntohl (crm->data_message->mid.mid),
1796        GCCH_2s (ch),
1797        buf_len);
1798   if (NULL != ch->retry_data_task)
1799   {
1800     GNUNET_SCHEDULER_cancel (ch->retry_data_task);
1801     ch->retry_data_task = NULL;
1802   }
1803   crm->qe = GCT_send (ch->t,
1804                       &crm->data_message->header,
1805                       &data_sent_cb,
1806                       crm);
1807   GNUNET_assert (NULL == ch->retry_data_task);
1808   return GNUNET_OK;
1809 }
1810
1811
1812 /**
1813  * Handle ACK from client on local channel.  Means the client is ready
1814  * for more data, see if we have any for it.
1815  *
1816  * @param ch channel to destroy
1817  * @param client_ccn ccn of the client sending the ack
1818  */
1819 void
1820 GCCH_handle_local_ack (struct CadetChannel *ch,
1821                        struct GNUNET_CADET_ClientChannelNumber client_ccn)
1822 {
1823   struct CadetChannelClient *ccc;
1824   struct CadetOutOfOrderMessage *com;
1825
1826   if ( (NULL != ch->owner) &&
1827        (ch->owner->ccn.channel_of_client == client_ccn.channel_of_client) )
1828     ccc = ch->owner;
1829   else if ( (NULL != ch->dest) &&
1830             (ch->dest->ccn.channel_of_client == client_ccn.channel_of_client) )
1831     ccc = ch->dest;
1832   else
1833     GNUNET_assert (0);
1834   ccc->client_ready = GNUNET_YES;
1835   com = ccc->head_recv;
1836   if (NULL == com)
1837   {
1838     LOG (GNUNET_ERROR_TYPE_DEBUG,
1839          "Got LOCAL_ACK, %s-%X ready to receive more data, but none pending on %s-%X(%p)!\n",
1840          GSC_2s (ccc->c),
1841          ntohl (client_ccn.channel_of_client),
1842          GCCH_2s (ch),
1843          ntohl (ccc->ccn.channel_of_client),
1844          ccc);
1845     return; /* none pending */
1846   }
1847   if (GNUNET_YES == ch->is_loopback)
1848   {
1849     int to_owner;
1850
1851     /* Messages are always in-order, just send */
1852     GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
1853                                  ccc->tail_recv,
1854                                  com);
1855     ccc->num_recv--;
1856     GSC_send_to_client (ccc->c,
1857                         com->env);
1858     /* Notify sender that we can receive more */
1859     if (ccc->ccn.channel_of_client ==
1860         ch->owner->ccn.channel_of_client)
1861     {
1862       to_owner = GNUNET_NO;
1863     }
1864     else
1865     {
1866       GNUNET_assert (ccc->ccn.channel_of_client ==
1867                      ch->dest->ccn.channel_of_client);
1868       to_owner = GNUNET_YES;
1869     }
1870     send_ack_to_client (ch,
1871                         to_owner);
1872     GNUNET_free (com);
1873     return;
1874   }
1875
1876   if ( (com->mid.mid != ch->mid_recv.mid) &&
1877        (GNUNET_NO == ch->out_of_order) &&
1878        (GNUNET_YES == ch->reliable) )
1879   {
1880     LOG (GNUNET_ERROR_TYPE_DEBUG,
1881          "Got LOCAL_ACK, %s-%X ready to receive more data (but next one is out-of-order %u vs. %u)!\n",
1882          GSC_2s (ccc->c),
1883          ntohl (ccc->ccn.channel_of_client),
1884          ntohl (com->mid.mid),
1885          ntohl (ch->mid_recv.mid));
1886     return; /* missing next one in-order */
1887   }
1888
1889   LOG (GNUNET_ERROR_TYPE_DEBUG,
1890        "Got LOCAL_ACK, giving payload message %u to %s-%X on %s\n",
1891        ntohl (com->mid.mid),
1892        GSC_2s (ccc->c),
1893        ntohl (ccc->ccn.channel_of_client),
1894        GCCH_2s (ch));
1895
1896   /* all good, pass next message to client */
1897   GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
1898                                ccc->tail_recv,
1899                                com);
1900   ccc->num_recv--;
1901   /* FIXME: if unreliable, this is not aggressive
1902      enough, as it would be OK to have lost some! */
1903
1904   ch->mid_recv.mid = htonl (1 + ntohl (com->mid.mid));
1905   ch->mid_futures >>= 1; /* equivalent to division by 2 */
1906   ccc->client_ready = GNUNET_NO;
1907   GSC_send_to_client (ccc->c,
1908                       com->env);
1909   GNUNET_free (com);
1910   send_channel_data_ack (ch);
1911   if (NULL != ccc->head_recv)
1912     return;
1913   if (GNUNET_NO == ch->destroy)
1914     return;
1915   GCT_send_channel_destroy (ch->t,
1916                             ch->ctn);
1917   channel_destroy (ch);
1918 }
1919
1920
1921 #define LOG2(level, ...) GNUNET_log_from_nocheck(level,"cadet-chn",__VA_ARGS__)
1922
1923
1924 /**
1925  * Log channel info.
1926  *
1927  * @param ch Channel.
1928  * @param level Debug level to use.
1929  */
1930 void
1931 GCCH_debug (struct CadetChannel *ch,
1932             enum GNUNET_ErrorType level)
1933 {
1934   int do_log;
1935
1936   do_log = GNUNET_get_log_call_status (level & (~GNUNET_ERROR_TYPE_BULK),
1937                                        "cadet-chn",
1938                                        __FILE__, __FUNCTION__, __LINE__);
1939   if (0 == do_log)
1940     return;
1941
1942   if (NULL == ch)
1943   {
1944     LOG2 (level, "CHN *** DEBUG NULL CHANNEL ***\n");
1945     return;
1946   }
1947   LOG2 (level,
1948         "CHN %s:%X (%p)\n",
1949         GCT_2s (ch->t),
1950         ch->ctn,
1951         ch);
1952   if (NULL != ch->owner)
1953   {
1954     LOG2 (level,
1955           "CHN origin %s ready %s local-id: %u\n",
1956           GSC_2s (ch->owner->c),
1957           ch->owner->client_ready ? "YES" : "NO",
1958           ntohl (ch->owner->ccn.channel_of_client));
1959   }
1960   if (NULL != ch->dest)
1961   {
1962     LOG2 (level,
1963           "CHN destination %s ready %s local-id: %u\n",
1964           GSC_2s (ch->dest->c),
1965           ch->dest->client_ready ? "YES" : "NO",
1966           ntohl (ch->dest->ccn.channel_of_client));
1967   }
1968   LOG2 (level,
1969         "CHN  Message IDs recv: %d (%LLX), send: %d\n",
1970         ntohl (ch->mid_recv.mid),
1971         (unsigned long long) ch->mid_futures,
1972         ntohl (ch->mid_send.mid));
1973 }
1974
1975
1976
1977 /* end of gnunet-service-cadet-new_channel.c */