typo in comment
[oweals/gnunet.git] / src / cadet / gnunet-service-cadet-new_channel.c
1
2 /*
3      This file is part of GNUnet.
4      Copyright (C) 2001-2017 GNUnet e.V.
5
6      GNUnet is free software; you can redistribute it and/or modify
7      it under the terms of the GNU General Public License as published
8      by the Free Software Foundation; either version 3, or (at your
9      option) any later version.
10
11      GNUnet is distributed in the hope that it will be useful, but
12      WITHOUT ANY WARRANTY; without even the implied warranty of
13      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14      General Public License for more details.
15
16      You should have received a copy of the GNU General Public License
17      along with GNUnet; see the file COPYING.  If not, write to the
18      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
19      Boston, MA 02110-1301, USA.
20 */
21 /**
22  * @file cadet/gnunet-service-cadet-new_channel.c
23  * @brief logical links between CADET clients
24  * @author Bartlomiej Polot
25  * @author Christian Grothoff
26  *
27  * TODO:
28  * - Optimize ACKs by using 'mid_futures' properly!
29  * - introduce shutdown so we can have half-closed channels, modify
30  *   destroy to include MID to have FIN-ACK equivalents, etc.
31  * - estimate max bandwidth using bursts and use to for CONGESTION CONTROL!
32  * - check that '0xFFULL' really is sufficient for flow control!
33  * - revisit handling of 'unreliable' traffic!
34  * - revisit handling of 'out-of-order' option, especially in combination with/without 'reliable'.
35  * - figure out flow control without ACKs (unreliable traffic!)
36  */
37 #include "platform.h"
38 #include "gnunet_util_lib.h"
39 #include "cadet.h"
40 #include "gnunet_statistics_service.h"
41 #include "gnunet-service-cadet-new.h"
42 #include "gnunet-service-cadet-new_channel.h"
43 #include "gnunet-service-cadet-new_connection.h"
44 #include "gnunet-service-cadet-new_tunnels.h"
45 #include "gnunet-service-cadet-new_peer.h"
46 #include "gnunet-service-cadet-new_paths.h"
47
48 #define LOG(level,...) GNUNET_log_from (level,"cadet-chn",__VA_ARGS__)
49
50 /**
51  * How long do we initially wait before retransmitting?
52  */
53 #define CADET_INITIAL_RETRANSMIT_TIME GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 250)
54
55 /**
56  * How long do we wait before dropping state about incoming
57  * connection to closed port?
58  */
59 #define TIMEOUT_CLOSED_PORT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30)
60
61 /**
62  * How long do we wait at least before retransmitting ever?
63  */
64 #define MIN_RTT_DELAY GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 75)
65
66 /**
67  * Maximum message ID into the future we accept for out-of-order messages.
68  * If the message is more than this into the future, we drop it.  This is
69  * important both to detect values that are actually in the past, as well
70  * as to limit adversarially triggerable memory consumption.
71  *
72  * Note that right now we have "max_pending_messages = 4" hard-coded in
73  * the logic below, so a value of 4 would suffice here. But we plan to
74  * allow larger windows in the future...
75  */
76 #define MAX_OUT_OF_ORDER_DISTANCE 1024
77
78
79 /**
80  * All the states a connection can be in.
81  */
82 enum CadetChannelState
83 {
84   /**
85    * Uninitialized status, should never appear in operation.
86    */
87   CADET_CHANNEL_NEW,
88
89   /**
90    * Connection create message sent, waiting for ACK.
91    */
92   CADET_CHANNEL_OPEN_SENT,
93
94   /**
95    * Connection confirmed, ready to carry traffic.
96    */
97   CADET_CHANNEL_READY
98 };
99
100
101 /**
102  * Info needed to retry a message in case it gets lost.
103  * Note that we DO use this structure also for unreliable
104  * messages.
105  */
106 struct CadetReliableMessage
107 {
108   /**
109    * Double linked list, FIFO style
110    */
111   struct CadetReliableMessage *next;
112
113   /**
114    * Double linked list, FIFO style
115    */
116   struct CadetReliableMessage *prev;
117
118   /**
119    * Which channel is this message in?
120    */
121   struct CadetChannel *ch;
122
123   /**
124    * Entry in the tunnels queue for this message, NULL if it has left
125    * the tunnel.  Used to cancel transmission in case we receive an
126    * ACK in time.
127    */
128   struct CadetTunnelQueueEntry *qe;
129
130   /**
131    * How soon should we retry if we fail to get an ACK?
132    * Messages in the queue are sorted by this value.
133    */
134   struct GNUNET_TIME_Absolute next_retry;
135
136   /**
137    * How long do we wait for an ACK after transmission?
138    * Use for the back-off calculation.
139    */
140   struct GNUNET_TIME_Relative retry_delay;
141
142   /**
143    * Data message we are trying to send.
144    */
145   struct GNUNET_CADET_ChannelAppDataMessage *data_message;
146
147 };
148
149
150 /**
151  * List of received out-of-order data messages.
152  */
153 struct CadetOutOfOrderMessage
154 {
155   /**
156    * Double linked list, FIFO style
157    */
158   struct CadetOutOfOrderMessage *next;
159
160   /**
161    * Double linked list, FIFO style
162    */
163   struct CadetOutOfOrderMessage *prev;
164
165   /**
166    * ID of the message (messages up to this point needed
167    * before we give this one to the client).
168    */
169   struct ChannelMessageIdentifier mid;
170
171   /**
172    * The envelope with the payload of the out-of-order message
173    */
174   struct GNUNET_MQ_Envelope *env;
175
176 };
177
178
179 /**
180  * Client endpoint of a `struct CadetChannel`.  A channel may be a
181  * loopback channel, in which case it has two of these endpoints.
182  * Note that flow control also is required in both directions.
183  */
184 struct CadetChannelClient
185 {
186   /**
187    * Client handle.  Not by itself sufficient to designate
188    * the client endpoint, as the same client handle may
189    * be used for both the owner and the destination, and
190    * we thus also need the channel ID to identify the client.
191    */
192   struct CadetClient *c;
193
194   /**
195    * Head of DLL of messages received out of order or while client was unready.
196    */
197   struct CadetOutOfOrderMessage *head_recv;
198
199   /**
200    * Tail DLL of messages received out of order or while client was unready.
201    */
202   struct CadetOutOfOrderMessage *tail_recv;
203
204   /**
205    * Local tunnel number for this client.
206    * (if owner >= #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI,
207    *  otherwise < #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI)
208    */
209   struct GNUNET_CADET_ClientChannelNumber ccn;
210
211   /**
212    * Can we send data to the client?
213    */
214   int client_ready;
215
216 };
217
218
219 /**
220  * Struct containing all information regarding a channel to a remote client.
221  */
222 struct CadetChannel
223 {
224   /**
225    * Tunnel this channel is in.
226    */
227   struct CadetTunnel *t;
228
229   /**
230    * Client owner of the tunnel, if any.
231    * (Used if this channel represends the initiating end of the tunnel.)
232    */
233   struct CadetChannelClient *owner;
234
235   /**
236    * Client destination of the tunnel, if any.
237    * (Used if this channel represents the listening end of the tunnel.)
238    */
239   struct CadetChannelClient *dest;
240
241   /**
242    * Last entry in the tunnel's queue relating to control messages
243    * (#GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN or
244    * #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK).  Used to cancel
245    * transmission in case we receive updated information.
246    */
247   struct CadetTunnelQueueEntry *last_control_qe;
248
249   /**
250    * Head of DLL of messages sent and not yet ACK'd.
251    */
252   struct CadetReliableMessage *head_sent;
253
254   /**
255    * Tail of DLL of messages sent and not yet ACK'd.
256    */
257   struct CadetReliableMessage *tail_sent;
258
259   /**
260    * Task to resend/poll in case no ACK is received.
261    */
262   struct GNUNET_SCHEDULER_Task *retry_control_task;
263
264   /**
265    * Task to resend/poll in case no ACK is received.
266    */
267   struct GNUNET_SCHEDULER_Task *retry_data_task;
268
269   /**
270    * Last time the channel was used
271    */
272   struct GNUNET_TIME_Absolute timestamp;
273
274   /**
275    * Destination port of the channel.
276    */
277   struct GNUNET_HashCode port;
278
279   /**
280    * Counter for exponential backoff.
281    */
282   struct GNUNET_TIME_Relative retry_time;
283
284   /**
285    * How long does it usually take to get an ACK.
286    */
287   struct GNUNET_TIME_Relative expected_delay;
288
289   /**
290    * Bitfield of already-received messages past @e mid_recv.
291    *
292    * FIXME: not yet properly used (bits here are never set!)
293    */
294   uint64_t mid_futures;
295
296   /**
297    * Next MID expected for incoming traffic.
298    */
299   struct ChannelMessageIdentifier mid_recv;
300
301   /**
302    * Next MID to use for outgoing traffic.
303    */
304   struct ChannelMessageIdentifier mid_send;
305
306   /**
307    * Total (reliable) messages pending ACK for this channel.
308    */
309   unsigned int pending_messages;
310
311   /**
312    * Maximum (reliable) messages pending ACK for this channel
313    * before we throttle the client.
314    */
315   unsigned int max_pending_messages;
316
317   /**
318    * Number identifying this channel in its tunnel.
319    */
320   struct GNUNET_CADET_ChannelTunnelNumber ctn;
321
322   /**
323    * Channel state.
324    */
325   enum CadetChannelState state;
326
327   /**
328    * Is the tunnel bufferless (minimum latency)?
329    */
330   int nobuffer;
331
332   /**
333    * Is the tunnel reliable?
334    */
335   int reliable;
336
337   /**
338    * Is the tunnel out-of-order?
339    */
340   int out_of_order;
341
342   /**
343    * Is this channel a loopback channel, where the destination is us again?
344    */
345   int is_loopback;
346
347   /**
348    * Flag to signal the destruction of the channel.  If this is set to
349    * #GNUNET_YES the channel will be destroyed once the queue is
350    * empty.
351    */
352   int destroy;
353
354 };
355
356
357 /**
358  * Get the static string for identification of the channel.
359  *
360  * @param ch Channel.
361  *
362  * @return Static string with the channel IDs.
363  */
364 const char *
365 GCCH_2s (const struct CadetChannel *ch)
366 {
367   static char buf[128];
368
369   GNUNET_snprintf (buf,
370                    sizeof (buf),
371                    "Channel %s:%s ctn:%X(%X/%X)",
372                    (GNUNET_YES == ch->is_loopback)
373                    ? "loopback"
374                    : GNUNET_i2s (GCP_get_id (GCT_get_destination (ch->t))),
375                    GNUNET_h2s (&ch->port),
376                    ch->ctn,
377                    (NULL == ch->owner) ? 0 : ntohl (ch->owner->ccn.channel_of_client),
378                    (NULL == ch->dest) ? 0 : ntohl (ch->dest->ccn.channel_of_client));
379   return buf;
380 }
381
382
383 /**
384  * Get the channel's public ID.
385  *
386  * @param ch Channel.
387  *
388  * @return ID used to identify the channel with the remote peer.
389  */
390 struct GNUNET_CADET_ChannelTunnelNumber
391 GCCH_get_id (const struct CadetChannel *ch)
392 {
393   return ch->ctn;
394 }
395
396
397 /**
398  * Release memory associated with @a ccc
399  *
400  * @param ccc data structure to clean up
401  */
402 static void
403 free_channel_client (struct CadetChannelClient *ccc)
404 {
405   struct CadetOutOfOrderMessage *com;
406
407   while (NULL != (com = ccc->head_recv))
408   {
409     GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
410                                  ccc->tail_recv,
411                                  com);
412     GNUNET_MQ_discard (com->env);
413     GNUNET_free (com);
414   }
415   GNUNET_free (ccc);
416 }
417
418
419 /**
420  * Destroy the given channel.
421  *
422  * @param ch channel to destroy
423  */
424 static void
425 channel_destroy (struct CadetChannel *ch)
426 {
427   struct CadetReliableMessage *crm;
428
429   while (NULL != (crm = ch->head_sent))
430   {
431     GNUNET_assert (ch == crm->ch);
432     if (NULL != crm->qe)
433     {
434       GCT_send_cancel (crm->qe);
435       crm->qe = NULL;
436     }
437     GNUNET_CONTAINER_DLL_remove (ch->head_sent,
438                                  ch->tail_sent,
439                                  crm);
440     GNUNET_free (crm->data_message);
441     GNUNET_free (crm);
442   }
443   if (NULL != ch->owner)
444   {
445     free_channel_client (ch->owner);
446     ch->owner = NULL;
447   }
448   if (NULL != ch->dest)
449   {
450     free_channel_client (ch->dest);
451     ch->dest = NULL;
452   }
453   if (NULL != ch->last_control_qe)
454   {
455     GCT_send_cancel (ch->last_control_qe);
456     ch->last_control_qe = NULL;
457   }
458   if (NULL != ch->retry_data_task)
459   {
460     GNUNET_SCHEDULER_cancel (ch->retry_data_task);
461     ch->retry_data_task = NULL;
462   }
463   if (NULL != ch->retry_control_task)
464   {
465     GNUNET_SCHEDULER_cancel (ch->retry_control_task);
466     ch->retry_control_task = NULL;
467   }
468   if (GNUNET_NO == ch->is_loopback)
469   {
470     GCT_remove_channel (ch->t,
471                         ch,
472                         ch->ctn);
473     ch->t = NULL;
474   }
475   GNUNET_free (ch);
476 }
477
478
479 /**
480  * Send a channel create message.
481  *
482  * @param cls Channel for which to send.
483  */
484 static void
485 send_channel_open (void *cls);
486
487
488 /**
489  * Function called once the tunnel confirms that we sent the
490  * create message.  Delays for a bit until we retry.
491  *
492  * @param cls our `struct CadetChannel`.
493  */
494 static void
495 channel_open_sent_cb (void *cls)
496 {
497   struct CadetChannel *ch = cls;
498
499   GNUNET_assert (NULL != ch->last_control_qe);
500   ch->last_control_qe = NULL;
501   ch->retry_time = GNUNET_TIME_STD_BACKOFF (ch->retry_time);
502   LOG (GNUNET_ERROR_TYPE_DEBUG,
503        "Sent CADET_CHANNEL_OPEN on %s, retrying in %s\n",
504        GCCH_2s (ch),
505        GNUNET_STRINGS_relative_time_to_string (ch->retry_time,
506                                                GNUNET_YES));
507   ch->retry_control_task
508     = GNUNET_SCHEDULER_add_delayed (ch->retry_time,
509                                     &send_channel_open,
510                                     ch);
511 }
512
513
514 /**
515  * Send a channel open message.
516  *
517  * @param cls Channel for which to send.
518  */
519 static void
520 send_channel_open (void *cls)
521 {
522   struct CadetChannel *ch = cls;
523   struct GNUNET_CADET_ChannelOpenMessage msgcc;
524   uint32_t options;
525
526   ch->retry_control_task = NULL;
527   LOG (GNUNET_ERROR_TYPE_DEBUG,
528        "Sending CHANNEL_OPEN message for %s\n",
529        GCCH_2s (ch));
530   options = 0;
531   if (ch->nobuffer)
532     options |= GNUNET_CADET_OPTION_NOBUFFER;
533   if (ch->reliable)
534     options |= GNUNET_CADET_OPTION_RELIABLE;
535   if (ch->out_of_order)
536     options |= GNUNET_CADET_OPTION_OUT_OF_ORDER;
537   msgcc.header.size = htons (sizeof (msgcc));
538   msgcc.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN);
539   msgcc.opt = htonl (options);
540   msgcc.port = ch->port;
541   msgcc.ctn = ch->ctn;
542   ch->state = CADET_CHANNEL_OPEN_SENT;
543   ch->last_control_qe = GCT_send (ch->t,
544                                   &msgcc.header,
545                                   &channel_open_sent_cb,
546                                   ch);
547   GNUNET_assert (NULL == ch->retry_control_task);
548 }
549
550
551 /**
552  * Function called once and only once after a channel was bound
553  * to its tunnel via #GCT_add_channel() is ready for transmission.
554  * Note that this is only the case for channels that this peer
555  * initiates, as for incoming channels we assume that they are
556  * ready for transmission immediately upon receiving the open
557  * message.  Used to bootstrap the #GCT_send() process.
558  *
559  * @param ch the channel for which the tunnel is now ready
560  */
561 void
562 GCCH_tunnel_up (struct CadetChannel *ch)
563 {
564   GNUNET_assert (NULL == ch->retry_control_task);
565   LOG (GNUNET_ERROR_TYPE_DEBUG,
566        "Tunnel up, sending CHANNEL_OPEN on %s now\n",
567        GCCH_2s (ch));
568   ch->retry_control_task
569     = GNUNET_SCHEDULER_add_now (&send_channel_open,
570                                 ch);
571 }
572
573
574 /**
575  * Create a new channel.
576  *
577  * @param owner local client owning the channel
578  * @param ccn local number of this channel at the @a owner
579  * @param destination peer to which we should build the channel
580  * @param port desired port at @a destination
581  * @param options options for the channel
582  * @return handle to the new channel
583  */
584 struct CadetChannel *
585 GCCH_channel_local_new (struct CadetClient *owner,
586                         struct GNUNET_CADET_ClientChannelNumber ccn,
587                         struct CadetPeer *destination,
588                         const struct GNUNET_HashCode *port,
589                         uint32_t options)
590 {
591   struct CadetChannel *ch;
592   struct CadetChannelClient *ccco;
593
594   ccco = GNUNET_new (struct CadetChannelClient);
595   ccco->c = owner;
596   ccco->ccn = ccn;
597   ccco->client_ready = GNUNET_YES;
598
599   ch = GNUNET_new (struct CadetChannel);
600   ch->mid_recv.mid = htonl (1); /* The OPEN_ACK counts as message 0! */
601   ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
602   ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
603   ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
604   ch->max_pending_messages = (ch->nobuffer) ? 1 : 4; /* FIXME: 4!? Do not hardcode! */
605   ch->owner = ccco;
606   ch->port = *port;
607   if (0 == memcmp (&my_full_id,
608                    GCP_get_id (destination),
609                    sizeof (struct GNUNET_PeerIdentity)))
610   {
611     struct CadetClient *c;
612
613     ch->is_loopback = GNUNET_YES;
614     c = GNUNET_CONTAINER_multihashmap_get (open_ports,
615                                            port);
616     if (NULL == c)
617     {
618       /* port closed, wait for it to possibly open */
619       (void) GNUNET_CONTAINER_multihashmap_put (loose_channels,
620                                                 port,
621                                                 ch,
622                                                 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
623       LOG (GNUNET_ERROR_TYPE_DEBUG,
624            "Created loose incoming loopback channel to port %s\n",
625            GNUNET_h2s (&ch->port));
626     }
627     else
628     {
629       ch->dest = GNUNET_new (struct CadetChannelClient);
630       ch->dest->c = c;
631       ch->dest->client_ready = GNUNET_YES;
632       GCCH_bind (ch,
633                  ch->dest->c);
634     }
635   }
636   else
637   {
638     ch->t = GCP_get_tunnel (destination,
639                             GNUNET_YES);
640     ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME;
641     ch->ctn = GCT_add_channel (ch->t,
642                                ch);
643   }
644   GNUNET_STATISTICS_update (stats,
645                             "# channels",
646                             1,
647                             GNUNET_NO);
648   LOG (GNUNET_ERROR_TYPE_DEBUG,
649        "Created channel to port %s at peer %s for %s using %s\n",
650        GNUNET_h2s (port),
651        GCP_2s (destination),
652        GSC_2s (owner),
653        (GNUNET_YES == ch->is_loopback) ? "loopback" : GCT_2s (ch->t));
654   return ch;
655 }
656
657
658 /**
659  * We had an incoming channel to a port that is closed.
660  * It has not been opened for a while, drop it.
661  *
662  * @param cls the channel to drop
663  */
664 static void
665 timeout_closed_cb (void *cls)
666 {
667   struct CadetChannel *ch = cls;
668
669   ch->retry_control_task = NULL;
670   LOG (GNUNET_ERROR_TYPE_DEBUG,
671        "Closing incoming channel to port %s from peer %s due to timeout\n",
672        GNUNET_h2s (&ch->port),
673        GCP_2s (GCT_get_destination (ch->t)));
674   channel_destroy (ch);
675 }
676
677
678 /**
679  * Create a new channel based on a request coming in over the network.
680  *
681  * @param t tunnel to the remote peer
682  * @param ctn identifier of this channel in the tunnel
683  * @param port desired local port
684  * @param options options for the channel
685  * @return handle to the new channel
686  */
687 struct CadetChannel *
688 GCCH_channel_incoming_new (struct CadetTunnel *t,
689                            struct GNUNET_CADET_ChannelTunnelNumber ctn,
690                            const struct GNUNET_HashCode *port,
691                            uint32_t options)
692 {
693   struct CadetChannel *ch;
694   struct CadetClient *c;
695
696   ch = GNUNET_new (struct CadetChannel);
697   ch->port = *port;
698   ch->t = t;
699   ch->ctn = ctn;
700   ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME;
701   ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
702   ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
703   ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
704   ch->max_pending_messages = (ch->nobuffer) ? 1 : 4; /* FIXME: 4!? Do not hardcode! */
705   GNUNET_STATISTICS_update (stats,
706                             "# channels",
707                             1,
708                             GNUNET_NO);
709
710   c = GNUNET_CONTAINER_multihashmap_get (open_ports,
711                                          port);
712   if (NULL == c)
713   {
714     /* port closed, wait for it to possibly open */
715     (void) GNUNET_CONTAINER_multihashmap_put (loose_channels,
716                                               port,
717                                               ch,
718                                               GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
719     ch->retry_control_task
720       = GNUNET_SCHEDULER_add_delayed (TIMEOUT_CLOSED_PORT,
721                                       &timeout_closed_cb,
722                                       ch);
723     LOG (GNUNET_ERROR_TYPE_DEBUG,
724          "Created loose incoming channel to port %s from peer %s\n",
725          GNUNET_h2s (&ch->port),
726          GCP_2s (GCT_get_destination (ch->t)));
727   }
728   else
729   {
730     GCCH_bind (ch,
731                c);
732   }
733   GNUNET_STATISTICS_update (stats,
734                             "# channels",
735                             1,
736                             GNUNET_NO);
737   return ch;
738 }
739
740
741 /**
742  * Function called once the tunnel confirms that we sent the
743  * ACK message.  Just remembers it was sent, we do not expect
744  * ACKs for ACKs ;-).
745  *
746  * @param cls our `struct CadetChannel`.
747  */
748 static void
749 send_ack_cb (void *cls)
750 {
751   struct CadetChannel *ch = cls;
752
753   GNUNET_assert (NULL != ch->last_control_qe);
754   ch->last_control_qe = NULL;
755 }
756
757
758 /**
759  * Compute and send the current #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK to the other peer.
760  *
761  * @param ch channel to send the #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK for
762  */
763 static void
764 send_channel_data_ack (struct CadetChannel *ch)
765 {
766   struct GNUNET_CADET_ChannelDataAckMessage msg;
767
768   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK);
769   msg.header.size = htons (sizeof (msg));
770   msg.ctn = ch->ctn;
771   msg.mid.mid = htonl (ntohl (ch->mid_recv.mid) - 1);
772   msg.futures = GNUNET_htonll (ch->mid_futures);
773   if (NULL != ch->last_control_qe)
774     GCT_send_cancel (ch->last_control_qe);
775   LOG (GNUNET_ERROR_TYPE_DEBUG,
776        "Sending DATA_ACK %u:%llX via %s\n",
777        (unsigned int) ntohl (msg.mid.mid),
778        (unsigned long long) ch->mid_futures,
779        GCCH_2s (ch));
780   ch->last_control_qe = GCT_send (ch->t,
781                                   &msg.header,
782                                   &send_ack_cb,
783                                   ch);
784 }
785
786
787 /**
788  * Send our initial #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK to the client confirming that the
789  * connection is up.
790  *
791  * @param cls the `struct CadetChannel`
792  */
793 static void
794 send_open_ack (void *cls)
795 {
796   struct CadetChannel *ch = cls;
797   struct GNUNET_CADET_ChannelManageMessage msg;
798
799   LOG (GNUNET_ERROR_TYPE_DEBUG,
800        "Sending CHANNEL_OPEN_ACK on %s\n",
801        GCCH_2s (ch));
802   ch->retry_control_task = NULL;
803   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK);
804   msg.header.size = htons (sizeof (msg));
805   msg.reserved = htonl (0);
806   msg.ctn = ch->ctn;
807   if (NULL != ch->last_control_qe)
808     GCT_send_cancel (ch->last_control_qe);
809   ch->last_control_qe = GCT_send (ch->t,
810                                   &msg.header,
811                                   &send_ack_cb,
812                                   ch);
813 }
814
815
816 /**
817  * We got a #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN message again for
818  * this channel.  If the binding was successful, (re)transmit the
819  * #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK.
820  *
821  * @param ch channel that got the duplicate open
822  */
823 void
824 GCCH_handle_duplicate_open (struct CadetChannel *ch)
825 {
826   if (NULL == ch->dest)
827   {
828     LOG (GNUNET_ERROR_TYPE_DEBUG,
829          "Ignoring duplicate channel OPEN on %s: port is closed\n",
830          GCCH_2s (ch));
831     return;
832   }
833   if (NULL != ch->retry_control_task)
834   {
835     LOG (GNUNET_ERROR_TYPE_DEBUG,
836          "Ignoring duplicate channel OPEN on %s: control message is pending\n",
837          GCCH_2s (ch));
838     return;
839   }
840   LOG (GNUNET_ERROR_TYPE_DEBUG,
841        "Retransmitting OPEN_ACK on %s\n",
842        GCCH_2s (ch));
843   ch->retry_control_task
844     = GNUNET_SCHEDULER_add_now (&send_open_ack,
845                                 ch);
846 }
847
848
849 /**
850  * Send a #GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK to the client to solicit more messages.
851  *
852  * @param ch channel the ack is for
853  * @param to_owner #GNUNET_YES to send to owner,
854  *                 #GNUNET_NO to send to dest
855  */
856 static void
857 send_ack_to_client (struct CadetChannel *ch,
858                     int to_owner)
859 {
860   struct GNUNET_MQ_Envelope *env;
861   struct GNUNET_CADET_LocalAck *ack;
862   struct CadetChannelClient *ccc;
863
864   env = GNUNET_MQ_msg (ack,
865                        GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK);
866   ccc = (GNUNET_YES == to_owner) ? ch->owner : ch->dest;
867   ack->ccn = ccc->ccn;
868   LOG (GNUNET_ERROR_TYPE_DEBUG,
869        "Sending CADET_LOCAL_ACK to %s (%s) at ccn %X (%u/%u pending)\n",
870        GSC_2s (ccc->c),
871        (GNUNET_YES == to_owner) ? "owner" : "dest",
872        ntohl (ack->ccn.channel_of_client),
873        ch->pending_messages,
874        ch->max_pending_messages);
875   GSC_send_to_client (ccc->c,
876                       env);
877 }
878
879
880 /**
881  * A client is bound to the port that we have a channel
882  * open to.  Send the acknowledgement for the connection
883  * request and establish the link with the client.
884  *
885  * @param ch open incoming channel
886  * @param c client listening on the respective port
887  */
888 void
889 GCCH_bind (struct CadetChannel *ch,
890            struct CadetClient *c)
891 {
892   uint32_t options;
893   struct CadetChannelClient *cccd;
894
895   LOG (GNUNET_ERROR_TYPE_DEBUG,
896        "Binding %s from %s to port %s of %s\n",
897        GCCH_2s (ch),
898        GCT_2s (ch->t),
899        GNUNET_h2s (&ch->port),
900        GSC_2s (c));
901   if (NULL != ch->retry_control_task)
902   {
903     /* there might be a timeout task here */
904     GNUNET_SCHEDULER_cancel (ch->retry_control_task);
905     ch->retry_control_task = NULL;
906   }
907   options = 0;
908   if (ch->nobuffer)
909     options |= GNUNET_CADET_OPTION_NOBUFFER;
910   if (ch->reliable)
911     options |= GNUNET_CADET_OPTION_RELIABLE;
912   if (ch->out_of_order)
913     options |= GNUNET_CADET_OPTION_OUT_OF_ORDER;
914   cccd = GNUNET_new (struct CadetChannelClient);
915   ch->dest = cccd;
916   cccd->c = c;
917   cccd->client_ready = GNUNET_YES;
918   cccd->ccn = GSC_bind (c,
919                         ch,
920                         (GNUNET_YES == ch->is_loopback)
921                         ? GCP_get (&my_full_id,
922                                    GNUNET_YES)
923                         : GCT_get_destination (ch->t),
924                         &ch->port,
925                         options);
926   GNUNET_assert (ntohl (cccd->ccn.channel_of_client) <
927                  GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
928   ch->mid_recv.mid = htonl (1); /* The OPEN counts as message 0! */
929   if (GNUNET_YES == ch->is_loopback)
930   {
931     ch->state = CADET_CHANNEL_OPEN_SENT;
932     GCCH_handle_channel_open_ack (ch);
933   }
934   else
935   {
936     /* notify other peer that we accepted the connection */
937     ch->retry_control_task
938       = GNUNET_SCHEDULER_add_now (&send_open_ack,
939                                   ch);
940   }
941   /* give client it's initial supply of ACKs */
942   GNUNET_assert (ntohl (cccd->ccn.channel_of_client) <
943                  GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
944   for (unsigned int i=0;i<ch->max_pending_messages;i++)
945     send_ack_to_client (ch,
946                         GNUNET_NO);
947 }
948
949
950 /**
951  * Destroy locally created channel.  Called by the local client, so no
952  * need to tell the client.
953  *
954  * @param ch channel to destroy
955  * @param c client that caused the destruction
956  * @param ccn client number of the client @a c
957  */
958 void
959 GCCH_channel_local_destroy (struct CadetChannel *ch,
960                             struct CadetClient *c,
961                             struct GNUNET_CADET_ClientChannelNumber ccn)
962 {
963   LOG (GNUNET_ERROR_TYPE_DEBUG,
964        "%s asks for destruction of %s\n",
965        GSC_2s (c),
966        GCCH_2s (ch));
967   GNUNET_assert (NULL != c);
968   if ( (NULL != ch->owner) &&
969        (c == ch->owner->c) &&
970        (ccn.channel_of_client == ch->owner->ccn.channel_of_client) )
971   {
972     free_channel_client (ch->owner);
973     ch->owner = NULL;
974   }
975   else if ( (NULL != ch->dest) &&
976             (c == ch->dest->c) &&
977             (ccn.channel_of_client == ch->dest->ccn.channel_of_client) )
978   {
979     free_channel_client (ch->dest);
980     ch->dest = NULL;
981   }
982   else
983   {
984     GNUNET_assert (0);
985   }
986
987   if (GNUNET_YES == ch->destroy)
988   {
989     /* other end already destroyed, with the local client gone, no need
990        to finish transmissions, just destroy immediately. */
991     channel_destroy (ch);
992     return;
993   }
994   if ( (NULL != ch->head_sent) ||
995        (NULL != ch->owner) ||
996        (NULL != ch->dest) )
997   {
998     /* Wait for other end to destroy us as well,
999        and otherwise allow send queue to be transmitted first */
1000     ch->destroy = GNUNET_YES;
1001     return;
1002   }
1003   /* If the we ever sent the CHANNEL_CREATE, we need to send a destroy message. */
1004   if (CADET_CHANNEL_NEW != ch->state)
1005     GCT_send_channel_destroy (ch->t,
1006                               ch->ctn);
1007   /* Nothing left to do, just finish destruction */
1008   channel_destroy (ch);
1009 }
1010
1011
1012 /**
1013  * We got an acknowledgement for the creation of the channel
1014  * (the port is open on the other side). Begin transmissions.
1015  *
1016  * @param ch channel to destroy
1017  */
1018 void
1019 GCCH_handle_channel_open_ack (struct CadetChannel *ch)
1020 {
1021   switch (ch->state)
1022   {
1023   case CADET_CHANNEL_NEW:
1024     /* this should be impossible */
1025     GNUNET_break (0);
1026     break;
1027   case CADET_CHANNEL_OPEN_SENT:
1028     if (NULL == ch->owner)
1029     {
1030       /* We're not the owner, wrong direction! */
1031       GNUNET_break_op (0);
1032       return;
1033     }
1034     LOG (GNUNET_ERROR_TYPE_DEBUG,
1035          "Received CHANNEL_OPEN_ACK for waiting %s, entering READY state\n",
1036          GCCH_2s (ch));
1037     if (NULL != ch->retry_control_task) /* can be NULL if ch->is_loopback */
1038     {
1039       GNUNET_SCHEDULER_cancel (ch->retry_control_task);
1040       ch->retry_control_task = NULL;
1041     }
1042     ch->state = CADET_CHANNEL_READY;
1043     /* On first connect, send client as many ACKs as we allow messages
1044        to be buffered! */
1045     for (unsigned int i=0;i<ch->max_pending_messages;i++)
1046       send_ack_to_client (ch,
1047                           GNUNET_YES);
1048     break;
1049   case CADET_CHANNEL_READY:
1050     /* duplicate ACK, maybe we retried the CREATE. Ignore. */
1051     LOG (GNUNET_ERROR_TYPE_DEBUG,
1052          "Received duplicate channel OPEN_ACK for %s\n",
1053          GCCH_2s (ch));
1054     GNUNET_STATISTICS_update (stats,
1055                               "# duplicate CREATE_ACKs",
1056                               1,
1057                               GNUNET_NO);
1058     break;
1059   }
1060 }
1061
1062
1063 /**
1064  * Test if element @a e1 comes before element @a e2.
1065  *
1066  * @param cls closure, to a flag where we indicate duplicate packets
1067  * @param e1 an element of to sort
1068  * @param e2 another element to sort
1069  * @return #GNUNET_YES if @e1 < @e2, otherwise #GNUNET_NO
1070  */
1071 static int
1072 is_before (void *cls,
1073            struct CadetOutOfOrderMessage *m1,
1074            struct CadetOutOfOrderMessage *m2)
1075 {
1076   int *duplicate = cls;
1077   uint32_t v1 = ntohl (m1->mid.mid);
1078   uint32_t v2 = ntohl (m2->mid.mid);
1079   uint32_t delta;
1080
1081   delta = v2 - v1;
1082   if (0 == delta)
1083     *duplicate = GNUNET_YES;
1084   if (delta > (uint32_t) INT_MAX)
1085   {
1086     /* in overflow range, we can safely assume we wrapped around */
1087     return GNUNET_NO;
1088   }
1089   else
1090   {
1091     /* result is small, thus v2 > v1, thus e1 < e2 */
1092     return GNUNET_YES;
1093   }
1094 }
1095
1096
1097 /**
1098  * We got payload data for a channel.  Pass it on to the client
1099  * and send an ACK to the other end (once flow control allows it!)
1100  *
1101  * @param ch channel that got data
1102  * @param msg message that was received
1103  */
1104 void
1105 GCCH_handle_channel_plaintext_data (struct CadetChannel *ch,
1106                                     const struct GNUNET_CADET_ChannelAppDataMessage *msg)
1107 {
1108   struct GNUNET_MQ_Envelope *env;
1109   struct GNUNET_CADET_LocalData *ld;
1110   struct CadetChannelClient *ccc;
1111   size_t payload_size;
1112   struct CadetOutOfOrderMessage *com;
1113   int duplicate;
1114   uint32_t mid_min;
1115   uint32_t mid_max;
1116   uint32_t mid_msg;
1117
1118   GNUNET_assert (GNUNET_NO == ch->is_loopback);
1119   if ( (GNUNET_YES == ch->destroy) &&
1120        (NULL == ch->owner) &&
1121        (NULL == ch->dest) )
1122   {
1123     /* This client is gone, but we still have messages to send to
1124        the other end (which is why @a ch is not yet dead).  However,
1125        we cannot pass messages to our client anymore. */
1126     LOG (GNUNET_ERROR_TYPE_DEBUG,
1127          "Dropping incoming payload on %s as this end is already closed\n",
1128          GCCH_2s (ch));
1129     /* FIXME: send back ACK/NACK/Closed notification
1130        to stop retransmissions! */
1131     return;
1132   }
1133   payload_size = ntohs (msg->header.size) - sizeof (*msg);
1134   env = GNUNET_MQ_msg_extra (ld,
1135                              payload_size,
1136                              GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
1137   ld->ccn = (NULL == ch->dest) ? ch->owner->ccn : ch->dest->ccn;
1138   GNUNET_memcpy (&ld[1],
1139                  &msg[1],
1140                  payload_size);
1141   ccc = (NULL != ch->owner) ? ch->owner : ch->dest;
1142   if ( (GNUNET_YES == ccc->client_ready) &&
1143        ( (GNUNET_YES == ch->out_of_order) ||
1144          (msg->mid.mid == ch->mid_recv.mid) ) )
1145   {
1146     LOG (GNUNET_ERROR_TYPE_DEBUG,
1147          "Giving %u bytes of payload with MID %u from %s to client %s\n",
1148          (unsigned int) payload_size,
1149          ntohl (msg->mid.mid),
1150          GCCH_2s (ch),
1151          GSC_2s (ccc->c));
1152     ccc->client_ready = GNUNET_NO;
1153     GSC_send_to_client (ccc->c,
1154                         env);
1155     ch->mid_recv.mid = htonl (1 + ntohl (ch->mid_recv.mid));
1156     ch->mid_futures >>= 1;
1157     if (GNUNET_YES == ch->reliable)
1158       send_channel_data_ack (ch);
1159     return;
1160   }
1161
1162   /* check if message ought to be dropped because it is anicent/too distant/duplicate */
1163   mid_min = ntohl (ch->mid_recv.mid);
1164   mid_max = mid_min + MAX_OUT_OF_ORDER_DISTANCE;
1165   mid_msg = ntohl (msg->mid.mid);
1166   if ( ( (uint32_t) (mid_msg - mid_min) > MAX_OUT_OF_ORDER_DISTANCE) ||
1167        ( (uint32_t) (mid_max - mid_msg) > MAX_OUT_OF_ORDER_DISTANCE) )
1168   {
1169     LOG (GNUNET_ERROR_TYPE_DEBUG,
1170          "Duplicate ancient or future payload of %u bytes on %s (mid %u) dropped\n",
1171          (unsigned int) payload_size,
1172          GCCH_2s (ch),
1173          ntohl (msg->mid.mid));
1174     GNUNET_STATISTICS_update (stats,
1175                               "# duplicate DATA (ancient or future)",
1176                               1,
1177                               GNUNET_NO);
1178     GNUNET_MQ_discard (env);
1179     return;
1180   }
1181
1182   /* Insert message into sorted out-of-order queue */
1183   com = GNUNET_new (struct CadetOutOfOrderMessage);
1184   com->mid = msg->mid;
1185   com->env = env;
1186   duplicate = GNUNET_NO;
1187   GNUNET_CONTAINER_DLL_insert_sorted (struct CadetOutOfOrderMessage,
1188                                       is_before,
1189                                       &duplicate,
1190                                       ccc->head_recv,
1191                                       ccc->tail_recv,
1192                                       com);
1193   if (GNUNET_YES == duplicate)
1194   {
1195     /* Duplicate within the queue, drop also */
1196     LOG (GNUNET_ERROR_TYPE_DEBUG,
1197          "Duplicate payload of %u bytes on %s (mid %u) dropped\n",
1198          (unsigned int) payload_size,
1199          GCCH_2s (ch),
1200          ntohl (msg->mid.mid));
1201     GNUNET_STATISTICS_update (stats,
1202                               "# duplicate DATA",
1203                               1,
1204                               GNUNET_NO);
1205     GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
1206                                  ccc->tail_recv,
1207                                  com);
1208     GNUNET_MQ_discard (com->env);
1209     GNUNET_free (com);
1210     if (GNUNET_YES == ch->reliable)
1211       send_channel_data_ack (ch);
1212     return;
1213   }
1214   LOG (GNUNET_ERROR_TYPE_DEBUG,
1215        "Queued %s payload of %u bytes on %s-%X(%p) (mid %u, need %u first)\n",
1216        (GNUNET_YES == ccc->client_ready)
1217        ? "out-of-order"
1218        : "client-not-ready",
1219        (unsigned int) payload_size,
1220        GCCH_2s (ch),
1221        ntohl (ccc->ccn.channel_of_client),
1222        ccc,
1223        ntohl (msg->mid.mid),
1224        ntohl (ch->mid_recv.mid));
1225   send_channel_data_ack (ch);
1226 }
1227
1228
1229 /**
1230  * Function called once the tunnel has sent one of our messages.
1231  * If the message is unreliable, simply frees the `crm`. If the
1232  * message was reliable, calculate retransmission time and
1233  * wait for ACK (or retransmit).
1234  *
1235  * @param cls the `struct CadetReliableMessage` that was sent
1236  */
1237 static void
1238 data_sent_cb (void *cls);
1239
1240
1241 /**
1242  * We need to retry a transmission, the last one took too long to
1243  * be acknowledged.
1244  *
1245  * @param cls the `struct CadetChannel` where we need to retransmit
1246  */
1247 static void
1248 retry_transmission (void *cls)
1249 {
1250   struct CadetChannel *ch = cls;
1251   struct CadetReliableMessage *crm = ch->head_sent;
1252
1253   ch->retry_data_task = NULL;
1254   GNUNET_assert (NULL == crm->qe);
1255   LOG (GNUNET_ERROR_TYPE_DEBUG,
1256        "Retrying transmission on %s of message %u\n",
1257        GCCH_2s (ch),
1258        (unsigned int) ntohl (crm->data_message->mid.mid));
1259   crm->qe = GCT_send (ch->t,
1260                       &crm->data_message->header,
1261                       &data_sent_cb,
1262                       crm);
1263   GNUNET_assert (NULL == ch->retry_data_task);
1264 }
1265
1266
1267 /**
1268  * We got an PLAINTEXT_DATA_ACK for a message in our queue, remove it from
1269  * the queue and tell our client that it can send more.
1270  *
1271  * @param ch the channel that got the PLAINTEXT_DATA_ACK
1272  * @param crm the message that got acknowledged
1273  */
1274 static void
1275 handle_matching_ack (struct CadetChannel *ch,
1276                      struct CadetReliableMessage *crm)
1277 {
1278   GNUNET_CONTAINER_DLL_remove (ch->head_sent,
1279                                ch->tail_sent,
1280                                crm);
1281   ch->pending_messages--;
1282   GNUNET_assert (ch->pending_messages < ch->max_pending_messages);
1283   LOG (GNUNET_ERROR_TYPE_DEBUG,
1284        "Received DATA_ACK on %s for message %u (%u ACKs pending)\n",
1285        GCCH_2s (ch),
1286        (unsigned int) ntohl (crm->data_message->mid.mid),
1287        ch->pending_messages);
1288   if (NULL != crm->qe)
1289   {
1290     GCT_send_cancel (crm->qe);
1291     crm->qe = NULL;
1292   }
1293   GNUNET_free (crm->data_message);
1294   GNUNET_free (crm);
1295   send_ack_to_client (ch,
1296                       (NULL == ch->owner)
1297                       ? GNUNET_NO
1298                       : GNUNET_YES);
1299 }
1300
1301
1302 /**
1303  * We got an acknowledgement for payload data for a channel.
1304  * Possibly resume transmissions.
1305  *
1306  * @param ch channel that got the ack
1307  * @param ack details about what was received
1308  */
1309 void
1310 GCCH_handle_channel_plaintext_data_ack (struct CadetChannel *ch,
1311                                         const struct GNUNET_CADET_ChannelDataAckMessage *ack)
1312 {
1313   struct CadetReliableMessage *crm;
1314   struct CadetReliableMessage *crmn;
1315   int found;
1316   uint32_t mid_base;
1317   uint64_t mid_mask;
1318   unsigned int delta;
1319
1320   GNUNET_break (GNUNET_NO == ch->is_loopback);
1321   if (GNUNET_NO == ch->reliable)
1322   {
1323     /* not expecting ACKs on unreliable channel, odd */
1324     GNUNET_break_op (0);
1325     return;
1326   }
1327   mid_base = ntohl (ack->mid.mid);
1328   mid_mask = GNUNET_htonll (ack->futures);
1329   found = GNUNET_NO;
1330   for (crm = ch->head_sent;
1331         NULL != crm;
1332        crm = crmn)
1333   {
1334     crmn = crm->next;
1335     if (ack->mid.mid == crm->data_message->mid.mid)
1336     {
1337       handle_matching_ack (ch,
1338                            crm);
1339       found = GNUNET_YES;
1340       continue;
1341     }
1342     delta = (unsigned int) (ntohl (crm->data_message->mid.mid) - mid_base) - 1;
1343     if (delta >= 64)
1344       continue;
1345     if (0 != (mid_mask & (1LLU << delta)))
1346     {
1347       handle_matching_ack (ch,
1348                            crm);
1349       found = GNUNET_YES;
1350     }
1351   }
1352   if (GNUNET_NO == found)
1353   {
1354     /* ACK for message we already dropped, might have been a
1355        duplicate ACK? Ignore. */
1356     LOG (GNUNET_ERROR_TYPE_DEBUG,
1357          "Duplicate DATA_ACK on %s, ignoring\n",
1358          GCCH_2s (ch));
1359     GNUNET_STATISTICS_update (stats,
1360                               "# duplicate DATA_ACKs",
1361                               1,
1362                               GNUNET_NO);
1363     return;
1364   }
1365   if (NULL != ch->retry_data_task)
1366   {
1367     GNUNET_SCHEDULER_cancel (ch->retry_data_task);
1368     ch->retry_data_task = NULL;
1369   }
1370   if (NULL != ch->head_sent)
1371     ch->retry_data_task
1372       = GNUNET_SCHEDULER_add_at (ch->head_sent->next_retry,
1373                                  &retry_transmission,
1374                                  ch);
1375 }
1376
1377
1378 /**
1379  * Destroy channel, based on the other peer closing the
1380  * connection.  Also needs to remove this channel from
1381  * the tunnel.
1382  *
1383  * @param ch channel to destroy
1384  */
1385 void
1386 GCCH_handle_remote_destroy (struct CadetChannel *ch)
1387 {
1388   struct CadetChannelClient *ccc;
1389
1390   GNUNET_assert (GNUNET_NO == ch->is_loopback);
1391   LOG (GNUNET_ERROR_TYPE_DEBUG,
1392        "Received remote channel DESTROY for %s\n",
1393        GCCH_2s (ch));
1394   if (GNUNET_YES == ch->destroy)
1395   {
1396     /* Local client already gone, this is instant-death. */
1397     channel_destroy (ch);
1398     return;
1399   }
1400   ccc = (NULL != ch->owner) ? ch->owner : ch->dest;
1401   if (NULL != ccc->head_recv)
1402   {
1403     LOG (GNUNET_ERROR_TYPE_WARNING,
1404          "Lost end of transmission due to remote shutdown on %s\n",
1405          GCCH_2s (ch));
1406     /* FIXME: change API to notify client about truncated transmission! */
1407   }
1408   ch->destroy = GNUNET_YES;
1409   GSC_handle_remote_channel_destroy (ccc->c,
1410                                      ccc->ccn,
1411                                      ch);
1412   channel_destroy (ch);
1413 }
1414
1415
1416 /**
1417  * Test if element @a e1 comes before element @a e2.
1418  *
1419  * @param cls closure, to a flag where we indicate duplicate packets
1420  * @param crm1 an element of to sort
1421  * @param crm2 another element to sort
1422  * @return #GNUNET_YES if @e1 < @e2, otherwise #GNUNET_NO
1423  */
1424 static int
1425 cmp_crm_by_next_retry (void *cls,
1426                        struct CadetReliableMessage *crm1,
1427                        struct CadetReliableMessage *crm2)
1428 {
1429   if (crm1->next_retry.abs_value_us <
1430       crm2->next_retry.abs_value_us)
1431     return GNUNET_YES;
1432   return GNUNET_NO;
1433 }
1434
1435 /**
1436  * Function called once the tunnel has sent one of our messages.
1437  * If the message is unreliable, simply frees the `crm`. If the
1438  * message was reliable, calculate retransmission time and
1439  * wait for ACK (or retransmit).
1440  *
1441  * @param cls the `struct CadetReliableMessage` that was sent
1442  */
1443 static void
1444 data_sent_cb (void *cls)
1445 {
1446   struct CadetReliableMessage *crm = cls;
1447   struct CadetChannel *ch = crm->ch;
1448
1449   GNUNET_assert (GNUNET_NO == ch->is_loopback);
1450   GNUNET_assert (NULL != crm->qe);
1451   crm->qe = NULL;
1452   GNUNET_CONTAINER_DLL_remove (ch->head_sent,
1453                                ch->tail_sent,
1454                                crm);
1455   if (GNUNET_NO == ch->reliable)
1456   {
1457     GNUNET_free (crm->data_message);
1458     GNUNET_free (crm);
1459     ch->pending_messages--;
1460     send_ack_to_client (ch,
1461                         (NULL == ch->owner)
1462                         ? GNUNET_NO
1463                         : GNUNET_YES);
1464     return;
1465   }
1466   if (0 == crm->retry_delay.rel_value_us)
1467     crm->retry_delay = ch->expected_delay;
1468   else
1469     crm->retry_delay = GNUNET_TIME_STD_BACKOFF (crm->retry_delay);
1470   crm->retry_delay = GNUNET_TIME_relative_max (crm->retry_delay,
1471                                                MIN_RTT_DELAY);
1472   crm->next_retry = GNUNET_TIME_relative_to_absolute (crm->retry_delay);
1473
1474   GNUNET_CONTAINER_DLL_insert_sorted (struct CadetReliableMessage,
1475                                       cmp_crm_by_next_retry,
1476                                       NULL,
1477                                       ch->head_sent,
1478                                       ch->tail_sent,
1479                                       crm);
1480   LOG (GNUNET_ERROR_TYPE_DEBUG,
1481        "Message %u sent, next transmission on %s in %s\n",
1482        (unsigned int) ntohl (crm->data_message->mid.mid),
1483        GCCH_2s (ch),
1484        GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (ch->head_sent->next_retry),
1485                                                GNUNET_YES));
1486   if (crm == ch->head_sent)
1487   {
1488     /* We are the new head, need to reschedule retry task */
1489     if (NULL != ch->retry_data_task)
1490       GNUNET_SCHEDULER_cancel (ch->retry_data_task);
1491     ch->retry_data_task
1492       = GNUNET_SCHEDULER_add_at (ch->head_sent->next_retry,
1493                                  &retry_transmission,
1494                                  ch);
1495   }
1496 }
1497
1498
1499 /**
1500  * Handle data given by a client.
1501  *
1502  * Check whether the client is allowed to send in this tunnel, save if
1503  * channel is reliable and send an ACK to the client if there is still
1504  * buffer space in the tunnel.
1505  *
1506  * @param ch Channel.
1507  * @param sender_ccn ccn of the sender
1508  * @param buf payload to transmit.
1509  * @param buf_len number of bytes in @a buf
1510  * @return #GNUNET_OK if everything goes well,
1511  *         #GNUNET_SYSERR in case of an error.
1512  */
1513 int
1514 GCCH_handle_local_data (struct CadetChannel *ch,
1515                         struct GNUNET_CADET_ClientChannelNumber sender_ccn,
1516                         const char *buf,
1517                         size_t buf_len)
1518 {
1519   struct CadetReliableMessage *crm;
1520
1521   if (ch->pending_messages > ch->max_pending_messages)
1522   {
1523     GNUNET_break (0);
1524     return GNUNET_SYSERR;
1525   }
1526   ch->pending_messages++;
1527
1528   if (GNUNET_YES == ch->is_loopback)
1529   {
1530     struct CadetChannelClient *receiver;
1531     struct GNUNET_MQ_Envelope *env;
1532     struct GNUNET_CADET_LocalData *ld;
1533     int to_owner;
1534
1535     env = GNUNET_MQ_msg_extra (ld,
1536                                buf_len,
1537                                GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
1538     if (sender_ccn.channel_of_client ==
1539         ch->owner->ccn.channel_of_client)
1540     {
1541       receiver = ch->dest;
1542       to_owner = GNUNET_NO;
1543     }
1544     else
1545     {
1546       GNUNET_assert (sender_ccn.channel_of_client ==
1547                      ch->dest->ccn.channel_of_client);
1548       receiver = ch->owner;
1549       to_owner = GNUNET_YES;
1550     }
1551     ld->ccn = receiver->ccn;
1552     GNUNET_memcpy (&ld[1],
1553                    buf,
1554                    buf_len);
1555     if (GNUNET_YES == receiver->client_ready)
1556     {
1557       GSC_send_to_client (receiver->c,
1558                           env);
1559       send_ack_to_client (ch,
1560                           to_owner);
1561     }
1562     else
1563     {
1564       struct CadetOutOfOrderMessage *oom;
1565
1566       oom = GNUNET_new (struct CadetOutOfOrderMessage);
1567       oom->env = env;
1568       GNUNET_CONTAINER_DLL_insert_tail (receiver->head_recv,
1569                                         receiver->tail_recv,
1570                                         oom);
1571     }
1572     return GNUNET_OK;
1573   }
1574
1575   /* Everything is correct, send the message. */
1576   crm = GNUNET_malloc (sizeof (*crm));
1577   crm->ch = ch;
1578   crm->data_message = GNUNET_malloc (sizeof (struct GNUNET_CADET_ChannelAppDataMessage)
1579                                      + buf_len);
1580   crm->data_message->header.size = htons (sizeof (struct GNUNET_CADET_ChannelAppDataMessage) + buf_len);
1581   crm->data_message->header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA);
1582   ch->mid_send.mid = htonl (ntohl (ch->mid_send.mid) + 1);
1583   crm->data_message->mid = ch->mid_send;
1584   crm->data_message->ctn = ch->ctn;
1585   GNUNET_memcpy (&crm->data_message[1],
1586                  buf,
1587                  buf_len);
1588   GNUNET_CONTAINER_DLL_insert (ch->head_sent,
1589                                ch->tail_sent,
1590                                crm);
1591   LOG (GNUNET_ERROR_TYPE_DEBUG,
1592        "Sending %u bytes from local client to %s with MID %u\n",
1593        buf_len,
1594        GCCH_2s (ch),
1595        ntohl (crm->data_message->mid.mid));
1596   if (NULL != ch->retry_data_task)
1597   {
1598     GNUNET_SCHEDULER_cancel (ch->retry_data_task);
1599     ch->retry_data_task = NULL;
1600   }
1601   crm->qe = GCT_send (ch->t,
1602                       &crm->data_message->header,
1603                       &data_sent_cb,
1604                       crm);
1605   GNUNET_assert (NULL == ch->retry_data_task);
1606   return GNUNET_OK;
1607 }
1608
1609
1610 /**
1611  * Handle ACK from client on local channel.  Means the client is ready
1612  * for more data, see if we have any for it.
1613  *
1614  * @param ch channel to destroy
1615  * @param client_ccn ccn of the client sending the ack
1616  */
1617 void
1618 GCCH_handle_local_ack (struct CadetChannel *ch,
1619                        struct GNUNET_CADET_ClientChannelNumber client_ccn)
1620 {
1621   struct CadetChannelClient *ccc;
1622   struct CadetOutOfOrderMessage *com;
1623
1624   if ( (NULL != ch->owner) &&
1625        (ch->owner->ccn.channel_of_client == client_ccn.channel_of_client) )
1626     ccc = ch->owner;
1627   else if ( (NULL != ch->dest) &&
1628             (ch->dest->ccn.channel_of_client == client_ccn.channel_of_client) )
1629     ccc = ch->dest;
1630   else
1631     GNUNET_assert (0);
1632   ccc->client_ready = GNUNET_YES;
1633   com = ccc->head_recv;
1634   if (NULL == com)
1635   {
1636     LOG (GNUNET_ERROR_TYPE_DEBUG,
1637          "Got LOCAL_ACK, %s-%X ready to receive more data, but none pending on %s-%X(%p)!\n",
1638          GSC_2s (ccc->c),
1639          ntohl (client_ccn.channel_of_client),
1640          GCCH_2s (ch),
1641          ntohl (ccc->ccn.channel_of_client),
1642          ccc);
1643     return; /* none pending */
1644   }
1645   if (GNUNET_YES == ch->is_loopback)
1646   {
1647     int to_owner;
1648
1649     /* Messages are always in-order, just send */
1650     GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
1651                                  ccc->tail_recv,
1652                                  com);
1653     GSC_send_to_client (ccc->c,
1654                         com->env);
1655     /* Notify sender that we can receive more */
1656     if (ccc->ccn.channel_of_client ==
1657         ch->owner->ccn.channel_of_client)
1658     {
1659       to_owner = GNUNET_NO;
1660     }
1661     else
1662     {
1663       GNUNET_assert (ccc->ccn.channel_of_client ==
1664                      ch->dest->ccn.channel_of_client);
1665       to_owner = GNUNET_YES;
1666     }
1667     send_ack_to_client (ch,
1668                         to_owner);
1669     GNUNET_free (com);
1670     return;
1671   }
1672
1673   if ( (com->mid.mid != ch->mid_recv.mid) &&
1674        (GNUNET_NO == ch->out_of_order) )
1675   {
1676     LOG (GNUNET_ERROR_TYPE_DEBUG,
1677          "Got LOCAL_ACK, %s-%X ready to receive more data (but next one is out-of-order %u vs. %u)!\n",
1678          GSC_2s (ccc->c),
1679          ntohl (ccc->ccn.channel_of_client),
1680          ntohl (com->mid.mid),
1681          ntohl (ch->mid_recv.mid));
1682     return; /* missing next one in-order */
1683   }
1684
1685   LOG (GNUNET_ERROR_TYPE_DEBUG,
1686        "Got LOCAL ACK, passing payload message %u to %s-%X on %s\n",
1687        ntohl (com->mid.mid),
1688        GSC_2s (ccc->c),
1689        ntohl (ccc->ccn.channel_of_client),
1690        GCCH_2s (ch));
1691
1692   /* all good, pass next message to client */
1693   GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
1694                                ccc->tail_recv,
1695                                com);
1696   /* FIXME: if unreliable, this is not aggressive
1697      enough, as it would be OK to have lost some! */
1698   ch->mid_recv.mid = htonl (1 + ntohl (com->mid.mid));
1699   ch->mid_futures >>= 1; /* equivalent to division by 2 */
1700   ccc->client_ready = GNUNET_NO;
1701   GSC_send_to_client (ccc->c,
1702                       com->env);
1703   GNUNET_free (com);
1704   if ( (0xFFULL == (ch->mid_futures & 0xFFULL)) &&
1705        (GNUNET_YES == ch->reliable) )
1706   {
1707     /* The next 15 messages were also already received (0xFF), this
1708        suggests that the sender may be blocked on flow control
1709        urgently waiting for an ACK from us. (As we have an inherent
1710        maximum of 64 bits, and 15 is getting too close for comfort.)
1711        So we should send one now. */
1712     LOG (GNUNET_ERROR_TYPE_DEBUG,
1713          "Sender on %s likely blocked on flow-control, sending ACK now.\n",
1714          GCCH_2s (ch));
1715     if (GNUNET_YES == ch->reliable)
1716       send_channel_data_ack (ch);
1717   }
1718
1719   if (NULL != ccc->head_recv)
1720     return;
1721   if (GNUNET_NO == ch->destroy)
1722     return;
1723   GCT_send_channel_destroy (ch->t,
1724                             ch->ctn);
1725   channel_destroy (ch);
1726 }
1727
1728
1729 #define LOG2(level, ...) GNUNET_log_from_nocheck(level,"cadet-chn",__VA_ARGS__)
1730
1731
1732 /**
1733  * Log channel info.
1734  *
1735  * @param ch Channel.
1736  * @param level Debug level to use.
1737  */
1738 void
1739 GCCH_debug (struct CadetChannel *ch,
1740             enum GNUNET_ErrorType level)
1741 {
1742   int do_log;
1743
1744   do_log = GNUNET_get_log_call_status (level & (~GNUNET_ERROR_TYPE_BULK),
1745                                        "cadet-chn",
1746                                        __FILE__, __FUNCTION__, __LINE__);
1747   if (0 == do_log)
1748     return;
1749
1750   if (NULL == ch)
1751   {
1752     LOG2 (level, "CHN *** DEBUG NULL CHANNEL ***\n");
1753     return;
1754   }
1755   LOG2 (level,
1756         "CHN %s:%X (%p)\n",
1757         GCT_2s (ch->t),
1758         ch->ctn,
1759         ch);
1760   if (NULL != ch->owner)
1761   {
1762     LOG2 (level,
1763           "CHN origin %s ready %s local-id: %u\n",
1764           GSC_2s (ch->owner->c),
1765           ch->owner->client_ready ? "YES" : "NO",
1766           ntohl (ch->owner->ccn.channel_of_client));
1767   }
1768   if (NULL != ch->dest)
1769   {
1770     LOG2 (level,
1771           "CHN destination %s ready %s local-id: %u\n",
1772           GSC_2s (ch->dest->c),
1773           ch->dest->client_ready ? "YES" : "NO",
1774           ntohl (ch->dest->ccn.channel_of_client));
1775   }
1776   LOG2 (level,
1777         "CHN  Message IDs recv: %d (%LLX), send: %d\n",
1778         ntohl (ch->mid_recv.mid),
1779         (unsigned long long) ch->mid_futures,
1780         ntohl (ch->mid_send.mid));
1781 }
1782
1783
1784
1785 /* end of gnunet-service-cadet-new_channel.c */