optimize mqm_head scans by avoiding constantly scanning over definitively non-ready...
[oweals/gnunet.git] / src / cadet / gnunet-service-cadet-new_channel.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2001-2017 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20 /**
21  * @file cadet/gnunet-service-cadet-new_channel.c
22  * @brief logical links between CADET clients
23  * @author Bartlomiej Polot
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - Congestion/flow control:
28  *   + calculate current RTT if possible, use that for initial retransmissions
29  *     (NOTE: needs us to learn which connection the tunnel uses for the message!)
30  *   + estimate max bandwidth using bursts and use to for CONGESTION CONTROL!
31  *     (and figure out how/where to use this!)
32  *   + figure out flow control without ACKs (unreliable traffic!)
33  * - revisit handling of 'unbuffered' traffic!
34  *   (need to push down through tunnel into connection selection)
35  * - revisit handling of 'buffered' traffic: 4 is a rather small buffer; maybe
36  *   reserve more bits in 'options' to allow for buffer size control?
37  */
38 #include "platform.h"
39 #include "gnunet_util_lib.h"
40 #include "cadet.h"
41 #include "gnunet_statistics_service.h"
42 #include "gnunet-service-cadet-new.h"
43 #include "gnunet-service-cadet-new_channel.h"
44 #include "gnunet-service-cadet-new_connection.h"
45 #include "gnunet-service-cadet-new_tunnels.h"
46 #include "gnunet-service-cadet-new_peer.h"
47 #include "gnunet-service-cadet-new_paths.h"
48
49 #define LOG(level,...) GNUNET_log_from (level,"cadet-chn",__VA_ARGS__)
50
51 /**
52  * How long do we initially wait before retransmitting?
53  */
54 #define CADET_INITIAL_RETRANSMIT_TIME GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 250)
55
56 /**
57  * How long do we wait before dropping state about incoming
58  * connection to closed port?
59  */
60 #define TIMEOUT_CLOSED_PORT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30)
61
62 /**
63  * How long do we wait at least before retransmitting ever?
64  */
65 #define MIN_RTT_DELAY GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 75)
66
67 /**
68  * Maximum message ID into the future we accept for out-of-order messages.
69  * If the message is more than this into the future, we drop it.  This is
70  * important both to detect values that are actually in the past, as well
71  * as to limit adversarially triggerable memory consumption.
72  *
73  * Note that right now we have "max_pending_messages = 4" hard-coded in
74  * the logic below, so a value of 4 would suffice here. But we plan to
75  * allow larger windows in the future...
76  */
77 #define MAX_OUT_OF_ORDER_DISTANCE 1024
78
79
80 /**
81  * All the states a connection can be in.
82  */
83 enum CadetChannelState
84 {
85   /**
86    * Uninitialized status, should never appear in operation.
87    */
88   CADET_CHANNEL_NEW,
89
90   /**
91    * Connection create message sent, waiting for ACK.
92    */
93   CADET_CHANNEL_OPEN_SENT,
94
95   /**
96    * Connection confirmed, ready to carry traffic.
97    */
98   CADET_CHANNEL_READY
99 };
100
101
102 /**
103  * Info needed to retry a message in case it gets lost.
104  * Note that we DO use this structure also for unreliable
105  * messages.
106  */
107 struct CadetReliableMessage
108 {
109   /**
110    * Double linked list, FIFO style
111    */
112   struct CadetReliableMessage *next;
113
114   /**
115    * Double linked list, FIFO style
116    */
117   struct CadetReliableMessage *prev;
118
119   /**
120    * Which channel is this message in?
121    */
122   struct CadetChannel *ch;
123
124   /**
125    * Entry in the tunnels queue for this message, NULL if it has left
126    * the tunnel.  Used to cancel transmission in case we receive an
127    * ACK in time.
128    */
129   struct CadetTunnelQueueEntry *qe;
130
131   /**
132    * How soon should we retry if we fail to get an ACK?
133    * Messages in the queue are sorted by this value.
134    */
135   struct GNUNET_TIME_Absolute next_retry;
136
137   /**
138    * How long do we wait for an ACK after transmission?
139    * Use for the back-off calculation.
140    */
141   struct GNUNET_TIME_Relative retry_delay;
142
143   /**
144    * Data message we are trying to send.
145    */
146   struct GNUNET_CADET_ChannelAppDataMessage *data_message;
147
148 };
149
150
151 /**
152  * List of received out-of-order data messages.
153  */
154 struct CadetOutOfOrderMessage
155 {
156   /**
157    * Double linked list, FIFO style
158    */
159   struct CadetOutOfOrderMessage *next;
160
161   /**
162    * Double linked list, FIFO style
163    */
164   struct CadetOutOfOrderMessage *prev;
165
166   /**
167    * ID of the message (messages up to this point needed
168    * before we give this one to the client).
169    */
170   struct ChannelMessageIdentifier mid;
171
172   /**
173    * The envelope with the payload of the out-of-order message
174    */
175   struct GNUNET_MQ_Envelope *env;
176
177 };
178
179
180 /**
181  * Client endpoint of a `struct CadetChannel`.  A channel may be a
182  * loopback channel, in which case it has two of these endpoints.
183  * Note that flow control also is required in both directions.
184  */
185 struct CadetChannelClient
186 {
187   /**
188    * Client handle.  Not by itself sufficient to designate
189    * the client endpoint, as the same client handle may
190    * be used for both the owner and the destination, and
191    * we thus also need the channel ID to identify the client.
192    */
193   struct CadetClient *c;
194
195   /**
196    * Head of DLL of messages received out of order or while client was unready.
197    */
198   struct CadetOutOfOrderMessage *head_recv;
199
200   /**
201    * Tail DLL of messages received out of order or while client was unready.
202    */
203   struct CadetOutOfOrderMessage *tail_recv;
204
205   /**
206    * Local tunnel number for this client.
207    * (if owner >= #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI,
208    *  otherwise < #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI)
209    */
210   struct GNUNET_CADET_ClientChannelNumber ccn;
211
212   /**
213    * Number of entries currently in @a head_recv DLL.
214    */
215   unsigned int num_recv;
216
217   /**
218    * Can we send data to the client?
219    */
220   int client_ready;
221
222 };
223
224
225 /**
226  * Struct containing all information regarding a channel to a remote client.
227  */
228 struct CadetChannel
229 {
230   /**
231    * Tunnel this channel is in.
232    */
233   struct CadetTunnel *t;
234
235   /**
236    * Client owner of the tunnel, if any.
237    * (Used if this channel represends the initiating end of the tunnel.)
238    */
239   struct CadetChannelClient *owner;
240
241   /**
242    * Client destination of the tunnel, if any.
243    * (Used if this channel represents the listening end of the tunnel.)
244    */
245   struct CadetChannelClient *dest;
246
247   /**
248    * Last entry in the tunnel's queue relating to control messages
249    * (#GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN or
250    * #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK).  Used to cancel
251    * transmission in case we receive updated information.
252    */
253   struct CadetTunnelQueueEntry *last_control_qe;
254
255   /**
256    * Head of DLL of messages sent and not yet ACK'd.
257    */
258   struct CadetReliableMessage *head_sent;
259
260   /**
261    * Tail of DLL of messages sent and not yet ACK'd.
262    */
263   struct CadetReliableMessage *tail_sent;
264
265   /**
266    * Task to resend/poll in case no ACK is received.
267    */
268   struct GNUNET_SCHEDULER_Task *retry_control_task;
269
270   /**
271    * Task to resend/poll in case no ACK is received.
272    */
273   struct GNUNET_SCHEDULER_Task *retry_data_task;
274
275   /**
276    * Last time the channel was used
277    */
278   struct GNUNET_TIME_Absolute timestamp;
279
280   /**
281    * Destination port of the channel.
282    */
283   struct GNUNET_HashCode port;
284
285   /**
286    * Counter for exponential backoff.
287    */
288   struct GNUNET_TIME_Relative retry_time;
289
290   /**
291    * How long does it usually take to get an ACK.
292    */
293   struct GNUNET_TIME_Relative expected_delay;
294
295   /**
296    * Bitfield of already-received messages past @e mid_recv.
297    */
298   uint64_t mid_futures;
299
300   /**
301    * Next MID expected for incoming traffic.
302    */
303   struct ChannelMessageIdentifier mid_recv;
304
305   /**
306    * Next MID to use for outgoing traffic.
307    */
308   struct ChannelMessageIdentifier mid_send;
309
310   /**
311    * Total (reliable) messages pending ACK for this channel.
312    */
313   unsigned int pending_messages;
314
315   /**
316    * Maximum (reliable) messages pending ACK for this channel
317    * before we throttle the client.
318    */
319   unsigned int max_pending_messages;
320
321   /**
322    * Number identifying this channel in its tunnel.
323    */
324   struct GNUNET_CADET_ChannelTunnelNumber ctn;
325
326   /**
327    * Channel state.
328    */
329   enum CadetChannelState state;
330
331   /**
332    * Count how many ACKs we skipped, used to prevent long
333    * sequences of ACK skipping.
334    */
335   unsigned int skip_ack_series;
336
337   /**
338    * Is the tunnel bufferless (minimum latency)?
339    */
340   int nobuffer;
341
342   /**
343    * Is the tunnel reliable?
344    */
345   int reliable;
346
347   /**
348    * Is the tunnel out-of-order?
349    */
350   int out_of_order;
351
352   /**
353    * Is this channel a loopback channel, where the destination is us again?
354    */
355   int is_loopback;
356
357   /**
358    * Flag to signal the destruction of the channel.  If this is set to
359    * #GNUNET_YES the channel will be destroyed once the queue is
360    * empty.
361    */
362   int destroy;
363
364 };
365
366
367 /**
368  * Get the static string for identification of the channel.
369  *
370  * @param ch Channel.
371  *
372  * @return Static string with the channel IDs.
373  */
374 const char *
375 GCCH_2s (const struct CadetChannel *ch)
376 {
377   static char buf[128];
378
379   GNUNET_snprintf (buf,
380                    sizeof (buf),
381                    "Channel %s:%s ctn:%X(%X/%X)",
382                    (GNUNET_YES == ch->is_loopback)
383                    ? "loopback"
384                    : GNUNET_i2s (GCP_get_id (GCT_get_destination (ch->t))),
385                    GNUNET_h2s (&ch->port),
386                    ch->ctn,
387                    (NULL == ch->owner) ? 0 : ntohl (ch->owner->ccn.channel_of_client),
388                    (NULL == ch->dest) ? 0 : ntohl (ch->dest->ccn.channel_of_client));
389   return buf;
390 }
391
392
393 /**
394  * Get the channel's public ID.
395  *
396  * @param ch Channel.
397  *
398  * @return ID used to identify the channel with the remote peer.
399  */
400 struct GNUNET_CADET_ChannelTunnelNumber
401 GCCH_get_id (const struct CadetChannel *ch)
402 {
403   return ch->ctn;
404 }
405
406
407 /**
408  * Release memory associated with @a ccc
409  *
410  * @param ccc data structure to clean up
411  */
412 static void
413 free_channel_client (struct CadetChannelClient *ccc)
414 {
415   struct CadetOutOfOrderMessage *com;
416
417   while (NULL != (com = ccc->head_recv))
418   {
419     GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
420                                  ccc->tail_recv,
421                                  com);
422     ccc->num_recv--;
423     GNUNET_MQ_discard (com->env);
424     GNUNET_free (com);
425   }
426   GNUNET_free (ccc);
427 }
428
429
430 /**
431  * Destroy the given channel.
432  *
433  * @param ch channel to destroy
434  */
435 static void
436 channel_destroy (struct CadetChannel *ch)
437 {
438   struct CadetReliableMessage *crm;
439
440   while (NULL != (crm = ch->head_sent))
441   {
442     GNUNET_assert (ch == crm->ch);
443     if (NULL != crm->qe)
444     {
445       GCT_send_cancel (crm->qe);
446       crm->qe = NULL;
447     }
448     GNUNET_CONTAINER_DLL_remove (ch->head_sent,
449                                  ch->tail_sent,
450                                  crm);
451     GNUNET_free (crm->data_message);
452     GNUNET_free (crm);
453   }
454   if (NULL != ch->owner)
455   {
456     free_channel_client (ch->owner);
457     ch->owner = NULL;
458   }
459   if (NULL != ch->dest)
460   {
461     free_channel_client (ch->dest);
462     ch->dest = NULL;
463   }
464   if (NULL != ch->last_control_qe)
465   {
466     GCT_send_cancel (ch->last_control_qe);
467     ch->last_control_qe = NULL;
468   }
469   if (NULL != ch->retry_data_task)
470   {
471     GNUNET_SCHEDULER_cancel (ch->retry_data_task);
472     ch->retry_data_task = NULL;
473   }
474   if (NULL != ch->retry_control_task)
475   {
476     GNUNET_SCHEDULER_cancel (ch->retry_control_task);
477     ch->retry_control_task = NULL;
478   }
479   if (GNUNET_NO == ch->is_loopback)
480   {
481     GCT_remove_channel (ch->t,
482                         ch,
483                         ch->ctn);
484     ch->t = NULL;
485   }
486   GNUNET_free (ch);
487 }
488
489
490 /**
491  * Send a channel create message.
492  *
493  * @param cls Channel for which to send.
494  */
495 static void
496 send_channel_open (void *cls);
497
498
499 /**
500  * Function called once the tunnel confirms that we sent the
501  * create message.  Delays for a bit until we retry.
502  *
503  * @param cls our `struct CadetChannel`.
504  */
505 static void
506 channel_open_sent_cb (void *cls)
507 {
508   struct CadetChannel *ch = cls;
509
510   GNUNET_assert (NULL != ch->last_control_qe);
511   ch->last_control_qe = NULL;
512   ch->retry_time = GNUNET_TIME_STD_BACKOFF (ch->retry_time);
513   LOG (GNUNET_ERROR_TYPE_DEBUG,
514        "Sent CADET_CHANNEL_OPEN on %s, retrying in %s\n",
515        GCCH_2s (ch),
516        GNUNET_STRINGS_relative_time_to_string (ch->retry_time,
517                                                GNUNET_YES));
518   ch->retry_control_task
519     = GNUNET_SCHEDULER_add_delayed (ch->retry_time,
520                                     &send_channel_open,
521                                     ch);
522 }
523
524
525 /**
526  * Send a channel open message.
527  *
528  * @param cls Channel for which to send.
529  */
530 static void
531 send_channel_open (void *cls)
532 {
533   struct CadetChannel *ch = cls;
534   struct GNUNET_CADET_ChannelOpenMessage msgcc;
535   uint32_t options;
536
537   ch->retry_control_task = NULL;
538   LOG (GNUNET_ERROR_TYPE_DEBUG,
539        "Sending CHANNEL_OPEN message for %s\n",
540        GCCH_2s (ch));
541   options = 0;
542   if (ch->nobuffer)
543     options |= GNUNET_CADET_OPTION_NOBUFFER;
544   if (ch->reliable)
545     options |= GNUNET_CADET_OPTION_RELIABLE;
546   if (ch->out_of_order)
547     options |= GNUNET_CADET_OPTION_OUT_OF_ORDER;
548   msgcc.header.size = htons (sizeof (msgcc));
549   msgcc.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN);
550   msgcc.opt = htonl (options);
551   msgcc.port = ch->port;
552   msgcc.ctn = ch->ctn;
553   ch->state = CADET_CHANNEL_OPEN_SENT;
554   ch->last_control_qe = GCT_send (ch->t,
555                                   &msgcc.header,
556                                   &channel_open_sent_cb,
557                                   ch);
558   GNUNET_assert (NULL == ch->retry_control_task);
559 }
560
561
562 /**
563  * Function called once and only once after a channel was bound
564  * to its tunnel via #GCT_add_channel() is ready for transmission.
565  * Note that this is only the case for channels that this peer
566  * initiates, as for incoming channels we assume that they are
567  * ready for transmission immediately upon receiving the open
568  * message.  Used to bootstrap the #GCT_send() process.
569  *
570  * @param ch the channel for which the tunnel is now ready
571  */
572 void
573 GCCH_tunnel_up (struct CadetChannel *ch)
574 {
575   GNUNET_assert (NULL == ch->retry_control_task);
576   LOG (GNUNET_ERROR_TYPE_DEBUG,
577        "Tunnel up, sending CHANNEL_OPEN on %s now\n",
578        GCCH_2s (ch));
579   ch->retry_control_task
580     = GNUNET_SCHEDULER_add_now (&send_channel_open,
581                                 ch);
582 }
583
584
585 /**
586  * Create a new channel.
587  *
588  * @param owner local client owning the channel
589  * @param ccn local number of this channel at the @a owner
590  * @param destination peer to which we should build the channel
591  * @param port desired port at @a destination
592  * @param options options for the channel
593  * @return handle to the new channel
594  */
595 struct CadetChannel *
596 GCCH_channel_local_new (struct CadetClient *owner,
597                         struct GNUNET_CADET_ClientChannelNumber ccn,
598                         struct CadetPeer *destination,
599                         const struct GNUNET_HashCode *port,
600                         uint32_t options)
601 {
602   struct CadetChannel *ch;
603   struct CadetChannelClient *ccco;
604
605   ccco = GNUNET_new (struct CadetChannelClient);
606   ccco->c = owner;
607   ccco->ccn = ccn;
608   ccco->client_ready = GNUNET_YES;
609
610   ch = GNUNET_new (struct CadetChannel);
611   ch->mid_recv.mid = htonl (1); /* The OPEN_ACK counts as message 0! */
612   ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
613   ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
614   ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
615   ch->max_pending_messages = (ch->nobuffer) ? 1 : 4; /* FIXME: 4!? Do not hardcode! */
616   ch->owner = ccco;
617   ch->port = *port;
618   if (0 == memcmp (&my_full_id,
619                    GCP_get_id (destination),
620                    sizeof (struct GNUNET_PeerIdentity)))
621   {
622     struct CadetClient *c;
623
624     ch->is_loopback = GNUNET_YES;
625     c = GNUNET_CONTAINER_multihashmap_get (open_ports,
626                                            port);
627     if (NULL == c)
628     {
629       /* port closed, wait for it to possibly open */
630       (void) GNUNET_CONTAINER_multihashmap_put (loose_channels,
631                                                 port,
632                                                 ch,
633                                                 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
634       LOG (GNUNET_ERROR_TYPE_DEBUG,
635            "Created loose incoming loopback channel to port %s\n",
636            GNUNET_h2s (&ch->port));
637     }
638     else
639     {
640       ch->dest = GNUNET_new (struct CadetChannelClient);
641       ch->dest->c = c;
642       ch->dest->client_ready = GNUNET_YES;
643       GCCH_bind (ch,
644                  ch->dest->c);
645     }
646   }
647   else
648   {
649     ch->t = GCP_get_tunnel (destination,
650                             GNUNET_YES);
651     ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME;
652     ch->ctn = GCT_add_channel (ch->t,
653                                ch);
654   }
655   GNUNET_STATISTICS_update (stats,
656                             "# channels",
657                             1,
658                             GNUNET_NO);
659   LOG (GNUNET_ERROR_TYPE_DEBUG,
660        "Created channel to port %s at peer %s for %s using %s\n",
661        GNUNET_h2s (port),
662        GCP_2s (destination),
663        GSC_2s (owner),
664        (GNUNET_YES == ch->is_loopback) ? "loopback" : GCT_2s (ch->t));
665   return ch;
666 }
667
668
669 /**
670  * We had an incoming channel to a port that is closed.
671  * It has not been opened for a while, drop it.
672  *
673  * @param cls the channel to drop
674  */
675 static void
676 timeout_closed_cb (void *cls)
677 {
678   struct CadetChannel *ch = cls;
679
680   ch->retry_control_task = NULL;
681   LOG (GNUNET_ERROR_TYPE_DEBUG,
682        "Closing incoming channel to port %s from peer %s due to timeout\n",
683        GNUNET_h2s (&ch->port),
684        GCP_2s (GCT_get_destination (ch->t)));
685   channel_destroy (ch);
686 }
687
688
689 /**
690  * Create a new channel based on a request coming in over the network.
691  *
692  * @param t tunnel to the remote peer
693  * @param ctn identifier of this channel in the tunnel
694  * @param port desired local port
695  * @param options options for the channel
696  * @return handle to the new channel
697  */
698 struct CadetChannel *
699 GCCH_channel_incoming_new (struct CadetTunnel *t,
700                            struct GNUNET_CADET_ChannelTunnelNumber ctn,
701                            const struct GNUNET_HashCode *port,
702                            uint32_t options)
703 {
704   struct CadetChannel *ch;
705   struct CadetClient *c;
706
707   ch = GNUNET_new (struct CadetChannel);
708   ch->port = *port;
709   ch->t = t;
710   ch->ctn = ctn;
711   ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME;
712   ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
713   ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
714   ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
715   ch->max_pending_messages = (ch->nobuffer) ? 1 : 4; /* FIXME: 4!? Do not hardcode! */
716   GNUNET_STATISTICS_update (stats,
717                             "# channels",
718                             1,
719                             GNUNET_NO);
720
721   c = GNUNET_CONTAINER_multihashmap_get (open_ports,
722                                          port);
723   if (NULL == c)
724   {
725     /* port closed, wait for it to possibly open */
726     (void) GNUNET_CONTAINER_multihashmap_put (loose_channels,
727                                               port,
728                                               ch,
729                                               GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
730     ch->retry_control_task
731       = GNUNET_SCHEDULER_add_delayed (TIMEOUT_CLOSED_PORT,
732                                       &timeout_closed_cb,
733                                       ch);
734     LOG (GNUNET_ERROR_TYPE_DEBUG,
735          "Created loose incoming channel to port %s from peer %s\n",
736          GNUNET_h2s (&ch->port),
737          GCP_2s (GCT_get_destination (ch->t)));
738   }
739   else
740   {
741     GCCH_bind (ch,
742                c);
743   }
744   GNUNET_STATISTICS_update (stats,
745                             "# channels",
746                             1,
747                             GNUNET_NO);
748   return ch;
749 }
750
751
752 /**
753  * Function called once the tunnel confirms that we sent the
754  * ACK message.  Just remembers it was sent, we do not expect
755  * ACKs for ACKs ;-).
756  *
757  * @param cls our `struct CadetChannel`.
758  */
759 static void
760 send_ack_cb (void *cls)
761 {
762   struct CadetChannel *ch = cls;
763
764   GNUNET_assert (NULL != ch->last_control_qe);
765   ch->last_control_qe = NULL;
766 }
767
768
769 /**
770  * Compute and send the current #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK to the other peer.
771  *
772  * @param ch channel to send the #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK for
773  */
774 static void
775 send_channel_data_ack (struct CadetChannel *ch)
776 {
777   struct GNUNET_CADET_ChannelDataAckMessage msg;
778
779   if (GNUNET_NO == ch->reliable)
780     return; /* no ACKs */
781   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK);
782   msg.header.size = htons (sizeof (msg));
783   msg.ctn = ch->ctn;
784   msg.mid.mid = htonl (ntohl (ch->mid_recv.mid));
785   msg.futures = GNUNET_htonll (ch->mid_futures);
786   if (NULL != ch->last_control_qe)
787     GCT_send_cancel (ch->last_control_qe);
788   LOG (GNUNET_ERROR_TYPE_DEBUG,
789        "Sending DATA_ACK %u:%llX via %s\n",
790        (unsigned int) ntohl (msg.mid.mid),
791        (unsigned long long) ch->mid_futures,
792        GCCH_2s (ch));
793   ch->last_control_qe = GCT_send (ch->t,
794                                   &msg.header,
795                                   &send_ack_cb,
796                                   ch);
797 }
798
799
800 /**
801  * Send our initial #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK to the client confirming that the
802  * connection is up.
803  *
804  * @param cls the `struct CadetChannel`
805  */
806 static void
807 send_open_ack (void *cls)
808 {
809   struct CadetChannel *ch = cls;
810   struct GNUNET_CADET_ChannelManageMessage msg;
811
812   LOG (GNUNET_ERROR_TYPE_DEBUG,
813        "Sending CHANNEL_OPEN_ACK on %s\n",
814        GCCH_2s (ch));
815   ch->retry_control_task = NULL;
816   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK);
817   msg.header.size = htons (sizeof (msg));
818   msg.reserved = htonl (0);
819   msg.ctn = ch->ctn;
820   if (NULL != ch->last_control_qe)
821     GCT_send_cancel (ch->last_control_qe);
822   ch->last_control_qe = GCT_send (ch->t,
823                                   &msg.header,
824                                   &send_ack_cb,
825                                   ch);
826 }
827
828
829 /**
830  * We got a #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN message again for
831  * this channel.  If the binding was successful, (re)transmit the
832  * #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK.
833  *
834  * @param ch channel that got the duplicate open
835  */
836 void
837 GCCH_handle_duplicate_open (struct CadetChannel *ch)
838 {
839   if (NULL == ch->dest)
840   {
841     LOG (GNUNET_ERROR_TYPE_DEBUG,
842          "Ignoring duplicate channel OPEN on %s: port is closed\n",
843          GCCH_2s (ch));
844     return;
845   }
846   if (NULL != ch->retry_control_task)
847   {
848     LOG (GNUNET_ERROR_TYPE_DEBUG,
849          "Ignoring duplicate channel OPEN on %s: control message is pending\n",
850          GCCH_2s (ch));
851     return;
852   }
853   LOG (GNUNET_ERROR_TYPE_DEBUG,
854        "Retransmitting OPEN_ACK on %s\n",
855        GCCH_2s (ch));
856   ch->retry_control_task
857     = GNUNET_SCHEDULER_add_now (&send_open_ack,
858                                 ch);
859 }
860
861
862 /**
863  * Send a #GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK to the client to solicit more messages.
864  *
865  * @param ch channel the ack is for
866  * @param to_owner #GNUNET_YES to send to owner,
867  *                 #GNUNET_NO to send to dest
868  */
869 static void
870 send_ack_to_client (struct CadetChannel *ch,
871                     int to_owner)
872 {
873   struct GNUNET_MQ_Envelope *env;
874   struct GNUNET_CADET_LocalAck *ack;
875   struct CadetChannelClient *ccc;
876
877   ccc = (GNUNET_YES == to_owner) ? ch->owner : ch->dest;
878   if (NULL == ccc)
879   {
880     /* This can happen if we are just getting ACKs after
881        our local client already disconnected. */
882     GNUNET_assert (GNUNET_YES == ch->destroy);
883     return;
884   }
885   env = GNUNET_MQ_msg (ack,
886                        GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK);
887   ack->ccn = ccc->ccn;
888   LOG (GNUNET_ERROR_TYPE_DEBUG,
889        "Sending CADET_LOCAL_ACK to %s (%s) at ccn %X (%u/%u pending)\n",
890        GSC_2s (ccc->c),
891        (GNUNET_YES == to_owner) ? "owner" : "dest",
892        ntohl (ack->ccn.channel_of_client),
893        ch->pending_messages,
894        ch->max_pending_messages);
895   GSC_send_to_client (ccc->c,
896                       env);
897 }
898
899
900 /**
901  * A client is bound to the port that we have a channel
902  * open to.  Send the acknowledgement for the connection
903  * request and establish the link with the client.
904  *
905  * @param ch open incoming channel
906  * @param c client listening on the respective port
907  */
908 void
909 GCCH_bind (struct CadetChannel *ch,
910            struct CadetClient *c)
911 {
912   uint32_t options;
913   struct CadetChannelClient *cccd;
914
915   LOG (GNUNET_ERROR_TYPE_DEBUG,
916        "Binding %s from %s to port %s of %s\n",
917        GCCH_2s (ch),
918        GCT_2s (ch->t),
919        GNUNET_h2s (&ch->port),
920        GSC_2s (c));
921   if (NULL != ch->retry_control_task)
922   {
923     /* there might be a timeout task here */
924     GNUNET_SCHEDULER_cancel (ch->retry_control_task);
925     ch->retry_control_task = NULL;
926   }
927   options = 0;
928   if (ch->nobuffer)
929     options |= GNUNET_CADET_OPTION_NOBUFFER;
930   if (ch->reliable)
931     options |= GNUNET_CADET_OPTION_RELIABLE;
932   if (ch->out_of_order)
933     options |= GNUNET_CADET_OPTION_OUT_OF_ORDER;
934   cccd = GNUNET_new (struct CadetChannelClient);
935   ch->dest = cccd;
936   cccd->c = c;
937   cccd->client_ready = GNUNET_YES;
938   cccd->ccn = GSC_bind (c,
939                         ch,
940                         (GNUNET_YES == ch->is_loopback)
941                         ? GCP_get (&my_full_id,
942                                    GNUNET_YES)
943                         : GCT_get_destination (ch->t),
944                         &ch->port,
945                         options);
946   GNUNET_assert (ntohl (cccd->ccn.channel_of_client) <
947                  GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
948   ch->mid_recv.mid = htonl (1); /* The OPEN counts as message 0! */
949   if (GNUNET_YES == ch->is_loopback)
950   {
951     ch->state = CADET_CHANNEL_OPEN_SENT;
952     GCCH_handle_channel_open_ack (ch);
953   }
954   else
955   {
956     /* notify other peer that we accepted the connection */
957     ch->retry_control_task
958       = GNUNET_SCHEDULER_add_now (&send_open_ack,
959                                   ch);
960   }
961   /* give client it's initial supply of ACKs */
962   GNUNET_assert (ntohl (cccd->ccn.channel_of_client) <
963                  GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
964   for (unsigned int i=0;i<ch->max_pending_messages;i++)
965     send_ack_to_client (ch,
966                         GNUNET_NO);
967 }
968
969
970 /**
971  * Destroy locally created channel.  Called by the local client, so no
972  * need to tell the client.
973  *
974  * @param ch channel to destroy
975  * @param c client that caused the destruction
976  * @param ccn client number of the client @a c
977  */
978 void
979 GCCH_channel_local_destroy (struct CadetChannel *ch,
980                             struct CadetClient *c,
981                             struct GNUNET_CADET_ClientChannelNumber ccn)
982 {
983   LOG (GNUNET_ERROR_TYPE_DEBUG,
984        "%s asks for destruction of %s\n",
985        GSC_2s (c),
986        GCCH_2s (ch));
987   GNUNET_assert (NULL != c);
988   if ( (NULL != ch->owner) &&
989        (c == ch->owner->c) &&
990        (ccn.channel_of_client == ch->owner->ccn.channel_of_client) )
991   {
992     free_channel_client (ch->owner);
993     ch->owner = NULL;
994   }
995   else if ( (NULL != ch->dest) &&
996             (c == ch->dest->c) &&
997             (ccn.channel_of_client == ch->dest->ccn.channel_of_client) )
998   {
999     free_channel_client (ch->dest);
1000     ch->dest = NULL;
1001   }
1002   else
1003   {
1004     GNUNET_assert (0);
1005   }
1006
1007   if (GNUNET_YES == ch->destroy)
1008   {
1009     /* other end already destroyed, with the local client gone, no need
1010        to finish transmissions, just destroy immediately. */
1011     channel_destroy (ch);
1012     return;
1013   }
1014   if ( (NULL != ch->head_sent) ||
1015        (NULL != ch->owner) ||
1016        (NULL != ch->dest) )
1017   {
1018     /* Wait for other end to destroy us as well,
1019        and otherwise allow send queue to be transmitted first */
1020     ch->destroy = GNUNET_YES;
1021     return;
1022   }
1023   /* If the we ever sent the CHANNEL_CREATE, we need to send a destroy message. */
1024   if (CADET_CHANNEL_NEW != ch->state)
1025     GCT_send_channel_destroy (ch->t,
1026                               ch->ctn);
1027   /* Nothing left to do, just finish destruction */
1028   channel_destroy (ch);
1029 }
1030
1031
1032 /**
1033  * We got an acknowledgement for the creation of the channel
1034  * (the port is open on the other side). Begin transmissions.
1035  *
1036  * @param ch channel to destroy
1037  */
1038 void
1039 GCCH_handle_channel_open_ack (struct CadetChannel *ch)
1040 {
1041   switch (ch->state)
1042   {
1043   case CADET_CHANNEL_NEW:
1044     /* this should be impossible */
1045     GNUNET_break (0);
1046     break;
1047   case CADET_CHANNEL_OPEN_SENT:
1048     if (NULL == ch->owner)
1049     {
1050       /* We're not the owner, wrong direction! */
1051       GNUNET_break_op (0);
1052       return;
1053     }
1054     LOG (GNUNET_ERROR_TYPE_DEBUG,
1055          "Received CHANNEL_OPEN_ACK for waiting %s, entering READY state\n",
1056          GCCH_2s (ch));
1057     if (NULL != ch->retry_control_task) /* can be NULL if ch->is_loopback */
1058     {
1059       GNUNET_SCHEDULER_cancel (ch->retry_control_task);
1060       ch->retry_control_task = NULL;
1061     }
1062     ch->state = CADET_CHANNEL_READY;
1063     /* On first connect, send client as many ACKs as we allow messages
1064        to be buffered! */
1065     for (unsigned int i=0;i<ch->max_pending_messages;i++)
1066       send_ack_to_client (ch,
1067                           GNUNET_YES);
1068     break;
1069   case CADET_CHANNEL_READY:
1070     /* duplicate ACK, maybe we retried the CREATE. Ignore. */
1071     LOG (GNUNET_ERROR_TYPE_DEBUG,
1072          "Received duplicate channel OPEN_ACK for %s\n",
1073          GCCH_2s (ch));
1074     GNUNET_STATISTICS_update (stats,
1075                               "# duplicate CREATE_ACKs",
1076                               1,
1077                               GNUNET_NO);
1078     break;
1079   }
1080 }
1081
1082
1083 /**
1084  * Test if element @a e1 comes before element @a e2.
1085  *
1086  * @param cls closure, to a flag where we indicate duplicate packets
1087  * @param m1 a message of to sort
1088  * @param m2 another message to sort
1089  * @return #GNUNET_YES if @e1 < @e2, otherwise #GNUNET_NO
1090  */
1091 static int
1092 is_before (void *cls,
1093            struct CadetOutOfOrderMessage *m1,
1094            struct CadetOutOfOrderMessage *m2)
1095 {
1096   int *duplicate = cls;
1097   uint32_t v1 = ntohl (m1->mid.mid);
1098   uint32_t v2 = ntohl (m2->mid.mid);
1099   uint32_t delta;
1100
1101   delta = v2 - v1;
1102   if (0 == delta)
1103     *duplicate = GNUNET_YES;
1104   if (delta > (uint32_t) INT_MAX)
1105   {
1106     /* in overflow range, we can safely assume we wrapped around */
1107     return GNUNET_NO;
1108   }
1109   else
1110   {
1111     /* result is small, thus v2 > v1, thus m1 < m2 */
1112     return GNUNET_YES;
1113   }
1114 }
1115
1116
1117 /**
1118  * We got payload data for a channel.  Pass it on to the client
1119  * and send an ACK to the other end (once flow control allows it!)
1120  *
1121  * @param ch channel that got data
1122  * @param msg message that was received
1123  */
1124 void
1125 GCCH_handle_channel_plaintext_data (struct CadetChannel *ch,
1126                                     const struct GNUNET_CADET_ChannelAppDataMessage *msg)
1127 {
1128   struct GNUNET_MQ_Envelope *env;
1129   struct GNUNET_CADET_LocalData *ld;
1130   struct CadetChannelClient *ccc;
1131   size_t payload_size;
1132   struct CadetOutOfOrderMessage *com;
1133   int duplicate;
1134   uint32_t mid_min;
1135   uint32_t mid_max;
1136   uint32_t mid_msg;
1137   uint32_t delta;
1138
1139   GNUNET_assert (GNUNET_NO == ch->is_loopback);
1140   if ( (GNUNET_YES == ch->destroy) &&
1141        (NULL == ch->owner) &&
1142        (NULL == ch->dest) )
1143   {
1144     /* This client is gone, but we still have messages to send to
1145        the other end (which is why @a ch is not yet dead).  However,
1146        we cannot pass messages to our client anymore. */
1147     LOG (GNUNET_ERROR_TYPE_DEBUG,
1148          "Dropping incoming payload on %s as this end is already closed\n",
1149          GCCH_2s (ch));
1150     /* send back DESTROY notification to stop further retransmissions! */
1151     GCT_send_channel_destroy (ch->t,
1152                               ch->ctn);
1153     return;
1154   }
1155   payload_size = ntohs (msg->header.size) - sizeof (*msg);
1156   env = GNUNET_MQ_msg_extra (ld,
1157                              payload_size,
1158                              GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
1159   ld->ccn = (NULL == ch->dest) ? ch->owner->ccn : ch->dest->ccn;
1160   GNUNET_memcpy (&ld[1],
1161                  &msg[1],
1162                  payload_size);
1163   ccc = (NULL != ch->owner) ? ch->owner : ch->dest;
1164   if ( (GNUNET_YES == ccc->client_ready) &&
1165        ( (GNUNET_YES == ch->out_of_order) ||
1166          (msg->mid.mid == ch->mid_recv.mid) ) )
1167   {
1168     LOG (GNUNET_ERROR_TYPE_DEBUG,
1169          "Giving %u bytes of payload with MID %u from %s to client %s\n",
1170          (unsigned int) payload_size,
1171          ntohl (msg->mid.mid),
1172          GCCH_2s (ch),
1173          GSC_2s (ccc->c));
1174     ccc->client_ready = GNUNET_NO;
1175     GSC_send_to_client (ccc->c,
1176                         env);
1177     ch->mid_recv.mid = htonl (1 + ntohl (ch->mid_recv.mid));
1178     ch->mid_futures >>= 1;
1179     send_channel_data_ack (ch);
1180     return;
1181   }
1182
1183   if (GNUNET_YES == ch->reliable)
1184   {
1185     /* check if message ought to be dropped because it is ancient/too distant/duplicate */
1186     mid_min = ntohl (ch->mid_recv.mid);
1187     mid_max = mid_min + ch->max_pending_messages;
1188     mid_msg = ntohl (msg->mid.mid);
1189     if ( ( (uint32_t) (mid_msg - mid_min) > ch->max_pending_messages) ||
1190          ( (uint32_t) (mid_max - mid_msg) > ch->max_pending_messages) )
1191     {
1192       LOG (GNUNET_ERROR_TYPE_DEBUG,
1193            "%s at %u drops ancient or far-future message %u\n",
1194            GCCH_2s (ch),
1195            (unsigned int) mid_min,
1196            ntohl (msg->mid.mid));
1197
1198       GNUNET_STATISTICS_update (stats,
1199                                 "# duplicate DATA (ancient or future)",
1200                                 1,
1201                                 GNUNET_NO);
1202       GNUNET_MQ_discard (env);
1203       send_channel_data_ack (ch);
1204       return;
1205     }
1206     /* mark bit for future ACKs */
1207     delta = mid_msg - mid_min - 1; /* overflow/underflow are OK here */
1208     if (delta < 64)
1209     {
1210       if (0 != (ch->mid_futures & (1LLU << delta)))
1211       {
1212         /* Duplicate within the queue, drop also */
1213         LOG (GNUNET_ERROR_TYPE_DEBUG,
1214              "Duplicate payload of %u bytes on %s (mid %u) dropped\n",
1215              (unsigned int) payload_size,
1216              GCCH_2s (ch),
1217              ntohl (msg->mid.mid));
1218         GNUNET_STATISTICS_update (stats,
1219                                   "# duplicate DATA",
1220                                   1,
1221                                   GNUNET_NO);
1222         GNUNET_MQ_discard (env);
1223         send_channel_data_ack (ch);
1224         return;
1225       }
1226       ch->mid_futures |= (1LLU << delta);
1227       LOG (GNUNET_ERROR_TYPE_DEBUG,
1228            "Marked bit %llX for mid %u (base: %u); now: %llX\n",
1229            (1LLU << delta),
1230            mid_msg,
1231            mid_min,
1232            ch->mid_futures);
1233     }
1234   }
1235   else /* ! ch->reliable */
1236   {
1237     /* Channel is unreliable, so we do not ACK. But we also cannot
1238        allow buffering everything, so check if we have space... */
1239     if (ccc->num_recv >= ch->max_pending_messages)
1240     {
1241       struct CadetOutOfOrderMessage *drop;
1242
1243       /* Yep, need to drop. Drop the oldest message in
1244          the buffer. */
1245       LOG (GNUNET_ERROR_TYPE_DEBUG,
1246            "Queue full due slow client on %s, dropping oldest message\n",
1247            GCCH_2s (ch));
1248       GNUNET_STATISTICS_update (stats,
1249                                 "# messages dropped due to slow client",
1250                                 1,
1251                                 GNUNET_NO);
1252       drop = ccc->head_recv;
1253       GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
1254                                    ccc->tail_recv,
1255                                    drop);
1256       ccc->num_recv--;
1257       GNUNET_MQ_discard (drop->env);
1258       GNUNET_free (drop);
1259     }
1260   }
1261
1262   /* Insert message into sorted out-of-order queue */
1263   com = GNUNET_new (struct CadetOutOfOrderMessage);
1264   com->mid = msg->mid;
1265   com->env = env;
1266   duplicate = GNUNET_NO;
1267   GNUNET_CONTAINER_DLL_insert_sorted (struct CadetOutOfOrderMessage,
1268                                       is_before,
1269                                       &duplicate,
1270                                       ccc->head_recv,
1271                                       ccc->tail_recv,
1272                                       com);
1273   ccc->num_recv++;
1274   if (GNUNET_YES == duplicate)
1275   {
1276     /* Duplicate within the queue, drop also (this is not covered by
1277        the case above if "delta" >= 64, which could be the case if
1278        max_pending_messages is also >= 64 or if our client is unready
1279        and we are seeing retransmissions of the message our client is
1280        blocked on. */
1281     LOG (GNUNET_ERROR_TYPE_DEBUG,
1282          "Duplicate payload of %u bytes on %s (mid %u) dropped\n",
1283          (unsigned int) payload_size,
1284          GCCH_2s (ch),
1285          ntohl (msg->mid.mid));
1286     GNUNET_STATISTICS_update (stats,
1287                               "# duplicate DATA",
1288                               1,
1289                               GNUNET_NO);
1290     GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
1291                                  ccc->tail_recv,
1292                                  com);
1293     ccc->num_recv--;
1294     GNUNET_MQ_discard (com->env);
1295     GNUNET_free (com);
1296     send_channel_data_ack (ch);
1297     return;
1298   }
1299   LOG (GNUNET_ERROR_TYPE_DEBUG,
1300        "Queued %s payload of %u bytes on %s-%X(%p) (mid %u, need %u first)\n",
1301        (GNUNET_YES == ccc->client_ready)
1302        ? "out-of-order"
1303        : "client-not-ready",
1304        (unsigned int) payload_size,
1305        GCCH_2s (ch),
1306        ntohl (ccc->ccn.channel_of_client),
1307        ccc,
1308        ntohl (msg->mid.mid),
1309        ntohl (ch->mid_recv.mid));
1310   /* NOTE: this ACK we _could_ skip, as the packet is out-of-order and
1311      the sender may already be transmitting the previous one.  Needs
1312      experimental evaluation to see if/when this ACK helps or
1313      hurts. (We might even want another option.) */
1314   send_channel_data_ack (ch);
1315 }
1316
1317
1318 /**
1319  * Function called once the tunnel has sent one of our messages.
1320  * If the message is unreliable, simply frees the `crm`. If the
1321  * message was reliable, calculate retransmission time and
1322  * wait for ACK (or retransmit).
1323  *
1324  * @param cls the `struct CadetReliableMessage` that was sent
1325  */
1326 static void
1327 data_sent_cb (void *cls);
1328
1329
1330 /**
1331  * We need to retry a transmission, the last one took too long to
1332  * be acknowledged.
1333  *
1334  * @param cls the `struct CadetChannel` where we need to retransmit
1335  */
1336 static void
1337 retry_transmission (void *cls)
1338 {
1339   struct CadetChannel *ch = cls;
1340   struct CadetReliableMessage *crm = ch->head_sent;
1341
1342   ch->retry_data_task = NULL;
1343   GNUNET_assert (NULL == crm->qe);
1344   LOG (GNUNET_ERROR_TYPE_DEBUG,
1345        "Retrying transmission on %s of message %u\n",
1346        GCCH_2s (ch),
1347        (unsigned int) ntohl (crm->data_message->mid.mid));
1348   crm->qe = GCT_send (ch->t,
1349                       &crm->data_message->header,
1350                       &data_sent_cb,
1351                       crm);
1352   GNUNET_assert (NULL == ch->retry_data_task);
1353 }
1354
1355
1356 /**
1357  * We got an PLAINTEXT_DATA_ACK for a message in our queue, remove it from
1358  * the queue and tell our client that it can send more.
1359  *
1360  * @param ch the channel that got the PLAINTEXT_DATA_ACK
1361  * @param crm the message that got acknowledged
1362  */
1363 static void
1364 handle_matching_ack (struct CadetChannel *ch,
1365                      struct CadetReliableMessage *crm)
1366 {
1367   GNUNET_CONTAINER_DLL_remove (ch->head_sent,
1368                                ch->tail_sent,
1369                                crm);
1370   ch->pending_messages--;
1371   GNUNET_assert (ch->pending_messages < ch->max_pending_messages);
1372   LOG (GNUNET_ERROR_TYPE_DEBUG,
1373        "Received DATA_ACK on %s for message %u (%u ACKs pending)\n",
1374        GCCH_2s (ch),
1375        (unsigned int) ntohl (crm->data_message->mid.mid),
1376        ch->pending_messages);
1377   if (NULL != crm->qe)
1378   {
1379     GCT_send_cancel (crm->qe);
1380     crm->qe = NULL;
1381   }
1382   GNUNET_free (crm->data_message);
1383   GNUNET_free (crm);
1384   send_ack_to_client (ch,
1385                       (NULL == ch->owner)
1386                       ? GNUNET_NO
1387                       : GNUNET_YES);
1388 }
1389
1390
1391 /**
1392  * We got an acknowledgement for payload data for a channel.
1393  * Possibly resume transmissions.
1394  *
1395  * @param ch channel that got the ack
1396  * @param ack details about what was received
1397  */
1398 void
1399 GCCH_handle_channel_plaintext_data_ack (struct CadetChannel *ch,
1400                                         const struct GNUNET_CADET_ChannelDataAckMessage *ack)
1401 {
1402   struct CadetReliableMessage *crm;
1403   struct CadetReliableMessage *crmn;
1404   int found;
1405   uint32_t mid_base;
1406   uint64_t mid_mask;
1407   unsigned int delta;
1408
1409   GNUNET_break (GNUNET_NO == ch->is_loopback);
1410   if (GNUNET_NO == ch->reliable)
1411   {
1412     /* not expecting ACKs on unreliable channel, odd */
1413     GNUNET_break_op (0);
1414     return;
1415   }
1416   /* mid_base is the MID of the next message that the
1417      other peer expects (i.e. that is missing!), everything
1418      LOWER (but excluding mid_base itself) was received. */
1419   mid_base = ntohl (ack->mid.mid);
1420   mid_mask = GNUNET_htonll (ack->futures);
1421   found = GNUNET_NO;
1422   for (crm = ch->head_sent;
1423         NULL != crm;
1424        crm = crmn)
1425   {
1426     crmn = crm->next;
1427     delta = (unsigned int) (ntohl (crm->data_message->mid.mid) - mid_base);
1428     if (delta >= UINT_MAX - ch->max_pending_messages)
1429     {
1430       /* overflow, means crm was a bit in the past, so this ACK counts for it. */
1431       LOG (GNUNET_ERROR_TYPE_DEBUG,
1432            "Got DATA_ACK with base %u satisfying past message %u on %s\n",
1433            (unsigned int) mid_base,
1434            ntohl (crm->data_message->mid.mid),
1435            GCCH_2s (ch));
1436       handle_matching_ack (ch,
1437                            crm);
1438       found = GNUNET_YES;
1439       continue;
1440     }
1441     delta--;
1442     if (delta >= 64)
1443       continue;
1444     LOG (GNUNET_ERROR_TYPE_DEBUG,
1445          "Testing bit %llX for mid %u (base: %u)\n",
1446          (1LLU << delta),
1447          ntohl (crm->data_message->mid.mid),
1448          mid_base);
1449     if (0 != (mid_mask & (1LLU << delta)))
1450     {
1451       LOG (GNUNET_ERROR_TYPE_DEBUG,
1452            "Got DATA_ACK with mask for %u on %s\n",
1453            ntohl (crm->data_message->mid.mid),
1454            GCCH_2s (ch));
1455       handle_matching_ack (ch,
1456                            crm);
1457       found = GNUNET_YES;
1458     }
1459   }
1460   if (GNUNET_NO == found)
1461   {
1462     /* ACK for message we already dropped, might have been a
1463        duplicate ACK? Ignore. */
1464     LOG (GNUNET_ERROR_TYPE_DEBUG,
1465          "Duplicate DATA_ACK on %s, ignoring\n",
1466          GCCH_2s (ch));
1467     GNUNET_STATISTICS_update (stats,
1468                               "# duplicate DATA_ACKs",
1469                               1,
1470                               GNUNET_NO);
1471     return;
1472   }
1473   if (NULL != ch->retry_data_task)
1474   {
1475     GNUNET_SCHEDULER_cancel (ch->retry_data_task);
1476     ch->retry_data_task = NULL;
1477   }
1478   if ( (NULL != ch->head_sent) &&
1479        (NULL == ch->head_sent->qe) )
1480     ch->retry_data_task
1481       = GNUNET_SCHEDULER_add_at (ch->head_sent->next_retry,
1482                                  &retry_transmission,
1483                                  ch);
1484 }
1485
1486
1487 /**
1488  * Destroy channel, based on the other peer closing the
1489  * connection.  Also needs to remove this channel from
1490  * the tunnel.
1491  *
1492  * @param ch channel to destroy
1493  */
1494 void
1495 GCCH_handle_remote_destroy (struct CadetChannel *ch)
1496 {
1497   struct CadetChannelClient *ccc;
1498
1499   GNUNET_assert (GNUNET_NO == ch->is_loopback);
1500   LOG (GNUNET_ERROR_TYPE_DEBUG,
1501        "Received remote channel DESTROY for %s\n",
1502        GCCH_2s (ch));
1503   if (GNUNET_YES == ch->destroy)
1504   {
1505     /* Local client already gone, this is instant-death. */
1506     channel_destroy (ch);
1507     return;
1508   }
1509   ccc = (NULL != ch->owner) ? ch->owner : ch->dest;
1510   if (NULL != ccc->head_recv)
1511   {
1512     LOG (GNUNET_ERROR_TYPE_WARNING,
1513          "Lost end of transmission due to remote shutdown on %s\n",
1514          GCCH_2s (ch));
1515     /* FIXME: change API to notify client about truncated transmission! */
1516   }
1517   ch->destroy = GNUNET_YES;
1518   GSC_handle_remote_channel_destroy (ccc->c,
1519                                      ccc->ccn,
1520                                      ch);
1521   channel_destroy (ch);
1522 }
1523
1524
1525 /**
1526  * Test if element @a e1 comes before element @a e2.
1527  *
1528  * @param cls closure, to a flag where we indicate duplicate packets
1529  * @param crm1 an element of to sort
1530  * @param crm2 another element to sort
1531  * @return #GNUNET_YES if @e1 < @e2, otherwise #GNUNET_NO
1532  */
1533 static int
1534 cmp_crm_by_next_retry (void *cls,
1535                        struct CadetReliableMessage *crm1,
1536                        struct CadetReliableMessage *crm2)
1537 {
1538   if (crm1->next_retry.abs_value_us <
1539       crm2->next_retry.abs_value_us)
1540     return GNUNET_YES;
1541   return GNUNET_NO;
1542 }
1543
1544
1545 /**
1546  * Function called once the tunnel has sent one of our messages.
1547  * If the message is unreliable, simply frees the `crm`. If the
1548  * message was reliable, calculate retransmission time and
1549  * wait for ACK (or retransmit).
1550  *
1551  * @param cls the `struct CadetReliableMessage` that was sent
1552  */
1553 static void
1554 data_sent_cb (void *cls)
1555 {
1556   struct CadetReliableMessage *crm = cls;
1557   struct CadetChannel *ch = crm->ch;
1558
1559   GNUNET_assert (GNUNET_NO == ch->is_loopback);
1560   GNUNET_assert (NULL != crm->qe);
1561   crm->qe = NULL;
1562   GNUNET_CONTAINER_DLL_remove (ch->head_sent,
1563                                ch->tail_sent,
1564                                crm);
1565   if (GNUNET_NO == ch->reliable)
1566   {
1567     GNUNET_free (crm->data_message);
1568     GNUNET_free (crm);
1569     ch->pending_messages--;
1570     send_ack_to_client (ch,
1571                         (NULL == ch->owner)
1572                         ? GNUNET_NO
1573                         : GNUNET_YES);
1574     return;
1575   }
1576   if (0 == crm->retry_delay.rel_value_us)
1577     crm->retry_delay = ch->expected_delay;
1578   else
1579     crm->retry_delay = GNUNET_TIME_STD_BACKOFF (crm->retry_delay);
1580   crm->retry_delay = GNUNET_TIME_relative_max (crm->retry_delay,
1581                                                MIN_RTT_DELAY);
1582   crm->next_retry = GNUNET_TIME_relative_to_absolute (crm->retry_delay);
1583
1584   GNUNET_CONTAINER_DLL_insert_sorted (struct CadetReliableMessage,
1585                                       cmp_crm_by_next_retry,
1586                                       NULL,
1587                                       ch->head_sent,
1588                                       ch->tail_sent,
1589                                       crm);
1590   LOG (GNUNET_ERROR_TYPE_DEBUG,
1591        "Message %u sent, next transmission on %s in %s\n",
1592        (unsigned int) ntohl (crm->data_message->mid.mid),
1593        GCCH_2s (ch),
1594        GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (ch->head_sent->next_retry),
1595                                                GNUNET_YES));
1596   if (NULL == ch->head_sent->qe)
1597   {
1598     if (NULL != ch->retry_data_task)
1599       GNUNET_SCHEDULER_cancel (ch->retry_data_task);
1600     ch->retry_data_task
1601       = GNUNET_SCHEDULER_add_at (ch->head_sent->next_retry,
1602                                  &retry_transmission,
1603                                  ch);
1604   }
1605 }
1606
1607
1608 /**
1609  * Handle data given by a client.
1610  *
1611  * Check whether the client is allowed to send in this tunnel, save if
1612  * channel is reliable and send an ACK to the client if there is still
1613  * buffer space in the tunnel.
1614  *
1615  * @param ch Channel.
1616  * @param sender_ccn ccn of the sender
1617  * @param buf payload to transmit.
1618  * @param buf_len number of bytes in @a buf
1619  * @return #GNUNET_OK if everything goes well,
1620  *         #GNUNET_SYSERR in case of an error.
1621  */
1622 int
1623 GCCH_handle_local_data (struct CadetChannel *ch,
1624                         struct GNUNET_CADET_ClientChannelNumber sender_ccn,
1625                         const char *buf,
1626                         size_t buf_len)
1627 {
1628   struct CadetReliableMessage *crm;
1629
1630   if (ch->pending_messages > ch->max_pending_messages)
1631   {
1632     GNUNET_break (0);
1633     return GNUNET_SYSERR;
1634   }
1635   ch->pending_messages++;
1636
1637   if (GNUNET_YES == ch->is_loopback)
1638   {
1639     struct CadetChannelClient *receiver;
1640     struct GNUNET_MQ_Envelope *env;
1641     struct GNUNET_CADET_LocalData *ld;
1642     int to_owner;
1643
1644     env = GNUNET_MQ_msg_extra (ld,
1645                                buf_len,
1646                                GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
1647     if (sender_ccn.channel_of_client ==
1648         ch->owner->ccn.channel_of_client)
1649     {
1650       receiver = ch->dest;
1651       to_owner = GNUNET_NO;
1652     }
1653     else
1654     {
1655       GNUNET_assert (sender_ccn.channel_of_client ==
1656                      ch->dest->ccn.channel_of_client);
1657       receiver = ch->owner;
1658       to_owner = GNUNET_YES;
1659     }
1660     ld->ccn = receiver->ccn;
1661     GNUNET_memcpy (&ld[1],
1662                    buf,
1663                    buf_len);
1664     if (GNUNET_YES == receiver->client_ready)
1665     {
1666       GSC_send_to_client (receiver->c,
1667                           env);
1668       send_ack_to_client (ch,
1669                           to_owner);
1670     }
1671     else
1672     {
1673       struct CadetOutOfOrderMessage *oom;
1674
1675       oom = GNUNET_new (struct CadetOutOfOrderMessage);
1676       oom->env = env;
1677       GNUNET_CONTAINER_DLL_insert_tail (receiver->head_recv,
1678                                         receiver->tail_recv,
1679                                         oom);
1680       receiver->num_recv++;
1681     }
1682     return GNUNET_OK;
1683   }
1684
1685   /* Everything is correct, send the message. */
1686   crm = GNUNET_malloc (sizeof (*crm));
1687   crm->ch = ch;
1688   crm->data_message = GNUNET_malloc (sizeof (struct GNUNET_CADET_ChannelAppDataMessage)
1689                                      + buf_len);
1690   crm->data_message->header.size = htons (sizeof (struct GNUNET_CADET_ChannelAppDataMessage) + buf_len);
1691   crm->data_message->header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA);
1692   ch->mid_send.mid = htonl (ntohl (ch->mid_send.mid) + 1);
1693   crm->data_message->mid = ch->mid_send;
1694   crm->data_message->ctn = ch->ctn;
1695   GNUNET_memcpy (&crm->data_message[1],
1696                  buf,
1697                  buf_len);
1698   GNUNET_CONTAINER_DLL_insert_tail (ch->head_sent,
1699                                     ch->tail_sent,
1700                                     crm);
1701   LOG (GNUNET_ERROR_TYPE_DEBUG,
1702        "Sending message %u from local client to %s with %u bytes\n",
1703        ntohl (crm->data_message->mid.mid),
1704        GCCH_2s (ch),
1705        buf_len);
1706   if (NULL != ch->retry_data_task)
1707   {
1708     GNUNET_SCHEDULER_cancel (ch->retry_data_task);
1709     ch->retry_data_task = NULL;
1710   }
1711   crm->qe = GCT_send (ch->t,
1712                       &crm->data_message->header,
1713                       &data_sent_cb,
1714                       crm);
1715   GNUNET_assert (NULL == ch->retry_data_task);
1716   return GNUNET_OK;
1717 }
1718
1719
1720 /**
1721  * Handle ACK from client on local channel.  Means the client is ready
1722  * for more data, see if we have any for it.
1723  *
1724  * @param ch channel to destroy
1725  * @param client_ccn ccn of the client sending the ack
1726  */
1727 void
1728 GCCH_handle_local_ack (struct CadetChannel *ch,
1729                        struct GNUNET_CADET_ClientChannelNumber client_ccn)
1730 {
1731   struct CadetChannelClient *ccc;
1732   struct CadetOutOfOrderMessage *com;
1733
1734   if ( (NULL != ch->owner) &&
1735        (ch->owner->ccn.channel_of_client == client_ccn.channel_of_client) )
1736     ccc = ch->owner;
1737   else if ( (NULL != ch->dest) &&
1738             (ch->dest->ccn.channel_of_client == client_ccn.channel_of_client) )
1739     ccc = ch->dest;
1740   else
1741     GNUNET_assert (0);
1742   ccc->client_ready = GNUNET_YES;
1743   com = ccc->head_recv;
1744   if (NULL == com)
1745   {
1746     LOG (GNUNET_ERROR_TYPE_DEBUG,
1747          "Got LOCAL_ACK, %s-%X ready to receive more data, but none pending on %s-%X(%p)!\n",
1748          GSC_2s (ccc->c),
1749          ntohl (client_ccn.channel_of_client),
1750          GCCH_2s (ch),
1751          ntohl (ccc->ccn.channel_of_client),
1752          ccc);
1753     return; /* none pending */
1754   }
1755   if (GNUNET_YES == ch->is_loopback)
1756   {
1757     int to_owner;
1758
1759     /* Messages are always in-order, just send */
1760     GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
1761                                  ccc->tail_recv,
1762                                  com);
1763     ccc->num_recv--;
1764     GSC_send_to_client (ccc->c,
1765                         com->env);
1766     /* Notify sender that we can receive more */
1767     if (ccc->ccn.channel_of_client ==
1768         ch->owner->ccn.channel_of_client)
1769     {
1770       to_owner = GNUNET_NO;
1771     }
1772     else
1773     {
1774       GNUNET_assert (ccc->ccn.channel_of_client ==
1775                      ch->dest->ccn.channel_of_client);
1776       to_owner = GNUNET_YES;
1777     }
1778     send_ack_to_client (ch,
1779                         to_owner);
1780     GNUNET_free (com);
1781     return;
1782   }
1783
1784   if ( (com->mid.mid != ch->mid_recv.mid) &&
1785        (GNUNET_NO == ch->out_of_order) &&
1786        (GNUNET_YES == ch->reliable) )
1787   {
1788     LOG (GNUNET_ERROR_TYPE_DEBUG,
1789          "Got LOCAL_ACK, %s-%X ready to receive more data (but next one is out-of-order %u vs. %u)!\n",
1790          GSC_2s (ccc->c),
1791          ntohl (ccc->ccn.channel_of_client),
1792          ntohl (com->mid.mid),
1793          ntohl (ch->mid_recv.mid));
1794     return; /* missing next one in-order */
1795   }
1796
1797   LOG (GNUNET_ERROR_TYPE_DEBUG,
1798        "Got LOCAL_ACK, giving payload message %u to %s-%X on %s\n",
1799        ntohl (com->mid.mid),
1800        GSC_2s (ccc->c),
1801        ntohl (ccc->ccn.channel_of_client),
1802        GCCH_2s (ch));
1803
1804   /* all good, pass next message to client */
1805   GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
1806                                ccc->tail_recv,
1807                                com);
1808   ccc->num_recv--;
1809   /* FIXME: if unreliable, this is not aggressive
1810      enough, as it would be OK to have lost some! */
1811
1812   ch->mid_recv.mid = htonl (1 + ntohl (com->mid.mid));
1813   ch->mid_futures >>= 1; /* equivalent to division by 2 */
1814   ccc->client_ready = GNUNET_NO;
1815   GSC_send_to_client (ccc->c,
1816                       com->env);
1817   GNUNET_free (com);
1818   send_channel_data_ack (ch);
1819   if (NULL != ccc->head_recv)
1820     return;
1821   if (GNUNET_NO == ch->destroy)
1822     return;
1823   GCT_send_channel_destroy (ch->t,
1824                             ch->ctn);
1825   channel_destroy (ch);
1826 }
1827
1828
1829 #define LOG2(level, ...) GNUNET_log_from_nocheck(level,"cadet-chn",__VA_ARGS__)
1830
1831
1832 /**
1833  * Log channel info.
1834  *
1835  * @param ch Channel.
1836  * @param level Debug level to use.
1837  */
1838 void
1839 GCCH_debug (struct CadetChannel *ch,
1840             enum GNUNET_ErrorType level)
1841 {
1842   int do_log;
1843
1844   do_log = GNUNET_get_log_call_status (level & (~GNUNET_ERROR_TYPE_BULK),
1845                                        "cadet-chn",
1846                                        __FILE__, __FUNCTION__, __LINE__);
1847   if (0 == do_log)
1848     return;
1849
1850   if (NULL == ch)
1851   {
1852     LOG2 (level, "CHN *** DEBUG NULL CHANNEL ***\n");
1853     return;
1854   }
1855   LOG2 (level,
1856         "CHN %s:%X (%p)\n",
1857         GCT_2s (ch->t),
1858         ch->ctn,
1859         ch);
1860   if (NULL != ch->owner)
1861   {
1862     LOG2 (level,
1863           "CHN origin %s ready %s local-id: %u\n",
1864           GSC_2s (ch->owner->c),
1865           ch->owner->client_ready ? "YES" : "NO",
1866           ntohl (ch->owner->ccn.channel_of_client));
1867   }
1868   if (NULL != ch->dest)
1869   {
1870     LOG2 (level,
1871           "CHN destination %s ready %s local-id: %u\n",
1872           GSC_2s (ch->dest->c),
1873           ch->dest->client_ready ? "YES" : "NO",
1874           ntohl (ch->dest->ccn.channel_of_client));
1875   }
1876   LOG2 (level,
1877         "CHN  Message IDs recv: %d (%LLX), send: %d\n",
1878         ntohl (ch->mid_recv.mid),
1879         (unsigned long long) ch->mid_futures,
1880         ntohl (ch->mid_send.mid));
1881 }
1882
1883
1884
1885 /* end of gnunet-service-cadet-new_channel.c */