towards proper DATA_ACK handling
[oweals/gnunet.git] / src / cadet / gnunet-service-cadet-new_channel.c
1
2 /*
3      This file is part of GNUnet.
4      Copyright (C) 2001-2017 GNUnet e.V.
5
6      GNUnet is free software; you can redistribute it and/or modify
7      it under the terms of the GNU General Public License as published
8      by the Free Software Foundation; either version 3, or (at your
9      option) any later version.
10
11      GNUnet is distributed in the hope that it will be useful, but
12      WITHOUT ANY WARRANTY; without even the implied warranty of
13      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14      General Public License for more details.
15
16      You should have received a copy of the GNU General Public License
17      along with GNUnet; see the file COPYING.  If not, write to the
18      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
19      Boston, MA 02110-1301, USA.
20 */
21 /**
22  * @file cadet/gnunet-service-cadet-new_channel.c
23  * @brief logical links between CADET clients
24  * @author Bartlomiej Polot
25  * @author Christian Grothoff
26  *
27  * TODO:
28  * - Optimize ACKs by using 'mid_futures' properly!
29  * - introduce shutdown so we can have half-closed channels, modify
30  *   destroy to include MID to have FIN-ACK equivalents, etc.
31  * - estimate max bandwidth using bursts and use to for CONGESTION CONTROL!
32  * - check that '0xFFULL' really is sufficient for flow control!
33  * - revisit handling of 'unreliable' traffic!
34  * - revisit handling of 'out-of-order' option, especially in combination with/without 'reliable'.
35  * - figure out flow control without ACKs (unreliable traffic!)
36  */
37 #include "platform.h"
38 #include "gnunet_util_lib.h"
39 #include "cadet.h"
40 #include "gnunet_statistics_service.h"
41 #include "gnunet-service-cadet-new.h"
42 #include "gnunet-service-cadet-new_channel.h"
43 #include "gnunet-service-cadet-new_connection.h"
44 #include "gnunet-service-cadet-new_tunnels.h"
45 #include "gnunet-service-cadet-new_peer.h"
46 #include "gnunet-service-cadet-new_paths.h"
47
48 #define LOG(level,...) GNUNET_log_from (level,"cadet-chn",__VA_ARGS__)
49
50 /**
51  * How long do we initially wait before retransmitting?
52  */
53 #define CADET_INITIAL_RETRANSMIT_TIME GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 250)
54
55 /**
56  * How long do we wait before dropping state about incoming
57  * connection to closed port?
58  */
59 #define TIMEOUT_CLOSED_PORT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30)
60
61 /**
62  * How long do we wait at least before retransmitting ever?
63  */
64 #define MIN_RTT_DELAY GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 75)
65
66 /**
67  * Maximum message ID into the future we accept for out-of-order messages.
68  * If the message is more than this into the future, we drop it.  This is
69  * important both to detect values that are actually in the past, as well
70  * as to limit adversarially triggerable memory consumption.
71  *
72  * Note that right now we have "max_pending_messages = 4" hard-coded in
73  * the logic below, so a value of 4 would suffice here. But we plan to
74  * allow larger windows in the future...
75  */
76 #define MAX_OUT_OF_ORDER_DISTANCE 1024
77
78
79 /**
80  * All the states a connection can be in.
81  */
82 enum CadetChannelState
83 {
84   /**
85    * Uninitialized status, should never appear in operation.
86    */
87   CADET_CHANNEL_NEW,
88
89   /**
90    * Connection create message sent, waiting for ACK.
91    */
92   CADET_CHANNEL_OPEN_SENT,
93
94   /**
95    * Connection confirmed, ready to carry traffic.
96    */
97   CADET_CHANNEL_READY
98 };
99
100
101 /**
102  * Info needed to retry a message in case it gets lost.
103  * Note that we DO use this structure also for unreliable
104  * messages.
105  */
106 struct CadetReliableMessage
107 {
108   /**
109    * Double linked list, FIFO style
110    */
111   struct CadetReliableMessage *next;
112
113   /**
114    * Double linked list, FIFO style
115    */
116   struct CadetReliableMessage *prev;
117
118   /**
119    * Which channel is this message in?
120    */
121   struct CadetChannel *ch;
122
123   /**
124    * Entry in the tunnels queue for this message, NULL if it has left
125    * the tunnel.  Used to cancel transmission in case we receive an
126    * ACK in time.
127    */
128   struct CadetTunnelQueueEntry *qe;
129
130   /**
131    * How soon should we retry if we fail to get an ACK?
132    * Messages in the queue are sorted by this value.
133    */
134   struct GNUNET_TIME_Absolute next_retry;
135
136   /**
137    * How long do we wait for an ACK after transmission?
138    * Use for the back-off calculation.
139    */
140   struct GNUNET_TIME_Relative retry_delay;
141
142   /**
143    * Data message we are trying to send.
144    */
145   struct GNUNET_CADET_ChannelAppDataMessage *data_message;
146
147 };
148
149
150 /**
151  * List of received out-of-order data messages.
152  */
153 struct CadetOutOfOrderMessage
154 {
155   /**
156    * Double linked list, FIFO style
157    */
158   struct CadetOutOfOrderMessage *next;
159
160   /**
161    * Double linked list, FIFO style
162    */
163   struct CadetOutOfOrderMessage *prev;
164
165   /**
166    * ID of the message (messages up to this point needed
167    * before we give this one to the client).
168    */
169   struct ChannelMessageIdentifier mid;
170
171   /**
172    * The envelope with the payload of the out-of-order message
173    */
174   struct GNUNET_MQ_Envelope *env;
175
176 };
177
178
179 /**
180  * Client endpoint of a `struct CadetChannel`.  A channel may be a
181  * loopback channel, in which case it has two of these endpoints.
182  * Note that flow control also is required in both directions.
183  */
184 struct CadetChannelClient
185 {
186   /**
187    * Client handle.  Not by itself sufficient to designate
188    * the client endpoint, as the same client handle may
189    * be used for both the owner and the destination, and
190    * we thus also need the channel ID to identify the client.
191    */
192   struct CadetClient *c;
193
194   /**
195    * Head of DLL of messages received out of order or while client was unready.
196    */
197   struct CadetOutOfOrderMessage *head_recv;
198
199   /**
200    * Tail DLL of messages received out of order or while client was unready.
201    */
202   struct CadetOutOfOrderMessage *tail_recv;
203
204   /**
205    * Local tunnel number for this client.
206    * (if owner >= #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI,
207    *  otherwise < #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI)
208    */
209   struct GNUNET_CADET_ClientChannelNumber ccn;
210
211   /**
212    * Can we send data to the client?
213    */
214   int client_ready;
215
216 };
217
218
219 /**
220  * Struct containing all information regarding a channel to a remote client.
221  */
222 struct CadetChannel
223 {
224   /**
225    * Tunnel this channel is in.
226    */
227   struct CadetTunnel *t;
228
229   /**
230    * Client owner of the tunnel, if any.
231    * (Used if this channel represends the initiating end of the tunnel.)
232    */
233   struct CadetChannelClient *owner;
234
235   /**
236    * Client destination of the tunnel, if any.
237    * (Used if this channel represents the listening end of the tunnel.)
238    */
239   struct CadetChannelClient *dest;
240
241   /**
242    * Last entry in the tunnel's queue relating to control messages
243    * (#GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN or
244    * #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK).  Used to cancel
245    * transmission in case we receive updated information.
246    */
247   struct CadetTunnelQueueEntry *last_control_qe;
248
249   /**
250    * Head of DLL of messages sent and not yet ACK'd.
251    */
252   struct CadetReliableMessage *head_sent;
253
254   /**
255    * Tail of DLL of messages sent and not yet ACK'd.
256    */
257   struct CadetReliableMessage *tail_sent;
258
259   /**
260    * Task to resend/poll in case no ACK is received.
261    */
262   struct GNUNET_SCHEDULER_Task *retry_control_task;
263
264   /**
265    * Task to resend/poll in case no ACK is received.
266    */
267   struct GNUNET_SCHEDULER_Task *retry_data_task;
268
269   /**
270    * Last time the channel was used
271    */
272   struct GNUNET_TIME_Absolute timestamp;
273
274   /**
275    * Destination port of the channel.
276    */
277   struct GNUNET_HashCode port;
278
279   /**
280    * Counter for exponential backoff.
281    */
282   struct GNUNET_TIME_Relative retry_time;
283
284   /**
285    * How long does it usually take to get an ACK.
286    */
287   struct GNUNET_TIME_Relative expected_delay;
288
289   /**
290    * Bitfield of already-received messages past @e mid_recv.
291    *
292    * FIXME: not yet properly used (bits here are never set!)
293    */
294   uint64_t mid_futures;
295
296   /**
297    * Next MID expected for incoming traffic.
298    */
299   struct ChannelMessageIdentifier mid_recv;
300
301   /**
302    * Next MID to use for outgoing traffic.
303    */
304   struct ChannelMessageIdentifier mid_send;
305
306   /**
307    * Total (reliable) messages pending ACK for this channel.
308    */
309   unsigned int pending_messages;
310
311   /**
312    * Maximum (reliable) messages pending ACK for this channel
313    * before we throttle the client.
314    */
315   unsigned int max_pending_messages;
316
317   /**
318    * Number identifying this channel in its tunnel.
319    */
320   struct GNUNET_CADET_ChannelTunnelNumber ctn;
321
322   /**
323    * Channel state.
324    */
325   enum CadetChannelState state;
326
327   /**
328    * Is the tunnel bufferless (minimum latency)?
329    */
330   int nobuffer;
331
332   /**
333    * Is the tunnel reliable?
334    */
335   int reliable;
336
337   /**
338    * Is the tunnel out-of-order?
339    */
340   int out_of_order;
341
342   /**
343    * Is this channel a loopback channel, where the destination is us again?
344    */
345   int is_loopback;
346
347   /**
348    * Flag to signal the destruction of the channel.  If this is set to
349    * #GNUNET_YES the channel will be destroyed once the queue is
350    * empty.
351    */
352   int destroy;
353
354 };
355
356
357 /**
358  * Get the static string for identification of the channel.
359  *
360  * @param ch Channel.
361  *
362  * @return Static string with the channel IDs.
363  */
364 const char *
365 GCCH_2s (const struct CadetChannel *ch)
366 {
367   static char buf[128];
368
369   GNUNET_snprintf (buf,
370                    sizeof (buf),
371                    "Channel %s:%s ctn:%X(%X/%X)",
372                    (GNUNET_YES == ch->is_loopback)
373                    ? "loopback"
374                    : GNUNET_i2s (GCP_get_id (GCT_get_destination (ch->t))),
375                    GNUNET_h2s (&ch->port),
376                    ch->ctn,
377                    (NULL == ch->owner) ? 0 : ntohl (ch->owner->ccn.channel_of_client),
378                    (NULL == ch->dest) ? 0 : ntohl (ch->dest->ccn.channel_of_client));
379   return buf;
380 }
381
382
383 /**
384  * Get the channel's public ID.
385  *
386  * @param ch Channel.
387  *
388  * @return ID used to identify the channel with the remote peer.
389  */
390 struct GNUNET_CADET_ChannelTunnelNumber
391 GCCH_get_id (const struct CadetChannel *ch)
392 {
393   return ch->ctn;
394 }
395
396
397 /**
398  * Release memory associated with @a ccc
399  *
400  * @param ccc data structure to clean up
401  */
402 static void
403 free_channel_client (struct CadetChannelClient *ccc)
404 {
405   struct CadetOutOfOrderMessage *com;
406
407   while (NULL != (com = ccc->head_recv))
408   {
409     GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
410                                  ccc->tail_recv,
411                                  com);
412     GNUNET_MQ_discard (com->env);
413     GNUNET_free (com);
414   }
415   GNUNET_free (ccc);
416 }
417
418
419 /**
420  * Destroy the given channel.
421  *
422  * @param ch channel to destroy
423  */
424 static void
425 channel_destroy (struct CadetChannel *ch)
426 {
427   struct CadetReliableMessage *crm;
428
429   while (NULL != (crm = ch->head_sent))
430   {
431     GNUNET_assert (ch == crm->ch);
432     if (NULL != crm->qe)
433     {
434       GCT_send_cancel (crm->qe);
435       crm->qe = NULL;
436     }
437     GNUNET_CONTAINER_DLL_remove (ch->head_sent,
438                                  ch->tail_sent,
439                                  crm);
440     GNUNET_free (crm->data_message);
441     GNUNET_free (crm);
442   }
443   if (NULL != ch->owner)
444   {
445     free_channel_client (ch->owner);
446     ch->owner = NULL;
447   }
448   if (NULL != ch->dest)
449   {
450     free_channel_client (ch->dest);
451     ch->dest = NULL;
452   }
453   if (NULL != ch->last_control_qe)
454   {
455     GCT_send_cancel (ch->last_control_qe);
456     ch->last_control_qe = NULL;
457   }
458   if (NULL != ch->retry_data_task)
459   {
460     GNUNET_SCHEDULER_cancel (ch->retry_data_task);
461     ch->retry_data_task = NULL;
462   }
463   if (NULL != ch->retry_control_task)
464   {
465     GNUNET_SCHEDULER_cancel (ch->retry_control_task);
466     ch->retry_control_task = NULL;
467   }
468   if (GNUNET_NO == ch->is_loopback)
469   {
470     GCT_remove_channel (ch->t,
471                         ch,
472                         ch->ctn);
473     ch->t = NULL;
474   }
475   GNUNET_free (ch);
476 }
477
478
479 /**
480  * Send a channel create message.
481  *
482  * @param cls Channel for which to send.
483  */
484 static void
485 send_channel_open (void *cls);
486
487
488 /**
489  * Function called once the tunnel confirms that we sent the
490  * create message.  Delays for a bit until we retry.
491  *
492  * @param cls our `struct CadetChannel`.
493  */
494 static void
495 channel_open_sent_cb (void *cls)
496 {
497   struct CadetChannel *ch = cls;
498
499   GNUNET_assert (NULL != ch->last_control_qe);
500   ch->last_control_qe = NULL;
501   ch->retry_time = GNUNET_TIME_STD_BACKOFF (ch->retry_time);
502   LOG (GNUNET_ERROR_TYPE_DEBUG,
503        "Sent CADET_CHANNEL_OPEN on %s, retrying in %s\n",
504        GCCH_2s (ch),
505        GNUNET_STRINGS_relative_time_to_string (ch->retry_time,
506                                                GNUNET_YES));
507   ch->retry_control_task
508     = GNUNET_SCHEDULER_add_delayed (ch->retry_time,
509                                     &send_channel_open,
510                                     ch);
511 }
512
513
514 /**
515  * Send a channel open message.
516  *
517  * @param cls Channel for which to send.
518  */
519 static void
520 send_channel_open (void *cls)
521 {
522   struct CadetChannel *ch = cls;
523   struct GNUNET_CADET_ChannelOpenMessage msgcc;
524   uint32_t options;
525
526   ch->retry_control_task = NULL;
527   LOG (GNUNET_ERROR_TYPE_DEBUG,
528        "Sending CHANNEL_OPEN message for %s\n",
529        GCCH_2s (ch));
530   options = 0;
531   if (ch->nobuffer)
532     options |= GNUNET_CADET_OPTION_NOBUFFER;
533   if (ch->reliable)
534     options |= GNUNET_CADET_OPTION_RELIABLE;
535   if (ch->out_of_order)
536     options |= GNUNET_CADET_OPTION_OUT_OF_ORDER;
537   msgcc.header.size = htons (sizeof (msgcc));
538   msgcc.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN);
539   msgcc.opt = htonl (options);
540   msgcc.port = ch->port;
541   msgcc.ctn = ch->ctn;
542   ch->state = CADET_CHANNEL_OPEN_SENT;
543   ch->last_control_qe = GCT_send (ch->t,
544                                   &msgcc.header,
545                                   &channel_open_sent_cb,
546                                   ch);
547   GNUNET_assert (NULL == ch->retry_control_task);
548 }
549
550
551 /**
552  * Function called once and only once after a channel was bound
553  * to its tunnel via #GCT_add_channel() is ready for transmission.
554  * Note that this is only the case for channels that this peer
555  * initiates, as for incoming channels we assume that they are
556  * ready for transmission immediately upon receiving the open
557  * message.  Used to bootstrap the #GCT_send() process.
558  *
559  * @param ch the channel for which the tunnel is now ready
560  */
561 void
562 GCCH_tunnel_up (struct CadetChannel *ch)
563 {
564   GNUNET_assert (NULL == ch->retry_control_task);
565   LOG (GNUNET_ERROR_TYPE_DEBUG,
566        "Tunnel up, sending CHANNEL_OPEN on %s now\n",
567        GCCH_2s (ch));
568   ch->retry_control_task
569     = GNUNET_SCHEDULER_add_now (&send_channel_open,
570                                 ch);
571 }
572
573
574 /**
575  * Create a new channel.
576  *
577  * @param owner local client owning the channel
578  * @param ccn local number of this channel at the @a owner
579  * @param destination peer to which we should build the channel
580  * @param port desired port at @a destination
581  * @param options options for the channel
582  * @return handle to the new channel
583  */
584 struct CadetChannel *
585 GCCH_channel_local_new (struct CadetClient *owner,
586                         struct GNUNET_CADET_ClientChannelNumber ccn,
587                         struct CadetPeer *destination,
588                         const struct GNUNET_HashCode *port,
589                         uint32_t options)
590 {
591   struct CadetChannel *ch;
592   struct CadetChannelClient *ccco;
593
594   ccco = GNUNET_new (struct CadetChannelClient);
595   ccco->c = owner;
596   ccco->ccn = ccn;
597   ccco->client_ready = GNUNET_YES;
598
599   ch = GNUNET_new (struct CadetChannel);
600   ch->mid_recv.mid = htonl (1); /* The OPEN_ACK counts as message 0! */
601   ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
602   ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
603   ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
604   ch->max_pending_messages = (ch->nobuffer) ? 1 : 4; /* FIXME: 4!? Do not hardcode! */
605   ch->owner = ccco;
606   ch->port = *port;
607   if (0 == memcmp (&my_full_id,
608                    GCP_get_id (destination),
609                    sizeof (struct GNUNET_PeerIdentity)))
610   {
611     struct CadetClient *c;
612
613     ch->is_loopback = GNUNET_YES;
614     c = GNUNET_CONTAINER_multihashmap_get (open_ports,
615                                            port);
616     if (NULL == c)
617     {
618       /* port closed, wait for it to possibly open */
619       (void) GNUNET_CONTAINER_multihashmap_put (loose_channels,
620                                                 port,
621                                                 ch,
622                                                 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
623       LOG (GNUNET_ERROR_TYPE_DEBUG,
624            "Created loose incoming loopback channel to port %s\n",
625            GNUNET_h2s (&ch->port));
626     }
627     else
628     {
629       ch->dest = GNUNET_new (struct CadetChannelClient);
630       ch->dest->c = c;
631       ch->dest->client_ready = GNUNET_YES;
632       GCCH_bind (ch,
633                  ch->dest->c);
634     }
635   }
636   else
637   {
638     ch->t = GCP_get_tunnel (destination,
639                             GNUNET_YES);
640     ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME;
641     ch->ctn = GCT_add_channel (ch->t,
642                                ch);
643   }
644   GNUNET_STATISTICS_update (stats,
645                             "# channels",
646                             1,
647                             GNUNET_NO);
648   LOG (GNUNET_ERROR_TYPE_DEBUG,
649        "Created channel to port %s at peer %s for %s using %s\n",
650        GNUNET_h2s (port),
651        GCP_2s (destination),
652        GSC_2s (owner),
653        (GNUNET_YES == ch->is_loopback) ? "loopback" : GCT_2s (ch->t));
654   return ch;
655 }
656
657
658 /**
659  * We had an incoming channel to a port that is closed.
660  * It has not been opened for a while, drop it.
661  *
662  * @param cls the channel to drop
663  */
664 static void
665 timeout_closed_cb (void *cls)
666 {
667   struct CadetChannel *ch = cls;
668
669   ch->retry_control_task = NULL;
670   LOG (GNUNET_ERROR_TYPE_DEBUG,
671        "Closing incoming channel to port %s from peer %s due to timeout\n",
672        GNUNET_h2s (&ch->port),
673        GCP_2s (GCT_get_destination (ch->t)));
674   channel_destroy (ch);
675 }
676
677
678 /**
679  * Create a new channel based on a request coming in over the network.
680  *
681  * @param t tunnel to the remote peer
682  * @param ctn identifier of this channel in the tunnel
683  * @param port desired local port
684  * @param options options for the channel
685  * @return handle to the new channel
686  */
687 struct CadetChannel *
688 GCCH_channel_incoming_new (struct CadetTunnel *t,
689                            struct GNUNET_CADET_ChannelTunnelNumber ctn,
690                            const struct GNUNET_HashCode *port,
691                            uint32_t options)
692 {
693   struct CadetChannel *ch;
694   struct CadetClient *c;
695
696   ch = GNUNET_new (struct CadetChannel);
697   ch->port = *port;
698   ch->t = t;
699   ch->ctn = ctn;
700   ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME;
701   ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
702   ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
703   ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
704   ch->max_pending_messages = (ch->nobuffer) ? 1 : 4; /* FIXME: 4!? Do not hardcode! */
705   GNUNET_STATISTICS_update (stats,
706                             "# channels",
707                             1,
708                             GNUNET_NO);
709
710   c = GNUNET_CONTAINER_multihashmap_get (open_ports,
711                                          port);
712   if (NULL == c)
713   {
714     /* port closed, wait for it to possibly open */
715     (void) GNUNET_CONTAINER_multihashmap_put (loose_channels,
716                                               port,
717                                               ch,
718                                               GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
719     ch->retry_control_task
720       = GNUNET_SCHEDULER_add_delayed (TIMEOUT_CLOSED_PORT,
721                                       &timeout_closed_cb,
722                                       ch);
723     LOG (GNUNET_ERROR_TYPE_DEBUG,
724          "Created loose incoming channel to port %s from peer %s\n",
725          GNUNET_h2s (&ch->port),
726          GCP_2s (GCT_get_destination (ch->t)));
727   }
728   else
729   {
730     GCCH_bind (ch,
731                c);
732   }
733   GNUNET_STATISTICS_update (stats,
734                             "# channels",
735                             1,
736                             GNUNET_NO);
737   return ch;
738 }
739
740
741 /**
742  * Function called once the tunnel confirms that we sent the
743  * ACK message.  Just remembers it was sent, we do not expect
744  * ACKs for ACKs ;-).
745  *
746  * @param cls our `struct CadetChannel`.
747  */
748 static void
749 send_ack_cb (void *cls)
750 {
751   struct CadetChannel *ch = cls;
752
753   GNUNET_assert (NULL != ch->last_control_qe);
754   ch->last_control_qe = NULL;
755 }
756
757
758 /**
759  * Compute and send the current #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK to the other peer.
760  *
761  * @param ch channel to send the #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK for
762  */
763 static void
764 send_channel_data_ack (struct CadetChannel *ch)
765 {
766   struct GNUNET_CADET_ChannelDataAckMessage msg;
767
768   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK);
769   msg.header.size = htons (sizeof (msg));
770   msg.ctn = ch->ctn;
771   msg.mid.mid = htonl (ntohl (ch->mid_recv.mid) - 1);
772   msg.futures = GNUNET_htonll (ch->mid_futures);
773   if (NULL != ch->last_control_qe)
774     GCT_send_cancel (ch->last_control_qe);
775   LOG (GNUNET_ERROR_TYPE_DEBUG,
776        "Sending DATA_ACK %u:%llX via %s\n",
777        (unsigned int) ntohl (msg.mid.mid),
778        (unsigned long long) ch->mid_futures,
779        GCCH_2s (ch));
780   ch->last_control_qe = GCT_send (ch->t,
781                                   &msg.header,
782                                   &send_ack_cb,
783                                   ch);
784 }
785
786
787 /**
788  * Send our initial #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK to the client confirming that the
789  * connection is up.
790  *
791  * @param cls the `struct CadetChannel`
792  */
793 static void
794 send_open_ack (void *cls)
795 {
796   struct CadetChannel *ch = cls;
797   struct GNUNET_CADET_ChannelManageMessage msg;
798
799   LOG (GNUNET_ERROR_TYPE_DEBUG,
800        "Sending CHANNEL_OPEN_ACK on %s\n",
801        GCCH_2s (ch));
802   ch->retry_control_task = NULL;
803   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK);
804   msg.header.size = htons (sizeof (msg));
805   msg.reserved = htonl (0);
806   msg.ctn = ch->ctn;
807   if (NULL != ch->last_control_qe)
808     GCT_send_cancel (ch->last_control_qe);
809   ch->last_control_qe = GCT_send (ch->t,
810                                   &msg.header,
811                                   &send_ack_cb,
812                                   ch);
813 }
814
815
816 /**
817  * We got a #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN message again for
818  * this channel.  If the binding was successful, (re)transmit the
819  * #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK.
820  *
821  * @param ch channel that got the duplicate open
822  */
823 void
824 GCCH_handle_duplicate_open (struct CadetChannel *ch)
825 {
826   if (NULL == ch->dest)
827   {
828     LOG (GNUNET_ERROR_TYPE_DEBUG,
829          "Ignoring duplicate channel OPEN on %s: port is closed\n",
830          GCCH_2s (ch));
831     return;
832   }
833   if (NULL != ch->retry_control_task)
834   {
835     LOG (GNUNET_ERROR_TYPE_DEBUG,
836          "Ignoring duplicate channel OPEN on %s: control message is pending\n",
837          GCCH_2s (ch));
838     return;
839   }
840   LOG (GNUNET_ERROR_TYPE_DEBUG,
841        "Retransmitting OPEN_ACK on %s\n",
842        GCCH_2s (ch));
843   ch->retry_control_task
844     = GNUNET_SCHEDULER_add_now (&send_open_ack,
845                                 ch);
846 }
847
848
849 /**
850  * Send a #GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK to the client to solicit more messages.
851  *
852  * @param ch channel the ack is for
853  * @param to_owner #GNUNET_YES to send to owner,
854  *                 #GNUNET_NO to send to dest
855  */
856 static void
857 send_ack_to_client (struct CadetChannel *ch,
858                     int to_owner)
859 {
860   struct GNUNET_MQ_Envelope *env;
861   struct GNUNET_CADET_LocalAck *ack;
862   struct CadetChannelClient *ccc;
863
864   env = GNUNET_MQ_msg (ack,
865                        GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK);
866   ccc = (GNUNET_YES == to_owner) ? ch->owner : ch->dest;
867   ack->ccn = ccc->ccn;
868   LOG (GNUNET_ERROR_TYPE_DEBUG,
869        "Sending CADET_LOCAL_ACK to %s (%s) at ccn %X (%u/%u pending)\n",
870        GSC_2s (ccc->c),
871        (GNUNET_YES == to_owner) ? "owner" : "dest",
872        ntohl (ack->ccn.channel_of_client),
873        ch->pending_messages,
874        ch->max_pending_messages);
875   GSC_send_to_client (ccc->c,
876                       env);
877 }
878
879
880 /**
881  * A client is bound to the port that we have a channel
882  * open to.  Send the acknowledgement for the connection
883  * request and establish the link with the client.
884  *
885  * @param ch open incoming channel
886  * @param c client listening on the respective port
887  */
888 void
889 GCCH_bind (struct CadetChannel *ch,
890            struct CadetClient *c)
891 {
892   uint32_t options;
893   struct CadetChannelClient *cccd;
894
895   LOG (GNUNET_ERROR_TYPE_DEBUG,
896        "Binding %s from %s to port %s of %s\n",
897        GCCH_2s (ch),
898        GCT_2s (ch->t),
899        GNUNET_h2s (&ch->port),
900        GSC_2s (c));
901   if (NULL != ch->retry_control_task)
902   {
903     /* there might be a timeout task here */
904     GNUNET_SCHEDULER_cancel (ch->retry_control_task);
905     ch->retry_control_task = NULL;
906   }
907   options = 0;
908   if (ch->nobuffer)
909     options |= GNUNET_CADET_OPTION_NOBUFFER;
910   if (ch->reliable)
911     options |= GNUNET_CADET_OPTION_RELIABLE;
912   if (ch->out_of_order)
913     options |= GNUNET_CADET_OPTION_OUT_OF_ORDER;
914   cccd = GNUNET_new (struct CadetChannelClient);
915   ch->dest = cccd;
916   cccd->c = c;
917   cccd->client_ready = GNUNET_YES;
918   cccd->ccn = GSC_bind (c,
919                         ch,
920                         (GNUNET_YES == ch->is_loopback)
921                         ? GCP_get (&my_full_id,
922                                    GNUNET_YES)
923                         : GCT_get_destination (ch->t),
924                         &ch->port,
925                         options);
926   GNUNET_assert (ntohl (cccd->ccn.channel_of_client) <
927                  GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
928   ch->mid_recv.mid = htonl (1); /* The OPEN counts as message 0! */
929   if (GNUNET_YES == ch->is_loopback)
930   {
931     ch->state = CADET_CHANNEL_OPEN_SENT;
932     GCCH_handle_channel_open_ack (ch);
933   }
934   else
935   {
936     /* notify other peer that we accepted the connection */
937     ch->retry_control_task
938       = GNUNET_SCHEDULER_add_now (&send_open_ack,
939                                   ch);
940   }
941   /* give client it's initial supply of ACKs */
942   GNUNET_assert (ntohl (cccd->ccn.channel_of_client) <
943                  GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
944   for (unsigned int i=0;i<ch->max_pending_messages;i++)
945     send_ack_to_client (ch,
946                         GNUNET_NO);
947 }
948
949
950 /**
951  * Destroy locally created channel.  Called by the local client, so no
952  * need to tell the client.
953  *
954  * @param ch channel to destroy
955  * @param c client that caused the destruction
956  * @param ccn client number of the client @a c
957  */
958 void
959 GCCH_channel_local_destroy (struct CadetChannel *ch,
960                             struct CadetClient *c,
961                             struct GNUNET_CADET_ClientChannelNumber ccn)
962 {
963   LOG (GNUNET_ERROR_TYPE_DEBUG,
964        "%s asks for destruction of %s\n",
965        GSC_2s (c),
966        GCCH_2s (ch));
967   GNUNET_assert (NULL != c);
968   if ( (NULL != ch->owner) &&
969        (c == ch->owner->c) &&
970        (ccn.channel_of_client == ch->owner->ccn.channel_of_client) )
971   {
972     free_channel_client (ch->owner);
973     ch->owner = NULL;
974   }
975   else if ( (NULL != ch->dest) &&
976             (c == ch->dest->c) &&
977             (ccn.channel_of_client == ch->dest->ccn.channel_of_client) )
978   {
979     free_channel_client (ch->dest);
980     ch->dest = NULL;
981   }
982   else
983   {
984     GNUNET_assert (0);
985   }
986
987   if (GNUNET_YES == ch->destroy)
988   {
989     /* other end already destroyed, with the local client gone, no need
990        to finish transmissions, just destroy immediately. */
991     channel_destroy (ch);
992     return;
993   }
994   if ( (NULL != ch->head_sent) ||
995        (NULL != ch->owner) ||
996        (NULL != ch->dest) )
997   {
998     /* Wait for other end to destroy us as well,
999        and otherwise allow send queue to be transmitted first */
1000     ch->destroy = GNUNET_YES;
1001     return;
1002   }
1003   /* If the we ever sent the CHANNEL_CREATE, we need to send a destroy message. */
1004   if (CADET_CHANNEL_NEW != ch->state)
1005     GCT_send_channel_destroy (ch->t,
1006                               ch->ctn);
1007   /* Nothing left to do, just finish destruction */
1008   channel_destroy (ch);
1009 }
1010
1011
1012 /**
1013  * We got an acknowledgement for the creation of the channel
1014  * (the port is open on the other side). Begin transmissions.
1015  *
1016  * @param ch channel to destroy
1017  */
1018 void
1019 GCCH_handle_channel_open_ack (struct CadetChannel *ch)
1020 {
1021   switch (ch->state)
1022   {
1023   case CADET_CHANNEL_NEW:
1024     /* this should be impossible */
1025     GNUNET_break (0);
1026     break;
1027   case CADET_CHANNEL_OPEN_SENT:
1028     if (NULL == ch->owner)
1029     {
1030       /* We're not the owner, wrong direction! */
1031       GNUNET_break_op (0);
1032       return;
1033     }
1034     LOG (GNUNET_ERROR_TYPE_DEBUG,
1035          "Received CHANNEL_OPEN_ACK for waiting %s, entering READY state\n",
1036          GCCH_2s (ch));
1037     if (NULL != ch->retry_control_task) /* can be NULL if ch->is_loopback */
1038     {
1039       GNUNET_SCHEDULER_cancel (ch->retry_control_task);
1040       ch->retry_control_task = NULL;
1041     }
1042     ch->state = CADET_CHANNEL_READY;
1043     /* On first connect, send client as many ACKs as we allow messages
1044        to be buffered! */
1045     for (unsigned int i=0;i<ch->max_pending_messages;i++)
1046       send_ack_to_client (ch,
1047                           GNUNET_YES);
1048     break;
1049   case CADET_CHANNEL_READY:
1050     /* duplicate ACK, maybe we retried the CREATE. Ignore. */
1051     LOG (GNUNET_ERROR_TYPE_DEBUG,
1052          "Received duplicate channel OPEN_ACK for %s\n",
1053          GCCH_2s (ch));
1054     GNUNET_STATISTICS_update (stats,
1055                               "# duplicate CREATE_ACKs",
1056                               1,
1057                               GNUNET_NO);
1058     break;
1059   }
1060 }
1061
1062
1063 /**
1064  * Test if element @a e1 comes before element @a e2.
1065  *
1066  * @param cls closure, to a flag where we indicate duplicate packets
1067  * @param e1 an element of to sort
1068  * @param e2 another element to sort
1069  * @return #GNUNET_YES if @e1 < @e2, otherwise #GNUNET_NO
1070  */
1071 static int
1072 is_before (void *cls,
1073            struct CadetOutOfOrderMessage *m1,
1074            struct CadetOutOfOrderMessage *m2)
1075 {
1076   int *duplicate = cls;
1077   uint32_t v1 = ntohl (m1->mid.mid);
1078   uint32_t v2 = ntohl (m2->mid.mid);
1079   uint32_t delta;
1080
1081   delta = v2 - v1;
1082   if (0 == delta)
1083     *duplicate = GNUNET_YES;
1084   if (delta > (uint32_t) INT_MAX)
1085   {
1086     /* in overflow range, we can safely assume we wrapped around */
1087     return GNUNET_NO;
1088   }
1089   else
1090   {
1091     /* result is small, thus v2 > v1, thus e1 < e2 */
1092     return GNUNET_YES;
1093   }
1094 }
1095
1096
1097 /**
1098  * We got payload data for a channel.  Pass it on to the client
1099  * and send an ACK to the other end (once flow control allows it!)
1100  *
1101  * @param ch channel that got data
1102  * @param msg message that was received
1103  */
1104 void
1105 GCCH_handle_channel_plaintext_data (struct CadetChannel *ch,
1106                                     const struct GNUNET_CADET_ChannelAppDataMessage *msg)
1107 {
1108   struct GNUNET_MQ_Envelope *env;
1109   struct GNUNET_CADET_LocalData *ld;
1110   struct CadetChannelClient *ccc;
1111   size_t payload_size;
1112   struct CadetOutOfOrderMessage *com;
1113   int duplicate;
1114   uint32_t mid_min;
1115   uint32_t mid_max;
1116   uint32_t mid_msg;
1117
1118   GNUNET_assert (GNUNET_NO == ch->is_loopback);
1119   if ( (GNUNET_YES == ch->destroy) &&
1120        (NULL == ch->owner) &&
1121        (NULL == ch->dest) )
1122   {
1123     /* This client is gone, but we still have messages to send to
1124        the other end (which is why @a ch is not yet dead).  However,
1125        we cannot pass messages to our client anymore. */
1126     LOG (GNUNET_ERROR_TYPE_DEBUG,
1127          "Dropping incoming payload on %s as this end is already closed\n",
1128          GCCH_2s (ch));
1129     /* FIXME: send back ACK/NACK/Closed notification
1130        to stop retransmissions! */
1131     return;
1132   }
1133   payload_size = ntohs (msg->header.size) - sizeof (*msg);
1134   env = GNUNET_MQ_msg_extra (ld,
1135                              payload_size,
1136                              GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
1137   ld->ccn = (NULL == ch->dest) ? ch->owner->ccn : ch->dest->ccn;
1138   GNUNET_memcpy (&ld[1],
1139                  &msg[1],
1140                  payload_size);
1141   ccc = (NULL != ch->owner) ? ch->owner : ch->dest;
1142   if ( (GNUNET_YES == ccc->client_ready) &&
1143        ( (GNUNET_YES == ch->out_of_order) ||
1144          (msg->mid.mid == ch->mid_recv.mid) ) )
1145   {
1146     LOG (GNUNET_ERROR_TYPE_DEBUG,
1147          "Giving %u bytes of payload with MID %u from %s to client %s\n",
1148          (unsigned int) payload_size,
1149          ntohl (msg->mid.mid),
1150          GCCH_2s (ch),
1151          GSC_2s (ccc->c));
1152     ccc->client_ready = GNUNET_NO;
1153     GSC_send_to_client (ccc->c,
1154                         env);
1155     ch->mid_recv.mid = htonl (1 + ntohl (ch->mid_recv.mid));
1156     ch->mid_futures >>= 1;
1157     if (GNUNET_YES == ch->reliable)
1158       send_channel_data_ack (ch);
1159     return;
1160   }
1161
1162   /* check if message ought to be dropped because it is anicent/too distant/duplicate */
1163   mid_min = ntohl (ch->mid_recv.mid);
1164   mid_max = mid_min + MAX_OUT_OF_ORDER_DISTANCE;
1165   mid_msg = ntohl (msg->mid.mid);
1166   if ( ( (uint32_t) (mid_msg - mid_min) > MAX_OUT_OF_ORDER_DISTANCE) ||
1167        ( (uint32_t) (mid_max - mid_msg) > MAX_OUT_OF_ORDER_DISTANCE) )
1168   {
1169     LOG (GNUNET_ERROR_TYPE_DEBUG,
1170          "Duplicate ancient or future payload of %u bytes on %s (mid %u) dropped\n",
1171          (unsigned int) payload_size,
1172          GCCH_2s (ch),
1173          ntohl (msg->mid.mid));
1174     GNUNET_STATISTICS_update (stats,
1175                               "# duplicate DATA (ancient or future)",
1176                               1,
1177                               GNUNET_NO);
1178     GNUNET_MQ_discard (env);
1179     return;
1180   }
1181
1182   /* Insert message into sorted out-of-order queue */
1183   com = GNUNET_new (struct CadetOutOfOrderMessage);
1184   com->mid = msg->mid;
1185   com->env = env;
1186   duplicate = GNUNET_NO;
1187   GNUNET_CONTAINER_DLL_insert_sorted (struct CadetOutOfOrderMessage,
1188                                       is_before,
1189                                       &duplicate,
1190                                       ccc->head_recv,
1191                                       ccc->tail_recv,
1192                                       com);
1193   if (GNUNET_YES == duplicate)
1194   {
1195     /* Duplicate within the queue, drop also */
1196     LOG (GNUNET_ERROR_TYPE_DEBUG,
1197          "Duplicate payload of %u bytes on %s (mid %u) dropped\n",
1198          (unsigned int) payload_size,
1199          GCCH_2s (ch),
1200          ntohl (msg->mid.mid));
1201     GNUNET_STATISTICS_update (stats,
1202                               "# duplicate DATA",
1203                               1,
1204                               GNUNET_NO);
1205     GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
1206                                  ccc->tail_recv,
1207                                  com);
1208     GNUNET_MQ_discard (com->env);
1209     GNUNET_free (com);
1210     if (GNUNET_YES == ch->reliable)
1211       send_channel_data_ack (ch);
1212     return;
1213   }
1214   LOG (GNUNET_ERROR_TYPE_DEBUG,
1215        "Queued %s payload of %u bytes on %s-%X(%p) (mid %u, need %u first)\n",
1216        (GNUNET_YES == ccc->client_ready)
1217        ? "out-of-order"
1218        : "client-not-ready",
1219        (unsigned int) payload_size,
1220        GCCH_2s (ch),
1221        ntohl (ccc->ccn.channel_of_client),
1222        ccc,
1223        ntohl (msg->mid.mid),
1224        ntohl (ch->mid_recv.mid));
1225   send_channel_data_ack (ch);
1226 }
1227
1228
1229 /**
1230  * Function called once the tunnel has sent one of our messages.
1231  * If the message is unreliable, simply frees the `crm`. If the
1232  * message was reliable, calculate retransmission time and
1233  * wait for ACK (or retransmit).
1234  *
1235  * @param cls the `struct CadetReliableMessage` that was sent
1236  */
1237 static void
1238 data_sent_cb (void *cls);
1239
1240
1241 /**
1242  * We need to retry a transmission, the last one took too long to
1243  * be acknowledged.
1244  *
1245  * @param cls the `struct CadetChannel` where we need to retransmit
1246  */
1247 static void
1248 retry_transmission (void *cls)
1249 {
1250   struct CadetChannel *ch = cls;
1251   struct CadetReliableMessage *crm = ch->head_sent;
1252
1253   ch->retry_data_task = NULL;
1254   GNUNET_assert (NULL == crm->qe);
1255   LOG (GNUNET_ERROR_TYPE_DEBUG,
1256        "Retrying transmission on %s of message %u\n",
1257        GCCH_2s (ch),
1258        (unsigned int) ntohl (crm->data_message->mid.mid));
1259   crm->qe = GCT_send (ch->t,
1260                       &crm->data_message->header,
1261                       &data_sent_cb,
1262                       crm);
1263   GNUNET_assert (NULL == ch->retry_data_task);
1264 }
1265
1266
1267 /**
1268  * We got an PLAINTEXT_DATA_ACK for a message in our queue, remove it from
1269  * the queue and tell our client that it can send more.
1270  *
1271  * @param ch the channel that got the PLAINTEXT_DATA_ACK
1272  * @param crm the message that got acknowledged
1273  */
1274 static void
1275 handle_matching_ack (struct CadetChannel *ch,
1276                      struct CadetReliableMessage *crm)
1277 {
1278   GNUNET_CONTAINER_DLL_remove (ch->head_sent,
1279                                ch->tail_sent,
1280                                crm);
1281   ch->pending_messages--;
1282   GNUNET_assert (ch->pending_messages < ch->max_pending_messages);
1283   LOG (GNUNET_ERROR_TYPE_DEBUG,
1284        "Received DATA_ACK on %s for message %u (%u ACKs pending)\n",
1285        GCCH_2s (ch),
1286        (unsigned int) ntohl (crm->data_message->mid.mid),
1287        ch->pending_messages);
1288   GNUNET_free (crm->data_message);
1289   GNUNET_free (crm);
1290   send_ack_to_client (ch,
1291                       (NULL == ch->owner)
1292                       ? GNUNET_NO
1293                       : GNUNET_YES);
1294 }
1295
1296
1297 /**
1298  * We got an acknowledgement for payload data for a channel.
1299  * Possibly resume transmissions.
1300  *
1301  * @param ch channel that got the ack
1302  * @param ack details about what was received
1303  */
1304 void
1305 GCCH_handle_channel_plaintext_data_ack (struct CadetChannel *ch,
1306                                         const struct GNUNET_CADET_ChannelDataAckMessage *ack)
1307 {
1308   struct CadetReliableMessage *crm;
1309   struct CadetReliableMessage *crmn;
1310   int found;
1311   uint32_t mid_base;
1312   uint64_t mid_mask;
1313   unsigned int delta;
1314
1315   GNUNET_break (GNUNET_NO == ch->is_loopback);
1316   if (GNUNET_NO == ch->reliable)
1317   {
1318     /* not expecting ACKs on unreliable channel, odd */
1319     GNUNET_break_op (0);
1320     return;
1321   }
1322   mid_base = ntohl (ack->mid.mid);
1323   mid_mask = GNUNET_htonll (ack->futures);
1324   found = GNUNET_NO;
1325   for (crm = ch->head_sent;
1326         NULL != crm;
1327        crm = crmn)
1328   {
1329     crmn = crm->next;
1330     if (ack->mid.mid == crm->data_message->mid.mid)
1331     {
1332       handle_matching_ack (ch,
1333                            crm);
1334       continue;
1335     }
1336     delta = (unsigned int) (ntohl (crm->data_message->mid.mid) - mid_base) - 1;
1337     if (delta >= 64)
1338       continue;
1339     if (0 != (mid_mask & (1LLU << delta)))
1340       handle_matching_ack (ch,
1341                            crm);
1342   }
1343   if (GNUNET_NO == found)
1344   {
1345     /* ACK for message we already dropped, might have been a
1346        duplicate ACK? Ignore. */
1347     LOG (GNUNET_ERROR_TYPE_DEBUG,
1348          "Duplicate DATA_ACK on %s, ignoring\n",
1349          GCCH_2s (ch));
1350     GNUNET_STATISTICS_update (stats,
1351                               "# duplicate DATA_ACKs",
1352                               1,
1353                               GNUNET_NO);
1354     return;
1355   }
1356   if (NULL != ch->retry_data_task)
1357   {
1358     GNUNET_SCHEDULER_cancel (ch->retry_data_task);
1359     ch->retry_data_task = NULL;
1360   }
1361   if (NULL != ch->head_sent)
1362     ch->retry_data_task
1363       = GNUNET_SCHEDULER_add_at (ch->head_sent->next_retry,
1364                                  &retry_transmission,
1365                                  ch);
1366 }
1367
1368
1369 /**
1370  * Destroy channel, based on the other peer closing the
1371  * connection.  Also needs to remove this channel from
1372  * the tunnel.
1373  *
1374  * @param ch channel to destroy
1375  */
1376 void
1377 GCCH_handle_remote_destroy (struct CadetChannel *ch)
1378 {
1379   struct CadetChannelClient *ccc;
1380
1381   GNUNET_assert (GNUNET_NO == ch->is_loopback);
1382   LOG (GNUNET_ERROR_TYPE_DEBUG,
1383        "Received remote channel DESTROY for %s\n",
1384        GCCH_2s (ch));
1385   if (GNUNET_YES == ch->destroy)
1386   {
1387     /* Local client already gone, this is instant-death. */
1388     channel_destroy (ch);
1389     return;
1390   }
1391   ccc = (NULL != ch->owner) ? ch->owner : ch->dest;
1392   if (NULL != ccc->head_recv)
1393   {
1394     LOG (GNUNET_ERROR_TYPE_WARNING,
1395          "Lost end of transmission due to remote shutdown on %s\n",
1396          GCCH_2s (ch));
1397     /* FIXME: change API to notify client about truncated transmission! */
1398   }
1399   ch->destroy = GNUNET_YES;
1400   GSC_handle_remote_channel_destroy (ccc->c,
1401                                      ccc->ccn,
1402                                      ch);
1403   channel_destroy (ch);
1404 }
1405
1406
1407 /**
1408  * Test if element @a e1 comes before element @a e2.
1409  *
1410  * @param cls closure, to a flag where we indicate duplicate packets
1411  * @param crm1 an element of to sort
1412  * @param crm2 another element to sort
1413  * @return #GNUNET_YES if @e1 < @e2, otherwise #GNUNET_NO
1414  */
1415 static int
1416 cmp_crm_by_next_retry (void *cls,
1417                        struct CadetReliableMessage *crm1,
1418                        struct CadetReliableMessage *crm2)
1419 {
1420   if (crm1->next_retry.abs_value_us <
1421       crm2->next_retry.abs_value_us)
1422     return GNUNET_YES;
1423   return GNUNET_NO;
1424 }
1425
1426 /**
1427  * Function called once the tunnel has sent one of our messages.
1428  * If the message is unreliable, simply frees the `crm`. If the
1429  * message was reliable, calculate retransmission time and
1430  * wait for ACK (or retransmit).
1431  *
1432  * @param cls the `struct CadetReliableMessage` that was sent
1433  */
1434 static void
1435 data_sent_cb (void *cls)
1436 {
1437   struct CadetReliableMessage *crm = cls;
1438   struct CadetChannel *ch = crm->ch;
1439
1440   GNUNET_assert (GNUNET_NO == ch->is_loopback);
1441   GNUNET_assert (NULL != crm->qe);
1442   crm->qe = NULL;
1443   GNUNET_CONTAINER_DLL_remove (ch->head_sent,
1444                                ch->tail_sent,
1445                                crm);
1446   if (GNUNET_NO == ch->reliable)
1447   {
1448     GNUNET_free (crm->data_message);
1449     GNUNET_free (crm);
1450     ch->pending_messages--;
1451     send_ack_to_client (ch,
1452                         (NULL == ch->owner)
1453                         ? GNUNET_NO
1454                         : GNUNET_YES);
1455     return;
1456   }
1457   if (0 == crm->retry_delay.rel_value_us)
1458     crm->retry_delay = ch->expected_delay;
1459   else
1460     crm->retry_delay = GNUNET_TIME_STD_BACKOFF (crm->retry_delay);
1461   crm->retry_delay = GNUNET_TIME_relative_max (crm->retry_delay,
1462                                                MIN_RTT_DELAY);
1463   crm->next_retry = GNUNET_TIME_relative_to_absolute (crm->retry_delay);
1464
1465   GNUNET_CONTAINER_DLL_insert_sorted (struct CadetReliableMessage,
1466                                       cmp_crm_by_next_retry,
1467                                       NULL,
1468                                       ch->head_sent,
1469                                       ch->tail_sent,
1470                                       crm);
1471   LOG (GNUNET_ERROR_TYPE_DEBUG,
1472        "Message %u sent, next transmission on %s in %s\n",
1473        (unsigned int) ntohl (crm->data_message->mid.mid),
1474        GCCH_2s (ch),
1475        GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (ch->head_sent->next_retry),
1476                                                GNUNET_YES));
1477   if (crm == ch->head_sent)
1478   {
1479     /* We are the new head, need to reschedule retry task */
1480     if (NULL != ch->retry_data_task)
1481       GNUNET_SCHEDULER_cancel (ch->retry_data_task);
1482     ch->retry_data_task
1483       = GNUNET_SCHEDULER_add_at (ch->head_sent->next_retry,
1484                                  &retry_transmission,
1485                                  ch);
1486   }
1487 }
1488
1489
1490 /**
1491  * Handle data given by a client.
1492  *
1493  * Check whether the client is allowed to send in this tunnel, save if
1494  * channel is reliable and send an ACK to the client if there is still
1495  * buffer space in the tunnel.
1496  *
1497  * @param ch Channel.
1498  * @param sender_ccn ccn of the sender
1499  * @param buf payload to transmit.
1500  * @param buf_len number of bytes in @a buf
1501  * @return #GNUNET_OK if everything goes well,
1502  *         #GNUNET_SYSERR in case of an error.
1503  */
1504 int
1505 GCCH_handle_local_data (struct CadetChannel *ch,
1506                         struct GNUNET_CADET_ClientChannelNumber sender_ccn,
1507                         const char *buf,
1508                         size_t buf_len)
1509 {
1510   struct CadetReliableMessage *crm;
1511
1512   if (ch->pending_messages > ch->max_pending_messages)
1513   {
1514     GNUNET_break (0);
1515     return GNUNET_SYSERR;
1516   }
1517   ch->pending_messages++;
1518
1519   if (GNUNET_YES == ch->is_loopback)
1520   {
1521     struct CadetChannelClient *receiver;
1522     struct GNUNET_MQ_Envelope *env;
1523     struct GNUNET_CADET_LocalData *ld;
1524     int to_owner;
1525
1526     env = GNUNET_MQ_msg_extra (ld,
1527                                buf_len,
1528                                GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
1529     if (sender_ccn.channel_of_client ==
1530         ch->owner->ccn.channel_of_client)
1531     {
1532       receiver = ch->dest;
1533       to_owner = GNUNET_NO;
1534     }
1535     else
1536     {
1537       GNUNET_assert (sender_ccn.channel_of_client ==
1538                      ch->dest->ccn.channel_of_client);
1539       receiver = ch->owner;
1540       to_owner = GNUNET_YES;
1541     }
1542     ld->ccn = receiver->ccn;
1543     GNUNET_memcpy (&ld[1],
1544                    buf,
1545                    buf_len);
1546     if (GNUNET_YES == receiver->client_ready)
1547     {
1548       GSC_send_to_client (receiver->c,
1549                           env);
1550       send_ack_to_client (ch,
1551                           to_owner);
1552     }
1553     else
1554     {
1555       struct CadetOutOfOrderMessage *oom;
1556
1557       oom = GNUNET_new (struct CadetOutOfOrderMessage);
1558       oom->env = env;
1559       GNUNET_CONTAINER_DLL_insert_tail (receiver->head_recv,
1560                                         receiver->tail_recv,
1561                                         oom);
1562     }
1563     return GNUNET_OK;
1564   }
1565
1566   /* Everything is correct, send the message. */
1567   crm = GNUNET_malloc (sizeof (*crm));
1568   crm->ch = ch;
1569   crm->data_message = GNUNET_malloc (sizeof (struct GNUNET_CADET_ChannelAppDataMessage)
1570                                      + buf_len);
1571   crm->data_message->header.size = htons (sizeof (struct GNUNET_CADET_ChannelAppDataMessage) + buf_len);
1572   crm->data_message->header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA);
1573   ch->mid_send.mid = htonl (ntohl (ch->mid_send.mid) + 1);
1574   crm->data_message->mid = ch->mid_send;
1575   crm->data_message->ctn = ch->ctn;
1576   GNUNET_memcpy (&crm->data_message[1],
1577                  buf,
1578                  buf_len);
1579   GNUNET_CONTAINER_DLL_insert (ch->head_sent,
1580                                ch->tail_sent,
1581                                crm);
1582   LOG (GNUNET_ERROR_TYPE_DEBUG,
1583        "Sending %u bytes from local client to %s with MID %u\n",
1584        buf_len,
1585        GCCH_2s (ch),
1586        ntohl (crm->data_message->mid.mid));
1587   if (NULL != ch->retry_data_task)
1588   {
1589     GNUNET_SCHEDULER_cancel (ch->retry_data_task);
1590     ch->retry_data_task = NULL;
1591   }
1592   crm->qe = GCT_send (ch->t,
1593                       &crm->data_message->header,
1594                       &data_sent_cb,
1595                       crm);
1596   GNUNET_assert (NULL == ch->retry_data_task);
1597   return GNUNET_OK;
1598 }
1599
1600
1601 /**
1602  * Handle ACK from client on local channel.  Means the client is ready
1603  * for more data, see if we have any for it.
1604  *
1605  * @param ch channel to destroy
1606  * @param client_ccn ccn of the client sending the ack
1607  */
1608 void
1609 GCCH_handle_local_ack (struct CadetChannel *ch,
1610                        struct GNUNET_CADET_ClientChannelNumber client_ccn)
1611 {
1612   struct CadetChannelClient *ccc;
1613   struct CadetOutOfOrderMessage *com;
1614
1615   if ( (NULL != ch->owner) &&
1616        (ch->owner->ccn.channel_of_client == client_ccn.channel_of_client) )
1617     ccc = ch->owner;
1618   else if ( (NULL != ch->dest) &&
1619             (ch->dest->ccn.channel_of_client == client_ccn.channel_of_client) )
1620     ccc = ch->dest;
1621   else
1622     GNUNET_assert (0);
1623   ccc->client_ready = GNUNET_YES;
1624   com = ccc->head_recv;
1625   if (NULL == com)
1626   {
1627     LOG (GNUNET_ERROR_TYPE_DEBUG,
1628          "Got LOCAL_ACK, %s-%X ready to receive more data, but none pending on %s-%X(%p)!\n",
1629          GSC_2s (ccc->c),
1630          ntohl (client_ccn.channel_of_client),
1631          GCCH_2s (ch),
1632          ntohl (ccc->ccn.channel_of_client),
1633          ccc);
1634     return; /* none pending */
1635   }
1636   if (GNUNET_YES == ch->is_loopback)
1637   {
1638     int to_owner;
1639
1640     /* Messages are always in-order, just send */
1641     GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
1642                                  ccc->tail_recv,
1643                                  com);
1644     GSC_send_to_client (ccc->c,
1645                         com->env);
1646     /* Notify sender that we can receive more */
1647     if (ccc->ccn.channel_of_client ==
1648         ch->owner->ccn.channel_of_client)
1649     {
1650       to_owner = GNUNET_NO;
1651     }
1652     else
1653     {
1654       GNUNET_assert (ccc->ccn.channel_of_client ==
1655                      ch->dest->ccn.channel_of_client);
1656       to_owner = GNUNET_YES;
1657     }
1658     send_ack_to_client (ch,
1659                         to_owner);
1660     GNUNET_free (com);
1661     return;
1662   }
1663
1664   if ( (com->mid.mid != ch->mid_recv.mid) &&
1665        (GNUNET_NO == ch->out_of_order) )
1666   {
1667     LOG (GNUNET_ERROR_TYPE_DEBUG,
1668          "Got LOCAL_ACK, %s-%X ready to receive more data (but next one is out-of-order %u vs. %u)!\n",
1669          GSC_2s (ccc->c),
1670          ntohl (ccc->ccn.channel_of_client),
1671          ntohl (com->mid.mid),
1672          ntohl (ch->mid_recv.mid));
1673     return; /* missing next one in-order */
1674   }
1675
1676   LOG (GNUNET_ERROR_TYPE_DEBUG,
1677        "Got LOCAL ACK, passing payload message %u to %s-%X on %s\n",
1678        ntohl (com->mid.mid),
1679        GSC_2s (ccc->c),
1680        ntohl (ccc->ccn.channel_of_client),
1681        GCCH_2s (ch));
1682
1683   /* all good, pass next message to client */
1684   GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
1685                                ccc->tail_recv,
1686                                com);
1687   /* FIXME: if unreliable, this is not aggressive
1688      enough, as it would be OK to have lost some! */
1689   ch->mid_recv.mid = htonl (1 + ntohl (com->mid.mid));
1690   ch->mid_futures >>= 1; /* equivalent to division by 2 */
1691   ccc->client_ready = GNUNET_NO;
1692   GSC_send_to_client (ccc->c,
1693                       com->env);
1694   GNUNET_free (com);
1695   if ( (0xFFULL == (ch->mid_futures & 0xFFULL)) &&
1696        (GNUNET_YES == ch->reliable) )
1697   {
1698     /* The next 15 messages were also already received (0xFF), this
1699        suggests that the sender may be blocked on flow control
1700        urgently waiting for an ACK from us. (As we have an inherent
1701        maximum of 64 bits, and 15 is getting too close for comfort.)
1702        So we should send one now. */
1703     LOG (GNUNET_ERROR_TYPE_DEBUG,
1704          "Sender on %s likely blocked on flow-control, sending ACK now.\n",
1705          GCCH_2s (ch));
1706     if (GNUNET_YES == ch->reliable)
1707       send_channel_data_ack (ch);
1708   }
1709
1710   if (NULL != ccc->head_recv)
1711     return;
1712   if (GNUNET_NO == ch->destroy)
1713     return;
1714   GCT_send_channel_destroy (ch->t,
1715                             ch->ctn);
1716   channel_destroy (ch);
1717 }
1718
1719
1720 #define LOG2(level, ...) GNUNET_log_from_nocheck(level,"cadet-chn",__VA_ARGS__)
1721
1722
1723 /**
1724  * Log channel info.
1725  *
1726  * @param ch Channel.
1727  * @param level Debug level to use.
1728  */
1729 void
1730 GCCH_debug (struct CadetChannel *ch,
1731             enum GNUNET_ErrorType level)
1732 {
1733   int do_log;
1734
1735   do_log = GNUNET_get_log_call_status (level & (~GNUNET_ERROR_TYPE_BULK),
1736                                        "cadet-chn",
1737                                        __FILE__, __FUNCTION__, __LINE__);
1738   if (0 == do_log)
1739     return;
1740
1741   if (NULL == ch)
1742   {
1743     LOG2 (level, "CHN *** DEBUG NULL CHANNEL ***\n");
1744     return;
1745   }
1746   LOG2 (level,
1747         "CHN %s:%X (%p)\n",
1748         GCT_2s (ch->t),
1749         ch->ctn,
1750         ch);
1751   if (NULL != ch->owner)
1752   {
1753     LOG2 (level,
1754           "CHN origin %s ready %s local-id: %u\n",
1755           GSC_2s (ch->owner->c),
1756           ch->owner->client_ready ? "YES" : "NO",
1757           ntohl (ch->owner->ccn.channel_of_client));
1758   }
1759   if (NULL != ch->dest)
1760   {
1761     LOG2 (level,
1762           "CHN destination %s ready %s local-id: %u\n",
1763           GSC_2s (ch->dest->c),
1764           ch->dest->client_ready ? "YES" : "NO",
1765           ntohl (ch->dest->ccn.channel_of_client));
1766   }
1767   LOG2 (level,
1768         "CHN  Message IDs recv: %d (%LLX), send: %d\n",
1769         ntohl (ch->mid_recv.mid),
1770         (unsigned long long) ch->mid_futures,
1771         ntohl (ch->mid_send.mid));
1772 }
1773
1774
1775
1776 /* end of gnunet-service-cadet-new_channel.c */