handle ACKs past client down
[oweals/gnunet.git] / src / cadet / gnunet-service-cadet-new_channel.c
1
2 /*
3      This file is part of GNUnet.
4      Copyright (C) 2001-2017 GNUnet e.V.
5
6      GNUnet is free software; you can redistribute it and/or modify
7      it under the terms of the GNU General Public License as published
8      by the Free Software Foundation; either version 3, or (at your
9      option) any later version.
10
11      GNUnet is distributed in the hope that it will be useful, but
12      WITHOUT ANY WARRANTY; without even the implied warranty of
13      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14      General Public License for more details.
15
16      You should have received a copy of the GNU General Public License
17      along with GNUnet; see the file COPYING.  If not, write to the
18      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
19      Boston, MA 02110-1301, USA.
20 */
21 /**
22  * @file cadet/gnunet-service-cadet-new_channel.c
23  * @brief logical links between CADET clients
24  * @author Bartlomiej Polot
25  * @author Christian Grothoff
26  *
27  * TODO:
28  * - Optimize ACKs by using 'mid_futures' properly!
29  * - introduce shutdown so we can have half-closed channels, modify
30  *   destroy to include MID to have FIN-ACK equivalents, etc.
31  * - estimate max bandwidth using bursts and use to for CONGESTION CONTROL!
32  * - check that '0xFFULL' really is sufficient for flow control!
33  * - revisit handling of 'unreliable' traffic!
34  * - revisit handling of 'out-of-order' option, especially in combination with/without 'reliable'.
35  * - figure out flow control without ACKs (unreliable traffic!)
36  */
37 #include "platform.h"
38 #include "gnunet_util_lib.h"
39 #include "cadet.h"
40 #include "gnunet_statistics_service.h"
41 #include "gnunet-service-cadet-new.h"
42 #include "gnunet-service-cadet-new_channel.h"
43 #include "gnunet-service-cadet-new_connection.h"
44 #include "gnunet-service-cadet-new_tunnels.h"
45 #include "gnunet-service-cadet-new_peer.h"
46 #include "gnunet-service-cadet-new_paths.h"
47
48 #define LOG(level,...) GNUNET_log_from (level,"cadet-chn",__VA_ARGS__)
49
50 /**
51  * How long do we initially wait before retransmitting?
52  */
53 #define CADET_INITIAL_RETRANSMIT_TIME GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 250)
54
55 /**
56  * How long do we wait before dropping state about incoming
57  * connection to closed port?
58  */
59 #define TIMEOUT_CLOSED_PORT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30)
60
61 /**
62  * How long do we wait at least before retransmitting ever?
63  */
64 #define MIN_RTT_DELAY GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 75)
65
66 /**
67  * Maximum message ID into the future we accept for out-of-order messages.
68  * If the message is more than this into the future, we drop it.  This is
69  * important both to detect values that are actually in the past, as well
70  * as to limit adversarially triggerable memory consumption.
71  *
72  * Note that right now we have "max_pending_messages = 4" hard-coded in
73  * the logic below, so a value of 4 would suffice here. But we plan to
74  * allow larger windows in the future...
75  */
76 #define MAX_OUT_OF_ORDER_DISTANCE 1024
77
78
79 /**
80  * All the states a connection can be in.
81  */
82 enum CadetChannelState
83 {
84   /**
85    * Uninitialized status, should never appear in operation.
86    */
87   CADET_CHANNEL_NEW,
88
89   /**
90    * Connection create message sent, waiting for ACK.
91    */
92   CADET_CHANNEL_OPEN_SENT,
93
94   /**
95    * Connection confirmed, ready to carry traffic.
96    */
97   CADET_CHANNEL_READY
98 };
99
100
101 /**
102  * Info needed to retry a message in case it gets lost.
103  * Note that we DO use this structure also for unreliable
104  * messages.
105  */
106 struct CadetReliableMessage
107 {
108   /**
109    * Double linked list, FIFO style
110    */
111   struct CadetReliableMessage *next;
112
113   /**
114    * Double linked list, FIFO style
115    */
116   struct CadetReliableMessage *prev;
117
118   /**
119    * Which channel is this message in?
120    */
121   struct CadetChannel *ch;
122
123   /**
124    * Entry in the tunnels queue for this message, NULL if it has left
125    * the tunnel.  Used to cancel transmission in case we receive an
126    * ACK in time.
127    */
128   struct CadetTunnelQueueEntry *qe;
129
130   /**
131    * How soon should we retry if we fail to get an ACK?
132    * Messages in the queue are sorted by this value.
133    */
134   struct GNUNET_TIME_Absolute next_retry;
135
136   /**
137    * How long do we wait for an ACK after transmission?
138    * Use for the back-off calculation.
139    */
140   struct GNUNET_TIME_Relative retry_delay;
141
142   /**
143    * Data message we are trying to send.
144    */
145   struct GNUNET_CADET_ChannelAppDataMessage *data_message;
146
147 };
148
149
150 /**
151  * List of received out-of-order data messages.
152  */
153 struct CadetOutOfOrderMessage
154 {
155   /**
156    * Double linked list, FIFO style
157    */
158   struct CadetOutOfOrderMessage *next;
159
160   /**
161    * Double linked list, FIFO style
162    */
163   struct CadetOutOfOrderMessage *prev;
164
165   /**
166    * ID of the message (messages up to this point needed
167    * before we give this one to the client).
168    */
169   struct ChannelMessageIdentifier mid;
170
171   /**
172    * The envelope with the payload of the out-of-order message
173    */
174   struct GNUNET_MQ_Envelope *env;
175
176 };
177
178
179 /**
180  * Client endpoint of a `struct CadetChannel`.  A channel may be a
181  * loopback channel, in which case it has two of these endpoints.
182  * Note that flow control also is required in both directions.
183  */
184 struct CadetChannelClient
185 {
186   /**
187    * Client handle.  Not by itself sufficient to designate
188    * the client endpoint, as the same client handle may
189    * be used for both the owner and the destination, and
190    * we thus also need the channel ID to identify the client.
191    */
192   struct CadetClient *c;
193
194   /**
195    * Head of DLL of messages received out of order or while client was unready.
196    */
197   struct CadetOutOfOrderMessage *head_recv;
198
199   /**
200    * Tail DLL of messages received out of order or while client was unready.
201    */
202   struct CadetOutOfOrderMessage *tail_recv;
203
204   /**
205    * Local tunnel number for this client.
206    * (if owner >= #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI,
207    *  otherwise < #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI)
208    */
209   struct GNUNET_CADET_ClientChannelNumber ccn;
210
211   /**
212    * Can we send data to the client?
213    */
214   int client_ready;
215
216 };
217
218
219 /**
220  * Struct containing all information regarding a channel to a remote client.
221  */
222 struct CadetChannel
223 {
224   /**
225    * Tunnel this channel is in.
226    */
227   struct CadetTunnel *t;
228
229   /**
230    * Client owner of the tunnel, if any.
231    * (Used if this channel represends the initiating end of the tunnel.)
232    */
233   struct CadetChannelClient *owner;
234
235   /**
236    * Client destination of the tunnel, if any.
237    * (Used if this channel represents the listening end of the tunnel.)
238    */
239   struct CadetChannelClient *dest;
240
241   /**
242    * Last entry in the tunnel's queue relating to control messages
243    * (#GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN or
244    * #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK).  Used to cancel
245    * transmission in case we receive updated information.
246    */
247   struct CadetTunnelQueueEntry *last_control_qe;
248
249   /**
250    * Head of DLL of messages sent and not yet ACK'd.
251    */
252   struct CadetReliableMessage *head_sent;
253
254   /**
255    * Tail of DLL of messages sent and not yet ACK'd.
256    */
257   struct CadetReliableMessage *tail_sent;
258
259   /**
260    * Task to resend/poll in case no ACK is received.
261    */
262   struct GNUNET_SCHEDULER_Task *retry_control_task;
263
264   /**
265    * Task to resend/poll in case no ACK is received.
266    */
267   struct GNUNET_SCHEDULER_Task *retry_data_task;
268
269   /**
270    * Last time the channel was used
271    */
272   struct GNUNET_TIME_Absolute timestamp;
273
274   /**
275    * Destination port of the channel.
276    */
277   struct GNUNET_HashCode port;
278
279   /**
280    * Counter for exponential backoff.
281    */
282   struct GNUNET_TIME_Relative retry_time;
283
284   /**
285    * How long does it usually take to get an ACK.
286    */
287   struct GNUNET_TIME_Relative expected_delay;
288
289   /**
290    * Bitfield of already-received messages past @e mid_recv.
291    *
292    * FIXME: not yet properly used (bits here are never set!)
293    */
294   uint64_t mid_futures;
295
296   /**
297    * Next MID expected for incoming traffic.
298    */
299   struct ChannelMessageIdentifier mid_recv;
300
301   /**
302    * Next MID to use for outgoing traffic.
303    */
304   struct ChannelMessageIdentifier mid_send;
305
306   /**
307    * Total (reliable) messages pending ACK for this channel.
308    */
309   unsigned int pending_messages;
310
311   /**
312    * Maximum (reliable) messages pending ACK for this channel
313    * before we throttle the client.
314    */
315   unsigned int max_pending_messages;
316
317   /**
318    * Number identifying this channel in its tunnel.
319    */
320   struct GNUNET_CADET_ChannelTunnelNumber ctn;
321
322   /**
323    * Channel state.
324    */
325   enum CadetChannelState state;
326
327   /**
328    * Is the tunnel bufferless (minimum latency)?
329    */
330   int nobuffer;
331
332   /**
333    * Is the tunnel reliable?
334    */
335   int reliable;
336
337   /**
338    * Is the tunnel out-of-order?
339    */
340   int out_of_order;
341
342   /**
343    * Is this channel a loopback channel, where the destination is us again?
344    */
345   int is_loopback;
346
347   /**
348    * Flag to signal the destruction of the channel.  If this is set to
349    * #GNUNET_YES the channel will be destroyed once the queue is
350    * empty.
351    */
352   int destroy;
353
354 };
355
356
357 /**
358  * Get the static string for identification of the channel.
359  *
360  * @param ch Channel.
361  *
362  * @return Static string with the channel IDs.
363  */
364 const char *
365 GCCH_2s (const struct CadetChannel *ch)
366 {
367   static char buf[128];
368
369   GNUNET_snprintf (buf,
370                    sizeof (buf),
371                    "Channel %s:%s ctn:%X(%X/%X)",
372                    (GNUNET_YES == ch->is_loopback)
373                    ? "loopback"
374                    : GNUNET_i2s (GCP_get_id (GCT_get_destination (ch->t))),
375                    GNUNET_h2s (&ch->port),
376                    ch->ctn,
377                    (NULL == ch->owner) ? 0 : ntohl (ch->owner->ccn.channel_of_client),
378                    (NULL == ch->dest) ? 0 : ntohl (ch->dest->ccn.channel_of_client));
379   return buf;
380 }
381
382
383 /**
384  * Get the channel's public ID.
385  *
386  * @param ch Channel.
387  *
388  * @return ID used to identify the channel with the remote peer.
389  */
390 struct GNUNET_CADET_ChannelTunnelNumber
391 GCCH_get_id (const struct CadetChannel *ch)
392 {
393   return ch->ctn;
394 }
395
396
397 /**
398  * Release memory associated with @a ccc
399  *
400  * @param ccc data structure to clean up
401  */
402 static void
403 free_channel_client (struct CadetChannelClient *ccc)
404 {
405   struct CadetOutOfOrderMessage *com;
406
407   while (NULL != (com = ccc->head_recv))
408   {
409     GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
410                                  ccc->tail_recv,
411                                  com);
412     GNUNET_MQ_discard (com->env);
413     GNUNET_free (com);
414   }
415   GNUNET_free (ccc);
416 }
417
418
419 /**
420  * Destroy the given channel.
421  *
422  * @param ch channel to destroy
423  */
424 static void
425 channel_destroy (struct CadetChannel *ch)
426 {
427   struct CadetReliableMessage *crm;
428
429   while (NULL != (crm = ch->head_sent))
430   {
431     GNUNET_assert (ch == crm->ch);
432     if (NULL != crm->qe)
433     {
434       GCT_send_cancel (crm->qe);
435       crm->qe = NULL;
436     }
437     GNUNET_CONTAINER_DLL_remove (ch->head_sent,
438                                  ch->tail_sent,
439                                  crm);
440     GNUNET_free (crm->data_message);
441     GNUNET_free (crm);
442   }
443   if (NULL != ch->owner)
444   {
445     free_channel_client (ch->owner);
446     ch->owner = NULL;
447   }
448   if (NULL != ch->dest)
449   {
450     free_channel_client (ch->dest);
451     ch->dest = NULL;
452   }
453   if (NULL != ch->last_control_qe)
454   {
455     GCT_send_cancel (ch->last_control_qe);
456     ch->last_control_qe = NULL;
457   }
458   if (NULL != ch->retry_data_task)
459   {
460     GNUNET_SCHEDULER_cancel (ch->retry_data_task);
461     ch->retry_data_task = NULL;
462   }
463   if (NULL != ch->retry_control_task)
464   {
465     GNUNET_SCHEDULER_cancel (ch->retry_control_task);
466     ch->retry_control_task = NULL;
467   }
468   if (GNUNET_NO == ch->is_loopback)
469   {
470     GCT_remove_channel (ch->t,
471                         ch,
472                         ch->ctn);
473     ch->t = NULL;
474   }
475   GNUNET_free (ch);
476 }
477
478
479 /**
480  * Send a channel create message.
481  *
482  * @param cls Channel for which to send.
483  */
484 static void
485 send_channel_open (void *cls);
486
487
488 /**
489  * Function called once the tunnel confirms that we sent the
490  * create message.  Delays for a bit until we retry.
491  *
492  * @param cls our `struct CadetChannel`.
493  */
494 static void
495 channel_open_sent_cb (void *cls)
496 {
497   struct CadetChannel *ch = cls;
498
499   GNUNET_assert (NULL != ch->last_control_qe);
500   ch->last_control_qe = NULL;
501   ch->retry_time = GNUNET_TIME_STD_BACKOFF (ch->retry_time);
502   LOG (GNUNET_ERROR_TYPE_DEBUG,
503        "Sent CADET_CHANNEL_OPEN on %s, retrying in %s\n",
504        GCCH_2s (ch),
505        GNUNET_STRINGS_relative_time_to_string (ch->retry_time,
506                                                GNUNET_YES));
507   ch->retry_control_task
508     = GNUNET_SCHEDULER_add_delayed (ch->retry_time,
509                                     &send_channel_open,
510                                     ch);
511 }
512
513
514 /**
515  * Send a channel open message.
516  *
517  * @param cls Channel for which to send.
518  */
519 static void
520 send_channel_open (void *cls)
521 {
522   struct CadetChannel *ch = cls;
523   struct GNUNET_CADET_ChannelOpenMessage msgcc;
524   uint32_t options;
525
526   ch->retry_control_task = NULL;
527   LOG (GNUNET_ERROR_TYPE_DEBUG,
528        "Sending CHANNEL_OPEN message for %s\n",
529        GCCH_2s (ch));
530   options = 0;
531   if (ch->nobuffer)
532     options |= GNUNET_CADET_OPTION_NOBUFFER;
533   if (ch->reliable)
534     options |= GNUNET_CADET_OPTION_RELIABLE;
535   if (ch->out_of_order)
536     options |= GNUNET_CADET_OPTION_OUT_OF_ORDER;
537   msgcc.header.size = htons (sizeof (msgcc));
538   msgcc.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN);
539   msgcc.opt = htonl (options);
540   msgcc.port = ch->port;
541   msgcc.ctn = ch->ctn;
542   ch->state = CADET_CHANNEL_OPEN_SENT;
543   ch->last_control_qe = GCT_send (ch->t,
544                                   &msgcc.header,
545                                   &channel_open_sent_cb,
546                                   ch);
547   GNUNET_assert (NULL == ch->retry_control_task);
548 }
549
550
551 /**
552  * Function called once and only once after a channel was bound
553  * to its tunnel via #GCT_add_channel() is ready for transmission.
554  * Note that this is only the case for channels that this peer
555  * initiates, as for incoming channels we assume that they are
556  * ready for transmission immediately upon receiving the open
557  * message.  Used to bootstrap the #GCT_send() process.
558  *
559  * @param ch the channel for which the tunnel is now ready
560  */
561 void
562 GCCH_tunnel_up (struct CadetChannel *ch)
563 {
564   GNUNET_assert (NULL == ch->retry_control_task);
565   LOG (GNUNET_ERROR_TYPE_DEBUG,
566        "Tunnel up, sending CHANNEL_OPEN on %s now\n",
567        GCCH_2s (ch));
568   ch->retry_control_task
569     = GNUNET_SCHEDULER_add_now (&send_channel_open,
570                                 ch);
571 }
572
573
574 /**
575  * Create a new channel.
576  *
577  * @param owner local client owning the channel
578  * @param ccn local number of this channel at the @a owner
579  * @param destination peer to which we should build the channel
580  * @param port desired port at @a destination
581  * @param options options for the channel
582  * @return handle to the new channel
583  */
584 struct CadetChannel *
585 GCCH_channel_local_new (struct CadetClient *owner,
586                         struct GNUNET_CADET_ClientChannelNumber ccn,
587                         struct CadetPeer *destination,
588                         const struct GNUNET_HashCode *port,
589                         uint32_t options)
590 {
591   struct CadetChannel *ch;
592   struct CadetChannelClient *ccco;
593
594   ccco = GNUNET_new (struct CadetChannelClient);
595   ccco->c = owner;
596   ccco->ccn = ccn;
597   ccco->client_ready = GNUNET_YES;
598
599   ch = GNUNET_new (struct CadetChannel);
600   ch->mid_recv.mid = htonl (1); /* The OPEN_ACK counts as message 0! */
601   ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
602   ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
603   ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
604   ch->max_pending_messages = (ch->nobuffer) ? 1 : 4; /* FIXME: 4!? Do not hardcode! */
605   ch->owner = ccco;
606   ch->port = *port;
607   if (0 == memcmp (&my_full_id,
608                    GCP_get_id (destination),
609                    sizeof (struct GNUNET_PeerIdentity)))
610   {
611     struct CadetClient *c;
612
613     ch->is_loopback = GNUNET_YES;
614     c = GNUNET_CONTAINER_multihashmap_get (open_ports,
615                                            port);
616     if (NULL == c)
617     {
618       /* port closed, wait for it to possibly open */
619       (void) GNUNET_CONTAINER_multihashmap_put (loose_channels,
620                                                 port,
621                                                 ch,
622                                                 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
623       LOG (GNUNET_ERROR_TYPE_DEBUG,
624            "Created loose incoming loopback channel to port %s\n",
625            GNUNET_h2s (&ch->port));
626     }
627     else
628     {
629       ch->dest = GNUNET_new (struct CadetChannelClient);
630       ch->dest->c = c;
631       ch->dest->client_ready = GNUNET_YES;
632       GCCH_bind (ch,
633                  ch->dest->c);
634     }
635   }
636   else
637   {
638     ch->t = GCP_get_tunnel (destination,
639                             GNUNET_YES);
640     ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME;
641     ch->ctn = GCT_add_channel (ch->t,
642                                ch);
643   }
644   GNUNET_STATISTICS_update (stats,
645                             "# channels",
646                             1,
647                             GNUNET_NO);
648   LOG (GNUNET_ERROR_TYPE_DEBUG,
649        "Created channel to port %s at peer %s for %s using %s\n",
650        GNUNET_h2s (port),
651        GCP_2s (destination),
652        GSC_2s (owner),
653        (GNUNET_YES == ch->is_loopback) ? "loopback" : GCT_2s (ch->t));
654   return ch;
655 }
656
657
658 /**
659  * We had an incoming channel to a port that is closed.
660  * It has not been opened for a while, drop it.
661  *
662  * @param cls the channel to drop
663  */
664 static void
665 timeout_closed_cb (void *cls)
666 {
667   struct CadetChannel *ch = cls;
668
669   ch->retry_control_task = NULL;
670   LOG (GNUNET_ERROR_TYPE_DEBUG,
671        "Closing incoming channel to port %s from peer %s due to timeout\n",
672        GNUNET_h2s (&ch->port),
673        GCP_2s (GCT_get_destination (ch->t)));
674   channel_destroy (ch);
675 }
676
677
678 /**
679  * Create a new channel based on a request coming in over the network.
680  *
681  * @param t tunnel to the remote peer
682  * @param ctn identifier of this channel in the tunnel
683  * @param port desired local port
684  * @param options options for the channel
685  * @return handle to the new channel
686  */
687 struct CadetChannel *
688 GCCH_channel_incoming_new (struct CadetTunnel *t,
689                            struct GNUNET_CADET_ChannelTunnelNumber ctn,
690                            const struct GNUNET_HashCode *port,
691                            uint32_t options)
692 {
693   struct CadetChannel *ch;
694   struct CadetClient *c;
695
696   ch = GNUNET_new (struct CadetChannel);
697   ch->port = *port;
698   ch->t = t;
699   ch->ctn = ctn;
700   ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME;
701   ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
702   ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
703   ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
704   ch->max_pending_messages = (ch->nobuffer) ? 1 : 4; /* FIXME: 4!? Do not hardcode! */
705   GNUNET_STATISTICS_update (stats,
706                             "# channels",
707                             1,
708                             GNUNET_NO);
709
710   c = GNUNET_CONTAINER_multihashmap_get (open_ports,
711                                          port);
712   if (NULL == c)
713   {
714     /* port closed, wait for it to possibly open */
715     (void) GNUNET_CONTAINER_multihashmap_put (loose_channels,
716                                               port,
717                                               ch,
718                                               GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
719     ch->retry_control_task
720       = GNUNET_SCHEDULER_add_delayed (TIMEOUT_CLOSED_PORT,
721                                       &timeout_closed_cb,
722                                       ch);
723     LOG (GNUNET_ERROR_TYPE_DEBUG,
724          "Created loose incoming channel to port %s from peer %s\n",
725          GNUNET_h2s (&ch->port),
726          GCP_2s (GCT_get_destination (ch->t)));
727   }
728   else
729   {
730     GCCH_bind (ch,
731                c);
732   }
733   GNUNET_STATISTICS_update (stats,
734                             "# channels",
735                             1,
736                             GNUNET_NO);
737   return ch;
738 }
739
740
741 /**
742  * Function called once the tunnel confirms that we sent the
743  * ACK message.  Just remembers it was sent, we do not expect
744  * ACKs for ACKs ;-).
745  *
746  * @param cls our `struct CadetChannel`.
747  */
748 static void
749 send_ack_cb (void *cls)
750 {
751   struct CadetChannel *ch = cls;
752
753   GNUNET_assert (NULL != ch->last_control_qe);
754   ch->last_control_qe = NULL;
755 }
756
757
758 /**
759  * Compute and send the current #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK to the other peer.
760  *
761  * @param ch channel to send the #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK for
762  */
763 static void
764 send_channel_data_ack (struct CadetChannel *ch)
765 {
766   struct GNUNET_CADET_ChannelDataAckMessage msg;
767
768   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK);
769   msg.header.size = htons (sizeof (msg));
770   msg.ctn = ch->ctn;
771   msg.mid.mid = htonl (ntohl (ch->mid_recv.mid) - 1);
772   msg.futures = GNUNET_htonll (ch->mid_futures);
773   if (NULL != ch->last_control_qe)
774     GCT_send_cancel (ch->last_control_qe);
775   LOG (GNUNET_ERROR_TYPE_DEBUG,
776        "Sending DATA_ACK %u:%llX via %s\n",
777        (unsigned int) ntohl (msg.mid.mid),
778        (unsigned long long) ch->mid_futures,
779        GCCH_2s (ch));
780   ch->last_control_qe = GCT_send (ch->t,
781                                   &msg.header,
782                                   &send_ack_cb,
783                                   ch);
784 }
785
786
787 /**
788  * Send our initial #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK to the client confirming that the
789  * connection is up.
790  *
791  * @param cls the `struct CadetChannel`
792  */
793 static void
794 send_open_ack (void *cls)
795 {
796   struct CadetChannel *ch = cls;
797   struct GNUNET_CADET_ChannelManageMessage msg;
798
799   LOG (GNUNET_ERROR_TYPE_DEBUG,
800        "Sending CHANNEL_OPEN_ACK on %s\n",
801        GCCH_2s (ch));
802   ch->retry_control_task = NULL;
803   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK);
804   msg.header.size = htons (sizeof (msg));
805   msg.reserved = htonl (0);
806   msg.ctn = ch->ctn;
807   if (NULL != ch->last_control_qe)
808     GCT_send_cancel (ch->last_control_qe);
809   ch->last_control_qe = GCT_send (ch->t,
810                                   &msg.header,
811                                   &send_ack_cb,
812                                   ch);
813 }
814
815
816 /**
817  * We got a #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN message again for
818  * this channel.  If the binding was successful, (re)transmit the
819  * #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK.
820  *
821  * @param ch channel that got the duplicate open
822  */
823 void
824 GCCH_handle_duplicate_open (struct CadetChannel *ch)
825 {
826   if (NULL == ch->dest)
827   {
828     LOG (GNUNET_ERROR_TYPE_DEBUG,
829          "Ignoring duplicate channel OPEN on %s: port is closed\n",
830          GCCH_2s (ch));
831     return;
832   }
833   if (NULL != ch->retry_control_task)
834   {
835     LOG (GNUNET_ERROR_TYPE_DEBUG,
836          "Ignoring duplicate channel OPEN on %s: control message is pending\n",
837          GCCH_2s (ch));
838     return;
839   }
840   LOG (GNUNET_ERROR_TYPE_DEBUG,
841        "Retransmitting OPEN_ACK on %s\n",
842        GCCH_2s (ch));
843   ch->retry_control_task
844     = GNUNET_SCHEDULER_add_now (&send_open_ack,
845                                 ch);
846 }
847
848
849 /**
850  * Send a #GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK to the client to solicit more messages.
851  *
852  * @param ch channel the ack is for
853  * @param to_owner #GNUNET_YES to send to owner,
854  *                 #GNUNET_NO to send to dest
855  */
856 static void
857 send_ack_to_client (struct CadetChannel *ch,
858                     int to_owner)
859 {
860   struct GNUNET_MQ_Envelope *env;
861   struct GNUNET_CADET_LocalAck *ack;
862   struct CadetChannelClient *ccc;
863
864   ccc = (GNUNET_YES == to_owner) ? ch->owner : ch->dest;
865   if (NULL == ccc)
866   {
867     /* This can happen if we are just getting ACKs after
868        our local client already disconnected. */
869     GNUNET_assert (GNUNET_YES == ch->destroy);
870     return;
871   }
872   env = GNUNET_MQ_msg (ack,
873                        GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK);
874   ack->ccn = ccc->ccn;
875   LOG (GNUNET_ERROR_TYPE_DEBUG,
876        "Sending CADET_LOCAL_ACK to %s (%s) at ccn %X (%u/%u pending)\n",
877        GSC_2s (ccc->c),
878        (GNUNET_YES == to_owner) ? "owner" : "dest",
879        ntohl (ack->ccn.channel_of_client),
880        ch->pending_messages,
881        ch->max_pending_messages);
882   GSC_send_to_client (ccc->c,
883                       env);
884 }
885
886
887 /**
888  * A client is bound to the port that we have a channel
889  * open to.  Send the acknowledgement for the connection
890  * request and establish the link with the client.
891  *
892  * @param ch open incoming channel
893  * @param c client listening on the respective port
894  */
895 void
896 GCCH_bind (struct CadetChannel *ch,
897            struct CadetClient *c)
898 {
899   uint32_t options;
900   struct CadetChannelClient *cccd;
901
902   LOG (GNUNET_ERROR_TYPE_DEBUG,
903        "Binding %s from %s to port %s of %s\n",
904        GCCH_2s (ch),
905        GCT_2s (ch->t),
906        GNUNET_h2s (&ch->port),
907        GSC_2s (c));
908   if (NULL != ch->retry_control_task)
909   {
910     /* there might be a timeout task here */
911     GNUNET_SCHEDULER_cancel (ch->retry_control_task);
912     ch->retry_control_task = NULL;
913   }
914   options = 0;
915   if (ch->nobuffer)
916     options |= GNUNET_CADET_OPTION_NOBUFFER;
917   if (ch->reliable)
918     options |= GNUNET_CADET_OPTION_RELIABLE;
919   if (ch->out_of_order)
920     options |= GNUNET_CADET_OPTION_OUT_OF_ORDER;
921   cccd = GNUNET_new (struct CadetChannelClient);
922   ch->dest = cccd;
923   cccd->c = c;
924   cccd->client_ready = GNUNET_YES;
925   cccd->ccn = GSC_bind (c,
926                         ch,
927                         (GNUNET_YES == ch->is_loopback)
928                         ? GCP_get (&my_full_id,
929                                    GNUNET_YES)
930                         : GCT_get_destination (ch->t),
931                         &ch->port,
932                         options);
933   GNUNET_assert (ntohl (cccd->ccn.channel_of_client) <
934                  GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
935   ch->mid_recv.mid = htonl (1); /* The OPEN counts as message 0! */
936   if (GNUNET_YES == ch->is_loopback)
937   {
938     ch->state = CADET_CHANNEL_OPEN_SENT;
939     GCCH_handle_channel_open_ack (ch);
940   }
941   else
942   {
943     /* notify other peer that we accepted the connection */
944     ch->retry_control_task
945       = GNUNET_SCHEDULER_add_now (&send_open_ack,
946                                   ch);
947   }
948   /* give client it's initial supply of ACKs */
949   GNUNET_assert (ntohl (cccd->ccn.channel_of_client) <
950                  GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
951   for (unsigned int i=0;i<ch->max_pending_messages;i++)
952     send_ack_to_client (ch,
953                         GNUNET_NO);
954 }
955
956
957 /**
958  * Destroy locally created channel.  Called by the local client, so no
959  * need to tell the client.
960  *
961  * @param ch channel to destroy
962  * @param c client that caused the destruction
963  * @param ccn client number of the client @a c
964  */
965 void
966 GCCH_channel_local_destroy (struct CadetChannel *ch,
967                             struct CadetClient *c,
968                             struct GNUNET_CADET_ClientChannelNumber ccn)
969 {
970   LOG (GNUNET_ERROR_TYPE_DEBUG,
971        "%s asks for destruction of %s\n",
972        GSC_2s (c),
973        GCCH_2s (ch));
974   GNUNET_assert (NULL != c);
975   if ( (NULL != ch->owner) &&
976        (c == ch->owner->c) &&
977        (ccn.channel_of_client == ch->owner->ccn.channel_of_client) )
978   {
979     free_channel_client (ch->owner);
980     ch->owner = NULL;
981   }
982   else if ( (NULL != ch->dest) &&
983             (c == ch->dest->c) &&
984             (ccn.channel_of_client == ch->dest->ccn.channel_of_client) )
985   {
986     free_channel_client (ch->dest);
987     ch->dest = NULL;
988   }
989   else
990   {
991     GNUNET_assert (0);
992   }
993
994   if (GNUNET_YES == ch->destroy)
995   {
996     /* other end already destroyed, with the local client gone, no need
997        to finish transmissions, just destroy immediately. */
998     channel_destroy (ch);
999     return;
1000   }
1001   if ( (NULL != ch->head_sent) ||
1002        (NULL != ch->owner) ||
1003        (NULL != ch->dest) )
1004   {
1005     /* Wait for other end to destroy us as well,
1006        and otherwise allow send queue to be transmitted first */
1007     ch->destroy = GNUNET_YES;
1008     return;
1009   }
1010   /* If the we ever sent the CHANNEL_CREATE, we need to send a destroy message. */
1011   if (CADET_CHANNEL_NEW != ch->state)
1012     GCT_send_channel_destroy (ch->t,
1013                               ch->ctn);
1014   /* Nothing left to do, just finish destruction */
1015   channel_destroy (ch);
1016 }
1017
1018
1019 /**
1020  * We got an acknowledgement for the creation of the channel
1021  * (the port is open on the other side). Begin transmissions.
1022  *
1023  * @param ch channel to destroy
1024  */
1025 void
1026 GCCH_handle_channel_open_ack (struct CadetChannel *ch)
1027 {
1028   switch (ch->state)
1029   {
1030   case CADET_CHANNEL_NEW:
1031     /* this should be impossible */
1032     GNUNET_break (0);
1033     break;
1034   case CADET_CHANNEL_OPEN_SENT:
1035     if (NULL == ch->owner)
1036     {
1037       /* We're not the owner, wrong direction! */
1038       GNUNET_break_op (0);
1039       return;
1040     }
1041     LOG (GNUNET_ERROR_TYPE_DEBUG,
1042          "Received CHANNEL_OPEN_ACK for waiting %s, entering READY state\n",
1043          GCCH_2s (ch));
1044     if (NULL != ch->retry_control_task) /* can be NULL if ch->is_loopback */
1045     {
1046       GNUNET_SCHEDULER_cancel (ch->retry_control_task);
1047       ch->retry_control_task = NULL;
1048     }
1049     ch->state = CADET_CHANNEL_READY;
1050     /* On first connect, send client as many ACKs as we allow messages
1051        to be buffered! */
1052     for (unsigned int i=0;i<ch->max_pending_messages;i++)
1053       send_ack_to_client (ch,
1054                           GNUNET_YES);
1055     break;
1056   case CADET_CHANNEL_READY:
1057     /* duplicate ACK, maybe we retried the CREATE. Ignore. */
1058     LOG (GNUNET_ERROR_TYPE_DEBUG,
1059          "Received duplicate channel OPEN_ACK for %s\n",
1060          GCCH_2s (ch));
1061     GNUNET_STATISTICS_update (stats,
1062                               "# duplicate CREATE_ACKs",
1063                               1,
1064                               GNUNET_NO);
1065     break;
1066   }
1067 }
1068
1069
1070 /**
1071  * Test if element @a e1 comes before element @a e2.
1072  *
1073  * @param cls closure, to a flag where we indicate duplicate packets
1074  * @param e1 an element of to sort
1075  * @param e2 another element to sort
1076  * @return #GNUNET_YES if @e1 < @e2, otherwise #GNUNET_NO
1077  */
1078 static int
1079 is_before (void *cls,
1080            struct CadetOutOfOrderMessage *m1,
1081            struct CadetOutOfOrderMessage *m2)
1082 {
1083   int *duplicate = cls;
1084   uint32_t v1 = ntohl (m1->mid.mid);
1085   uint32_t v2 = ntohl (m2->mid.mid);
1086   uint32_t delta;
1087
1088   delta = v2 - v1;
1089   if (0 == delta)
1090     *duplicate = GNUNET_YES;
1091   if (delta > (uint32_t) INT_MAX)
1092   {
1093     /* in overflow range, we can safely assume we wrapped around */
1094     return GNUNET_NO;
1095   }
1096   else
1097   {
1098     /* result is small, thus v2 > v1, thus e1 < e2 */
1099     return GNUNET_YES;
1100   }
1101 }
1102
1103
1104 /**
1105  * We got payload data for a channel.  Pass it on to the client
1106  * and send an ACK to the other end (once flow control allows it!)
1107  *
1108  * @param ch channel that got data
1109  * @param msg message that was received
1110  */
1111 void
1112 GCCH_handle_channel_plaintext_data (struct CadetChannel *ch,
1113                                     const struct GNUNET_CADET_ChannelAppDataMessage *msg)
1114 {
1115   struct GNUNET_MQ_Envelope *env;
1116   struct GNUNET_CADET_LocalData *ld;
1117   struct CadetChannelClient *ccc;
1118   size_t payload_size;
1119   struct CadetOutOfOrderMessage *com;
1120   int duplicate;
1121   uint32_t mid_min;
1122   uint32_t mid_max;
1123   uint32_t mid_msg;
1124
1125   GNUNET_assert (GNUNET_NO == ch->is_loopback);
1126   if ( (GNUNET_YES == ch->destroy) &&
1127        (NULL == ch->owner) &&
1128        (NULL == ch->dest) )
1129   {
1130     /* This client is gone, but we still have messages to send to
1131        the other end (which is why @a ch is not yet dead).  However,
1132        we cannot pass messages to our client anymore. */
1133     LOG (GNUNET_ERROR_TYPE_DEBUG,
1134          "Dropping incoming payload on %s as this end is already closed\n",
1135          GCCH_2s (ch));
1136     /* FIXME: send back ACK/NACK/Closed notification
1137        to stop retransmissions! */
1138     return;
1139   }
1140   payload_size = ntohs (msg->header.size) - sizeof (*msg);
1141   env = GNUNET_MQ_msg_extra (ld,
1142                              payload_size,
1143                              GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
1144   ld->ccn = (NULL == ch->dest) ? ch->owner->ccn : ch->dest->ccn;
1145   GNUNET_memcpy (&ld[1],
1146                  &msg[1],
1147                  payload_size);
1148   ccc = (NULL != ch->owner) ? ch->owner : ch->dest;
1149   if ( (GNUNET_YES == ccc->client_ready) &&
1150        ( (GNUNET_YES == ch->out_of_order) ||
1151          (msg->mid.mid == ch->mid_recv.mid) ) )
1152   {
1153     LOG (GNUNET_ERROR_TYPE_DEBUG,
1154          "Giving %u bytes of payload with MID %u from %s to client %s\n",
1155          (unsigned int) payload_size,
1156          ntohl (msg->mid.mid),
1157          GCCH_2s (ch),
1158          GSC_2s (ccc->c));
1159     ccc->client_ready = GNUNET_NO;
1160     GSC_send_to_client (ccc->c,
1161                         env);
1162     ch->mid_recv.mid = htonl (1 + ntohl (ch->mid_recv.mid));
1163     ch->mid_futures >>= 1;
1164     if (GNUNET_YES == ch->reliable)
1165       send_channel_data_ack (ch);
1166     return;
1167   }
1168
1169   /* check if message ought to be dropped because it is anicent/too distant/duplicate */
1170   mid_min = ntohl (ch->mid_recv.mid);
1171   mid_max = mid_min + MAX_OUT_OF_ORDER_DISTANCE;
1172   mid_msg = ntohl (msg->mid.mid);
1173   if ( ( (uint32_t) (mid_msg - mid_min) > MAX_OUT_OF_ORDER_DISTANCE) ||
1174        ( (uint32_t) (mid_max - mid_msg) > MAX_OUT_OF_ORDER_DISTANCE) )
1175   {
1176     LOG (GNUNET_ERROR_TYPE_DEBUG,
1177          "Duplicate ancient or future payload of %u bytes on %s (mid %u) dropped\n",
1178          (unsigned int) payload_size,
1179          GCCH_2s (ch),
1180          ntohl (msg->mid.mid));
1181     GNUNET_STATISTICS_update (stats,
1182                               "# duplicate DATA (ancient or future)",
1183                               1,
1184                               GNUNET_NO);
1185     GNUNET_MQ_discard (env);
1186     return;
1187   }
1188
1189   /* Insert message into sorted out-of-order queue */
1190   com = GNUNET_new (struct CadetOutOfOrderMessage);
1191   com->mid = msg->mid;
1192   com->env = env;
1193   duplicate = GNUNET_NO;
1194   GNUNET_CONTAINER_DLL_insert_sorted (struct CadetOutOfOrderMessage,
1195                                       is_before,
1196                                       &duplicate,
1197                                       ccc->head_recv,
1198                                       ccc->tail_recv,
1199                                       com);
1200   if (GNUNET_YES == duplicate)
1201   {
1202     /* Duplicate within the queue, drop also */
1203     LOG (GNUNET_ERROR_TYPE_DEBUG,
1204          "Duplicate payload of %u bytes on %s (mid %u) dropped\n",
1205          (unsigned int) payload_size,
1206          GCCH_2s (ch),
1207          ntohl (msg->mid.mid));
1208     GNUNET_STATISTICS_update (stats,
1209                               "# duplicate DATA",
1210                               1,
1211                               GNUNET_NO);
1212     GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
1213                                  ccc->tail_recv,
1214                                  com);
1215     GNUNET_MQ_discard (com->env);
1216     GNUNET_free (com);
1217     if (GNUNET_YES == ch->reliable)
1218       send_channel_data_ack (ch);
1219     return;
1220   }
1221   LOG (GNUNET_ERROR_TYPE_DEBUG,
1222        "Queued %s payload of %u bytes on %s-%X(%p) (mid %u, need %u first)\n",
1223        (GNUNET_YES == ccc->client_ready)
1224        ? "out-of-order"
1225        : "client-not-ready",
1226        (unsigned int) payload_size,
1227        GCCH_2s (ch),
1228        ntohl (ccc->ccn.channel_of_client),
1229        ccc,
1230        ntohl (msg->mid.mid),
1231        ntohl (ch->mid_recv.mid));
1232   send_channel_data_ack (ch);
1233 }
1234
1235
1236 /**
1237  * Function called once the tunnel has sent one of our messages.
1238  * If the message is unreliable, simply frees the `crm`. If the
1239  * message was reliable, calculate retransmission time and
1240  * wait for ACK (or retransmit).
1241  *
1242  * @param cls the `struct CadetReliableMessage` that was sent
1243  */
1244 static void
1245 data_sent_cb (void *cls);
1246
1247
1248 /**
1249  * We need to retry a transmission, the last one took too long to
1250  * be acknowledged.
1251  *
1252  * @param cls the `struct CadetChannel` where we need to retransmit
1253  */
1254 static void
1255 retry_transmission (void *cls)
1256 {
1257   struct CadetChannel *ch = cls;
1258   struct CadetReliableMessage *crm = ch->head_sent;
1259
1260   ch->retry_data_task = NULL;
1261   GNUNET_assert (NULL == crm->qe);
1262   LOG (GNUNET_ERROR_TYPE_DEBUG,
1263        "Retrying transmission on %s of message %u\n",
1264        GCCH_2s (ch),
1265        (unsigned int) ntohl (crm->data_message->mid.mid));
1266   crm->qe = GCT_send (ch->t,
1267                       &crm->data_message->header,
1268                       &data_sent_cb,
1269                       crm);
1270   GNUNET_assert (NULL == ch->retry_data_task);
1271 }
1272
1273
1274 /**
1275  * We got an PLAINTEXT_DATA_ACK for a message in our queue, remove it from
1276  * the queue and tell our client that it can send more.
1277  *
1278  * @param ch the channel that got the PLAINTEXT_DATA_ACK
1279  * @param crm the message that got acknowledged
1280  */
1281 static void
1282 handle_matching_ack (struct CadetChannel *ch,
1283                      struct CadetReliableMessage *crm)
1284 {
1285   GNUNET_CONTAINER_DLL_remove (ch->head_sent,
1286                                ch->tail_sent,
1287                                crm);
1288   ch->pending_messages--;
1289   GNUNET_assert (ch->pending_messages < ch->max_pending_messages);
1290   LOG (GNUNET_ERROR_TYPE_DEBUG,
1291        "Received DATA_ACK on %s for message %u (%u ACKs pending)\n",
1292        GCCH_2s (ch),
1293        (unsigned int) ntohl (crm->data_message->mid.mid),
1294        ch->pending_messages);
1295   if (NULL != crm->qe)
1296   {
1297     GCT_send_cancel (crm->qe);
1298     crm->qe = NULL;
1299   }
1300   GNUNET_free (crm->data_message);
1301   GNUNET_free (crm);
1302   send_ack_to_client (ch,
1303                       (NULL == ch->owner)
1304                       ? GNUNET_NO
1305                       : GNUNET_YES);
1306 }
1307
1308
1309 /**
1310  * We got an acknowledgement for payload data for a channel.
1311  * Possibly resume transmissions.
1312  *
1313  * @param ch channel that got the ack
1314  * @param ack details about what was received
1315  */
1316 void
1317 GCCH_handle_channel_plaintext_data_ack (struct CadetChannel *ch,
1318                                         const struct GNUNET_CADET_ChannelDataAckMessage *ack)
1319 {
1320   struct CadetReliableMessage *crm;
1321   struct CadetReliableMessage *crmn;
1322   int found;
1323   uint32_t mid_base;
1324   uint64_t mid_mask;
1325   unsigned int delta;
1326
1327   GNUNET_break (GNUNET_NO == ch->is_loopback);
1328   if (GNUNET_NO == ch->reliable)
1329   {
1330     /* not expecting ACKs on unreliable channel, odd */
1331     GNUNET_break_op (0);
1332     return;
1333   }
1334   mid_base = ntohl (ack->mid.mid);
1335   mid_mask = GNUNET_htonll (ack->futures);
1336   found = GNUNET_NO;
1337   for (crm = ch->head_sent;
1338         NULL != crm;
1339        crm = crmn)
1340   {
1341     crmn = crm->next;
1342     if (ack->mid.mid == crm->data_message->mid.mid)
1343     {
1344       handle_matching_ack (ch,
1345                            crm);
1346       found = GNUNET_YES;
1347       continue;
1348     }
1349     delta = (unsigned int) (ntohl (crm->data_message->mid.mid) - mid_base) - 1;
1350     if (delta >= 64)
1351       continue;
1352     if (0 != (mid_mask & (1LLU << delta)))
1353     {
1354       handle_matching_ack (ch,
1355                            crm);
1356       found = GNUNET_YES;
1357     }
1358   }
1359   if (GNUNET_NO == found)
1360   {
1361     /* ACK for message we already dropped, might have been a
1362        duplicate ACK? Ignore. */
1363     LOG (GNUNET_ERROR_TYPE_DEBUG,
1364          "Duplicate DATA_ACK on %s, ignoring\n",
1365          GCCH_2s (ch));
1366     GNUNET_STATISTICS_update (stats,
1367                               "# duplicate DATA_ACKs",
1368                               1,
1369                               GNUNET_NO);
1370     return;
1371   }
1372   if (NULL != ch->retry_data_task)
1373   {
1374     GNUNET_SCHEDULER_cancel (ch->retry_data_task);
1375     ch->retry_data_task = NULL;
1376   }
1377   if (NULL != ch->head_sent)
1378     ch->retry_data_task
1379       = GNUNET_SCHEDULER_add_at (ch->head_sent->next_retry,
1380                                  &retry_transmission,
1381                                  ch);
1382 }
1383
1384
1385 /**
1386  * Destroy channel, based on the other peer closing the
1387  * connection.  Also needs to remove this channel from
1388  * the tunnel.
1389  *
1390  * @param ch channel to destroy
1391  */
1392 void
1393 GCCH_handle_remote_destroy (struct CadetChannel *ch)
1394 {
1395   struct CadetChannelClient *ccc;
1396
1397   GNUNET_assert (GNUNET_NO == ch->is_loopback);
1398   LOG (GNUNET_ERROR_TYPE_DEBUG,
1399        "Received remote channel DESTROY for %s\n",
1400        GCCH_2s (ch));
1401   if (GNUNET_YES == ch->destroy)
1402   {
1403     /* Local client already gone, this is instant-death. */
1404     channel_destroy (ch);
1405     return;
1406   }
1407   ccc = (NULL != ch->owner) ? ch->owner : ch->dest;
1408   if (NULL != ccc->head_recv)
1409   {
1410     LOG (GNUNET_ERROR_TYPE_WARNING,
1411          "Lost end of transmission due to remote shutdown on %s\n",
1412          GCCH_2s (ch));
1413     /* FIXME: change API to notify client about truncated transmission! */
1414   }
1415   ch->destroy = GNUNET_YES;
1416   GSC_handle_remote_channel_destroy (ccc->c,
1417                                      ccc->ccn,
1418                                      ch);
1419   channel_destroy (ch);
1420 }
1421
1422
1423 /**
1424  * Test if element @a e1 comes before element @a e2.
1425  *
1426  * @param cls closure, to a flag where we indicate duplicate packets
1427  * @param crm1 an element of to sort
1428  * @param crm2 another element to sort
1429  * @return #GNUNET_YES if @e1 < @e2, otherwise #GNUNET_NO
1430  */
1431 static int
1432 cmp_crm_by_next_retry (void *cls,
1433                        struct CadetReliableMessage *crm1,
1434                        struct CadetReliableMessage *crm2)
1435 {
1436   if (crm1->next_retry.abs_value_us <
1437       crm2->next_retry.abs_value_us)
1438     return GNUNET_YES;
1439   return GNUNET_NO;
1440 }
1441
1442
1443 /**
1444  * Function called once the tunnel has sent one of our messages.
1445  * If the message is unreliable, simply frees the `crm`. If the
1446  * message was reliable, calculate retransmission time and
1447  * wait for ACK (or retransmit).
1448  *
1449  * @param cls the `struct CadetReliableMessage` that was sent
1450  */
1451 static void
1452 data_sent_cb (void *cls)
1453 {
1454   struct CadetReliableMessage *crm = cls;
1455   struct CadetChannel *ch = crm->ch;
1456
1457   GNUNET_assert (GNUNET_NO == ch->is_loopback);
1458   GNUNET_assert (NULL != crm->qe);
1459   crm->qe = NULL;
1460   GNUNET_CONTAINER_DLL_remove (ch->head_sent,
1461                                ch->tail_sent,
1462                                crm);
1463   if (GNUNET_NO == ch->reliable)
1464   {
1465     GNUNET_free (crm->data_message);
1466     GNUNET_free (crm);
1467     ch->pending_messages--;
1468     send_ack_to_client (ch,
1469                         (NULL == ch->owner)
1470                         ? GNUNET_NO
1471                         : GNUNET_YES);
1472     return;
1473   }
1474   if (0 == crm->retry_delay.rel_value_us)
1475     crm->retry_delay = ch->expected_delay;
1476   else
1477     crm->retry_delay = GNUNET_TIME_STD_BACKOFF (crm->retry_delay);
1478   crm->retry_delay = GNUNET_TIME_relative_max (crm->retry_delay,
1479                                                MIN_RTT_DELAY);
1480   crm->next_retry = GNUNET_TIME_relative_to_absolute (crm->retry_delay);
1481
1482   GNUNET_CONTAINER_DLL_insert_sorted (struct CadetReliableMessage,
1483                                       cmp_crm_by_next_retry,
1484                                       NULL,
1485                                       ch->head_sent,
1486                                       ch->tail_sent,
1487                                       crm);
1488   LOG (GNUNET_ERROR_TYPE_DEBUG,
1489        "Message %u sent, next transmission on %s in %s\n",
1490        (unsigned int) ntohl (crm->data_message->mid.mid),
1491        GCCH_2s (ch),
1492        GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (ch->head_sent->next_retry),
1493                                                GNUNET_YES));
1494   if (crm == ch->head_sent)
1495   {
1496     /* We are the new head, need to reschedule retry task */
1497     if (NULL != ch->retry_data_task)
1498       GNUNET_SCHEDULER_cancel (ch->retry_data_task);
1499     ch->retry_data_task
1500       = GNUNET_SCHEDULER_add_at (ch->head_sent->next_retry,
1501                                  &retry_transmission,
1502                                  ch);
1503   }
1504 }
1505
1506
1507 /**
1508  * Handle data given by a client.
1509  *
1510  * Check whether the client is allowed to send in this tunnel, save if
1511  * channel is reliable and send an ACK to the client if there is still
1512  * buffer space in the tunnel.
1513  *
1514  * @param ch Channel.
1515  * @param sender_ccn ccn of the sender
1516  * @param buf payload to transmit.
1517  * @param buf_len number of bytes in @a buf
1518  * @return #GNUNET_OK if everything goes well,
1519  *         #GNUNET_SYSERR in case of an error.
1520  */
1521 int
1522 GCCH_handle_local_data (struct CadetChannel *ch,
1523                         struct GNUNET_CADET_ClientChannelNumber sender_ccn,
1524                         const char *buf,
1525                         size_t buf_len)
1526 {
1527   struct CadetReliableMessage *crm;
1528
1529   if (ch->pending_messages > ch->max_pending_messages)
1530   {
1531     GNUNET_break (0);
1532     return GNUNET_SYSERR;
1533   }
1534   ch->pending_messages++;
1535
1536   if (GNUNET_YES == ch->is_loopback)
1537   {
1538     struct CadetChannelClient *receiver;
1539     struct GNUNET_MQ_Envelope *env;
1540     struct GNUNET_CADET_LocalData *ld;
1541     int to_owner;
1542
1543     env = GNUNET_MQ_msg_extra (ld,
1544                                buf_len,
1545                                GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
1546     if (sender_ccn.channel_of_client ==
1547         ch->owner->ccn.channel_of_client)
1548     {
1549       receiver = ch->dest;
1550       to_owner = GNUNET_NO;
1551     }
1552     else
1553     {
1554       GNUNET_assert (sender_ccn.channel_of_client ==
1555                      ch->dest->ccn.channel_of_client);
1556       receiver = ch->owner;
1557       to_owner = GNUNET_YES;
1558     }
1559     ld->ccn = receiver->ccn;
1560     GNUNET_memcpy (&ld[1],
1561                    buf,
1562                    buf_len);
1563     if (GNUNET_YES == receiver->client_ready)
1564     {
1565       GSC_send_to_client (receiver->c,
1566                           env);
1567       send_ack_to_client (ch,
1568                           to_owner);
1569     }
1570     else
1571     {
1572       struct CadetOutOfOrderMessage *oom;
1573
1574       oom = GNUNET_new (struct CadetOutOfOrderMessage);
1575       oom->env = env;
1576       GNUNET_CONTAINER_DLL_insert_tail (receiver->head_recv,
1577                                         receiver->tail_recv,
1578                                         oom);
1579     }
1580     return GNUNET_OK;
1581   }
1582
1583   /* Everything is correct, send the message. */
1584   crm = GNUNET_malloc (sizeof (*crm));
1585   crm->ch = ch;
1586   crm->data_message = GNUNET_malloc (sizeof (struct GNUNET_CADET_ChannelAppDataMessage)
1587                                      + buf_len);
1588   crm->data_message->header.size = htons (sizeof (struct GNUNET_CADET_ChannelAppDataMessage) + buf_len);
1589   crm->data_message->header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA);
1590   ch->mid_send.mid = htonl (ntohl (ch->mid_send.mid) + 1);
1591   crm->data_message->mid = ch->mid_send;
1592   crm->data_message->ctn = ch->ctn;
1593   GNUNET_memcpy (&crm->data_message[1],
1594                  buf,
1595                  buf_len);
1596   GNUNET_CONTAINER_DLL_insert (ch->head_sent,
1597                                ch->tail_sent,
1598                                crm);
1599   LOG (GNUNET_ERROR_TYPE_DEBUG,
1600        "Sending %u bytes from local client to %s with MID %u\n",
1601        buf_len,
1602        GCCH_2s (ch),
1603        ntohl (crm->data_message->mid.mid));
1604   if (NULL != ch->retry_data_task)
1605   {
1606     GNUNET_SCHEDULER_cancel (ch->retry_data_task);
1607     ch->retry_data_task = NULL;
1608   }
1609   crm->qe = GCT_send (ch->t,
1610                       &crm->data_message->header,
1611                       &data_sent_cb,
1612                       crm);
1613   GNUNET_assert (NULL == ch->retry_data_task);
1614   return GNUNET_OK;
1615 }
1616
1617
1618 /**
1619  * Handle ACK from client on local channel.  Means the client is ready
1620  * for more data, see if we have any for it.
1621  *
1622  * @param ch channel to destroy
1623  * @param client_ccn ccn of the client sending the ack
1624  */
1625 void
1626 GCCH_handle_local_ack (struct CadetChannel *ch,
1627                        struct GNUNET_CADET_ClientChannelNumber client_ccn)
1628 {
1629   struct CadetChannelClient *ccc;
1630   struct CadetOutOfOrderMessage *com;
1631
1632   if ( (NULL != ch->owner) &&
1633        (ch->owner->ccn.channel_of_client == client_ccn.channel_of_client) )
1634     ccc = ch->owner;
1635   else if ( (NULL != ch->dest) &&
1636             (ch->dest->ccn.channel_of_client == client_ccn.channel_of_client) )
1637     ccc = ch->dest;
1638   else
1639     GNUNET_assert (0);
1640   ccc->client_ready = GNUNET_YES;
1641   com = ccc->head_recv;
1642   if (NULL == com)
1643   {
1644     LOG (GNUNET_ERROR_TYPE_DEBUG,
1645          "Got LOCAL_ACK, %s-%X ready to receive more data, but none pending on %s-%X(%p)!\n",
1646          GSC_2s (ccc->c),
1647          ntohl (client_ccn.channel_of_client),
1648          GCCH_2s (ch),
1649          ntohl (ccc->ccn.channel_of_client),
1650          ccc);
1651     return; /* none pending */
1652   }
1653   if (GNUNET_YES == ch->is_loopback)
1654   {
1655     int to_owner;
1656
1657     /* Messages are always in-order, just send */
1658     GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
1659                                  ccc->tail_recv,
1660                                  com);
1661     GSC_send_to_client (ccc->c,
1662                         com->env);
1663     /* Notify sender that we can receive more */
1664     if (ccc->ccn.channel_of_client ==
1665         ch->owner->ccn.channel_of_client)
1666     {
1667       to_owner = GNUNET_NO;
1668     }
1669     else
1670     {
1671       GNUNET_assert (ccc->ccn.channel_of_client ==
1672                      ch->dest->ccn.channel_of_client);
1673       to_owner = GNUNET_YES;
1674     }
1675     send_ack_to_client (ch,
1676                         to_owner);
1677     GNUNET_free (com);
1678     return;
1679   }
1680
1681   if ( (com->mid.mid != ch->mid_recv.mid) &&
1682        (GNUNET_NO == ch->out_of_order) )
1683   {
1684     LOG (GNUNET_ERROR_TYPE_DEBUG,
1685          "Got LOCAL_ACK, %s-%X ready to receive more data (but next one is out-of-order %u vs. %u)!\n",
1686          GSC_2s (ccc->c),
1687          ntohl (ccc->ccn.channel_of_client),
1688          ntohl (com->mid.mid),
1689          ntohl (ch->mid_recv.mid));
1690     return; /* missing next one in-order */
1691   }
1692
1693   LOG (GNUNET_ERROR_TYPE_DEBUG,
1694        "Got LOCAL ACK, passing payload message %u to %s-%X on %s\n",
1695        ntohl (com->mid.mid),
1696        GSC_2s (ccc->c),
1697        ntohl (ccc->ccn.channel_of_client),
1698        GCCH_2s (ch));
1699
1700   /* all good, pass next message to client */
1701   GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
1702                                ccc->tail_recv,
1703                                com);
1704   /* FIXME: if unreliable, this is not aggressive
1705      enough, as it would be OK to have lost some! */
1706   ch->mid_recv.mid = htonl (1 + ntohl (com->mid.mid));
1707   ch->mid_futures >>= 1; /* equivalent to division by 2 */
1708   ccc->client_ready = GNUNET_NO;
1709   GSC_send_to_client (ccc->c,
1710                       com->env);
1711   GNUNET_free (com);
1712   if ( (0xFFULL == (ch->mid_futures & 0xFFULL)) &&
1713        (GNUNET_YES == ch->reliable) )
1714   {
1715     /* The next 15 messages were also already received (0xFF), this
1716        suggests that the sender may be blocked on flow control
1717        urgently waiting for an ACK from us. (As we have an inherent
1718        maximum of 64 bits, and 15 is getting too close for comfort.)
1719        So we should send one now. */
1720     LOG (GNUNET_ERROR_TYPE_DEBUG,
1721          "Sender on %s likely blocked on flow-control, sending ACK now.\n",
1722          GCCH_2s (ch));
1723     if (GNUNET_YES == ch->reliable)
1724       send_channel_data_ack (ch);
1725   }
1726
1727   if (NULL != ccc->head_recv)
1728     return;
1729   if (GNUNET_NO == ch->destroy)
1730     return;
1731   GCT_send_channel_destroy (ch->t,
1732                             ch->ctn);
1733   channel_destroy (ch);
1734 }
1735
1736
1737 #define LOG2(level, ...) GNUNET_log_from_nocheck(level,"cadet-chn",__VA_ARGS__)
1738
1739
1740 /**
1741  * Log channel info.
1742  *
1743  * @param ch Channel.
1744  * @param level Debug level to use.
1745  */
1746 void
1747 GCCH_debug (struct CadetChannel *ch,
1748             enum GNUNET_ErrorType level)
1749 {
1750   int do_log;
1751
1752   do_log = GNUNET_get_log_call_status (level & (~GNUNET_ERROR_TYPE_BULK),
1753                                        "cadet-chn",
1754                                        __FILE__, __FUNCTION__, __LINE__);
1755   if (0 == do_log)
1756     return;
1757
1758   if (NULL == ch)
1759   {
1760     LOG2 (level, "CHN *** DEBUG NULL CHANNEL ***\n");
1761     return;
1762   }
1763   LOG2 (level,
1764         "CHN %s:%X (%p)\n",
1765         GCT_2s (ch->t),
1766         ch->ctn,
1767         ch);
1768   if (NULL != ch->owner)
1769   {
1770     LOG2 (level,
1771           "CHN origin %s ready %s local-id: %u\n",
1772           GSC_2s (ch->owner->c),
1773           ch->owner->client_ready ? "YES" : "NO",
1774           ntohl (ch->owner->ccn.channel_of_client));
1775   }
1776   if (NULL != ch->dest)
1777   {
1778     LOG2 (level,
1779           "CHN destination %s ready %s local-id: %u\n",
1780           GSC_2s (ch->dest->c),
1781           ch->dest->client_ready ? "YES" : "NO",
1782           ntohl (ch->dest->ccn.channel_of_client));
1783   }
1784   LOG2 (level,
1785         "CHN  Message IDs recv: %d (%LLX), send: %d\n",
1786         ntohl (ch->mid_recv.mid),
1787         (unsigned long long) ch->mid_futures,
1788         ntohl (ch->mid_send.mid));
1789 }
1790
1791
1792
1793 /* end of gnunet-service-cadet-new_channel.c */