handle ancient/future duplicate payload properly
[oweals/gnunet.git] / src / cadet / gnunet-service-cadet-new_channel.c
1
2 /*
3      This file is part of GNUnet.
4      Copyright (C) 2001-2017 GNUnet e.V.
5
6      GNUnet is free software; you can redistribute it and/or modify
7      it under the terms of the GNU General Public License as published
8      by the Free Software Foundation; either version 3, or (at your
9      option) any later version.
10
11      GNUnet is distributed in the hope that it will be useful, but
12      WITHOUT ANY WARRANTY; without even the implied warranty of
13      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14      General Public License for more details.
15
16      You should have received a copy of the GNU General Public License
17      along with GNUnet; see the file COPYING.  If not, write to the
18      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
19      Boston, MA 02110-1301, USA.
20 */
21 /**
22  * @file cadet/gnunet-service-cadet-new_channel.c
23  * @brief logical links between CADET clients
24  * @author Bartlomiej Polot
25  * @author Christian Grothoff
26  *
27  * TODO:
28  * - introduce shutdown so we can have half-closed channels, modify
29  *   destroy to include MID to have FIN-ACK equivalents, etc.
30  * - estimate max bandwidth using bursts and use to for CONGESTION CONTROL!
31  * - check that '0xFFULL' really is sufficient for flow control!
32  * - revisit handling of 'unreliable' traffic!
33  * - revisit handling of 'out-of-order' option, especially in combination with/without 'reliable'.
34  * - figure out flow control without ACKs (unreliable traffic!)
35  */
36 #include "platform.h"
37 #include "gnunet_util_lib.h"
38 #include "cadet.h"
39 #include "gnunet_statistics_service.h"
40 #include "gnunet-service-cadet-new.h"
41 #include "gnunet-service-cadet-new_channel.h"
42 #include "gnunet-service-cadet-new_connection.h"
43 #include "gnunet-service-cadet-new_tunnels.h"
44 #include "gnunet-service-cadet-new_peer.h"
45 #include "gnunet-service-cadet-new_paths.h"
46
47 #define LOG(level,...) GNUNET_log_from (level,"cadet-chn",__VA_ARGS__)
48
49 /**
50  * How long do we initially wait before retransmitting?
51  */
52 #define CADET_INITIAL_RETRANSMIT_TIME GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 250)
53
54 /**
55  * How long do we wait before dropping state about incoming
56  * connection to closed port?
57  */
58 #define TIMEOUT_CLOSED_PORT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30)
59
60 /**
61  * How long do we wait at least before retransmitting ever?
62  */
63 #define MIN_RTT_DELAY GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 75)
64
65 /**
66  * Maximum message ID into the future we accept for out-of-order messages.
67  * If the message is more than this into the future, we drop it.  This is
68  * important both to detect values that are actually in the past, as well
69  * as to limit adversarially triggerable memory consumption.
70  *
71  * Note that right now we have "max_pending_messages = 4" hard-coded in
72  * the logic below, so a value of 4 would suffice here. But we plan to
73  * allow larger windows in the future...
74  */
75 #define MAX_OUT_OF_ORDER_DISTANCE 1024
76
77
78 /**
79  * All the states a connection can be in.
80  */
81 enum CadetChannelState
82 {
83   /**
84    * Uninitialized status, should never appear in operation.
85    */
86   CADET_CHANNEL_NEW,
87
88   /**
89    * Connection create message sent, waiting for ACK.
90    */
91   CADET_CHANNEL_OPEN_SENT,
92
93   /**
94    * Connection confirmed, ready to carry traffic.
95    */
96   CADET_CHANNEL_READY
97 };
98
99
100 /**
101  * Info needed to retry a message in case it gets lost.
102  * Note that we DO use this structure also for unreliable
103  * messages.
104  */
105 struct CadetReliableMessage
106 {
107   /**
108    * Double linked list, FIFO style
109    */
110   struct CadetReliableMessage *next;
111
112   /**
113    * Double linked list, FIFO style
114    */
115   struct CadetReliableMessage *prev;
116
117   /**
118    * Which channel is this message in?
119    */
120   struct CadetChannel *ch;
121
122   /**
123    * Entry in the tunnels queue for this message, NULL if it has left
124    * the tunnel.  Used to cancel transmission in case we receive an
125    * ACK in time.
126    */
127   struct CadetTunnelQueueEntry *qe;
128
129   /**
130    * How soon should we retry if we fail to get an ACK?
131    * Messages in the queue are sorted by this value.
132    */
133   struct GNUNET_TIME_Absolute next_retry;
134
135   /**
136    * How long do we wait for an ACK after transmission?
137    * Use for the back-off calculation.
138    */
139   struct GNUNET_TIME_Relative retry_delay;
140
141   /**
142    * Data message we are trying to send.
143    */
144   struct GNUNET_CADET_ChannelAppDataMessage *data_message;
145
146 };
147
148
149 /**
150  * List of received out-of-order data messages.
151  */
152 struct CadetOutOfOrderMessage
153 {
154   /**
155    * Double linked list, FIFO style
156    */
157   struct CadetOutOfOrderMessage *next;
158
159   /**
160    * Double linked list, FIFO style
161    */
162   struct CadetOutOfOrderMessage *prev;
163
164   /**
165    * ID of the message (messages up to this point needed
166    * before we give this one to the client).
167    */
168   struct ChannelMessageIdentifier mid;
169
170   /**
171    * The envelope with the payload of the out-of-order message
172    */
173   struct GNUNET_MQ_Envelope *env;
174
175 };
176
177
178 /**
179  * Client endpoint of a `struct CadetChannel`.  A channel may be a
180  * loopback channel, in which case it has two of these endpoints.
181  * Note that flow control also is required in both directions.
182  */
183 struct CadetChannelClient
184 {
185   /**
186    * Client handle.  Not by itself sufficient to designate
187    * the client endpoint, as the same client handle may
188    * be used for both the owner and the destination, and
189    * we thus also need the channel ID to identify the client.
190    */
191   struct CadetClient *c;
192
193   /**
194    * Head of DLL of messages received out of order or while client was unready.
195    */
196   struct CadetOutOfOrderMessage *head_recv;
197
198   /**
199    * Tail DLL of messages received out of order or while client was unready.
200    */
201   struct CadetOutOfOrderMessage *tail_recv;
202
203   /**
204    * Local tunnel number for this client.
205    * (if owner >= #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI,
206    *  otherwise < #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI)
207    */
208   struct GNUNET_CADET_ClientChannelNumber ccn;
209
210   /**
211    * Can we send data to the client?
212    */
213   int client_ready;
214
215 };
216
217
218 /**
219  * Struct containing all information regarding a channel to a remote client.
220  */
221 struct CadetChannel
222 {
223   /**
224    * Tunnel this channel is in.
225    */
226   struct CadetTunnel *t;
227
228   /**
229    * Client owner of the tunnel, if any.
230    * (Used if this channel represends the initiating end of the tunnel.)
231    */
232   struct CadetChannelClient *owner;
233
234   /**
235    * Client destination of the tunnel, if any.
236    * (Used if this channel represents the listening end of the tunnel.)
237    */
238   struct CadetChannelClient *dest;
239
240   /**
241    * Last entry in the tunnel's queue relating to control messages
242    * (#GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN or
243    * #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK).  Used to cancel
244    * transmission in case we receive updated information.
245    */
246   struct CadetTunnelQueueEntry *last_control_qe;
247
248   /**
249    * Head of DLL of messages sent and not yet ACK'd.
250    */
251   struct CadetReliableMessage *head_sent;
252
253   /**
254    * Tail of DLL of messages sent and not yet ACK'd.
255    */
256   struct CadetReliableMessage *tail_sent;
257
258   /**
259    * Task to resend/poll in case no ACK is received.
260    */
261   struct GNUNET_SCHEDULER_Task *retry_control_task;
262
263   /**
264    * Task to resend/poll in case no ACK is received.
265    */
266   struct GNUNET_SCHEDULER_Task *retry_data_task;
267
268   /**
269    * Last time the channel was used
270    */
271   struct GNUNET_TIME_Absolute timestamp;
272
273   /**
274    * Destination port of the channel.
275    */
276   struct GNUNET_HashCode port;
277
278   /**
279    * Counter for exponential backoff.
280    */
281   struct GNUNET_TIME_Relative retry_time;
282
283   /**
284    * How long does it usually take to get an ACK.
285    */
286   struct GNUNET_TIME_Relative expected_delay;
287
288   /**
289    * Bitfield of already-received messages past @e mid_recv.
290    */
291   uint64_t mid_futures;
292
293   /**
294    * Next MID expected for incoming traffic.
295    */
296   struct ChannelMessageIdentifier mid_recv;
297
298   /**
299    * Next MID to use for outgoing traffic.
300    */
301   struct ChannelMessageIdentifier mid_send;
302
303   /**
304    * Total (reliable) messages pending ACK for this channel.
305    */
306   unsigned int pending_messages;
307
308   /**
309    * Maximum (reliable) messages pending ACK for this channel
310    * before we throttle the client.
311    */
312   unsigned int max_pending_messages;
313
314   /**
315    * Number identifying this channel in its tunnel.
316    */
317   struct GNUNET_CADET_ChannelTunnelNumber ctn;
318
319   /**
320    * Channel state.
321    */
322   enum CadetChannelState state;
323
324   /**
325    * Is the tunnel bufferless (minimum latency)?
326    */
327   int nobuffer;
328
329   /**
330    * Is the tunnel reliable?
331    */
332   int reliable;
333
334   /**
335    * Is the tunnel out-of-order?
336    */
337   int out_of_order;
338
339   /**
340    * Is this channel a loopback channel, where the destination is us again?
341    */
342   int is_loopback;
343
344   /**
345    * Flag to signal the destruction of the channel.  If this is set to
346    * #GNUNET_YES the channel will be destroyed once the queue is
347    * empty.
348    */
349   int destroy;
350
351 };
352
353
354 /**
355  * Get the static string for identification of the channel.
356  *
357  * @param ch Channel.
358  *
359  * @return Static string with the channel IDs.
360  */
361 const char *
362 GCCH_2s (const struct CadetChannel *ch)
363 {
364   static char buf[128];
365
366   GNUNET_snprintf (buf,
367                    sizeof (buf),
368                    "Channel %s:%s ctn:%X(%X/%X)",
369                    (GNUNET_YES == ch->is_loopback)
370                    ? "loopback"
371                    : GNUNET_i2s (GCP_get_id (GCT_get_destination (ch->t))),
372                    GNUNET_h2s (&ch->port),
373                    ch->ctn,
374                    (NULL == ch->owner) ? 0 : ntohl (ch->owner->ccn.channel_of_client),
375                    (NULL == ch->dest) ? 0 : ntohl (ch->dest->ccn.channel_of_client));
376   return buf;
377 }
378
379
380 /**
381  * Get the channel's public ID.
382  *
383  * @param ch Channel.
384  *
385  * @return ID used to identify the channel with the remote peer.
386  */
387 struct GNUNET_CADET_ChannelTunnelNumber
388 GCCH_get_id (const struct CadetChannel *ch)
389 {
390   return ch->ctn;
391 }
392
393
394 /**
395  * Release memory associated with @a ccc
396  *
397  * @param ccc data structure to clean up
398  */
399 static void
400 free_channel_client (struct CadetChannelClient *ccc)
401 {
402   struct CadetOutOfOrderMessage *com;
403
404   while (NULL != (com = ccc->head_recv))
405   {
406     GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
407                                  ccc->tail_recv,
408                                  com);
409     GNUNET_MQ_discard (com->env);
410     GNUNET_free (com);
411   }
412   GNUNET_free (ccc);
413 }
414
415
416 /**
417  * Destroy the given channel.
418  *
419  * @param ch channel to destroy
420  */
421 static void
422 channel_destroy (struct CadetChannel *ch)
423 {
424   struct CadetReliableMessage *crm;
425
426   while (NULL != (crm = ch->head_sent))
427   {
428     GNUNET_assert (ch == crm->ch);
429     if (NULL != crm->qe)
430     {
431       GCT_send_cancel (crm->qe);
432       crm->qe = NULL;
433     }
434     GNUNET_CONTAINER_DLL_remove (ch->head_sent,
435                                  ch->tail_sent,
436                                  crm);
437     GNUNET_free (crm->data_message);
438     GNUNET_free (crm);
439   }
440   if (NULL != ch->owner)
441   {
442     free_channel_client (ch->owner);
443     ch->owner = NULL;
444   }
445   if (NULL != ch->dest)
446   {
447     free_channel_client (ch->dest);
448     ch->dest = NULL;
449   }
450   if (NULL != ch->last_control_qe)
451   {
452     GCT_send_cancel (ch->last_control_qe);
453     ch->last_control_qe = NULL;
454   }
455   if (NULL != ch->retry_data_task)
456   {
457     GNUNET_SCHEDULER_cancel (ch->retry_data_task);
458     ch->retry_data_task = NULL;
459   }
460   if (NULL != ch->retry_control_task)
461   {
462     GNUNET_SCHEDULER_cancel (ch->retry_control_task);
463     ch->retry_control_task = NULL;
464   }
465   if (GNUNET_NO == ch->is_loopback)
466   {
467     GCT_remove_channel (ch->t,
468                         ch,
469                         ch->ctn);
470     ch->t = NULL;
471   }
472   GNUNET_free (ch);
473 }
474
475
476 /**
477  * Send a channel create message.
478  *
479  * @param cls Channel for which to send.
480  */
481 static void
482 send_channel_open (void *cls);
483
484
485 /**
486  * Function called once the tunnel confirms that we sent the
487  * create message.  Delays for a bit until we retry.
488  *
489  * @param cls our `struct CadetChannel`.
490  */
491 static void
492 channel_open_sent_cb (void *cls)
493 {
494   struct CadetChannel *ch = cls;
495
496   GNUNET_assert (NULL != ch->last_control_qe);
497   ch->last_control_qe = NULL;
498   ch->retry_time = GNUNET_TIME_STD_BACKOFF (ch->retry_time);
499   LOG (GNUNET_ERROR_TYPE_DEBUG,
500        "Sent CADET_CHANNEL_OPEN on %s, retrying in %s\n",
501        GCCH_2s (ch),
502        GNUNET_STRINGS_relative_time_to_string (ch->retry_time,
503                                                GNUNET_YES));
504   ch->retry_control_task
505     = GNUNET_SCHEDULER_add_delayed (ch->retry_time,
506                                     &send_channel_open,
507                                     ch);
508 }
509
510
511 /**
512  * Send a channel open message.
513  *
514  * @param cls Channel for which to send.
515  */
516 static void
517 send_channel_open (void *cls)
518 {
519   struct CadetChannel *ch = cls;
520   struct GNUNET_CADET_ChannelOpenMessage msgcc;
521   uint32_t options;
522
523   ch->retry_control_task = NULL;
524   LOG (GNUNET_ERROR_TYPE_DEBUG,
525        "Sending CHANNEL_OPEN message for %s\n",
526        GCCH_2s (ch));
527   options = 0;
528   if (ch->nobuffer)
529     options |= GNUNET_CADET_OPTION_NOBUFFER;
530   if (ch->reliable)
531     options |= GNUNET_CADET_OPTION_RELIABLE;
532   if (ch->out_of_order)
533     options |= GNUNET_CADET_OPTION_OUT_OF_ORDER;
534   msgcc.header.size = htons (sizeof (msgcc));
535   msgcc.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN);
536   msgcc.opt = htonl (options);
537   msgcc.port = ch->port;
538   msgcc.ctn = ch->ctn;
539   ch->state = CADET_CHANNEL_OPEN_SENT;
540   ch->last_control_qe = GCT_send (ch->t,
541                                   &msgcc.header,
542                                   &channel_open_sent_cb,
543                                   ch);
544   GNUNET_assert (NULL == ch->retry_control_task);
545 }
546
547
548 /**
549  * Function called once and only once after a channel was bound
550  * to its tunnel via #GCT_add_channel() is ready for transmission.
551  * Note that this is only the case for channels that this peer
552  * initiates, as for incoming channels we assume that they are
553  * ready for transmission immediately upon receiving the open
554  * message.  Used to bootstrap the #GCT_send() process.
555  *
556  * @param ch the channel for which the tunnel is now ready
557  */
558 void
559 GCCH_tunnel_up (struct CadetChannel *ch)
560 {
561   GNUNET_assert (NULL == ch->retry_control_task);
562   LOG (GNUNET_ERROR_TYPE_DEBUG,
563        "Tunnel up, sending CHANNEL_OPEN on %s now\n",
564        GCCH_2s (ch));
565   ch->retry_control_task
566     = GNUNET_SCHEDULER_add_now (&send_channel_open,
567                                 ch);
568 }
569
570
571 /**
572  * Create a new channel.
573  *
574  * @param owner local client owning the channel
575  * @param ccn local number of this channel at the @a owner
576  * @param destination peer to which we should build the channel
577  * @param port desired port at @a destination
578  * @param options options for the channel
579  * @return handle to the new channel
580  */
581 struct CadetChannel *
582 GCCH_channel_local_new (struct CadetClient *owner,
583                         struct GNUNET_CADET_ClientChannelNumber ccn,
584                         struct CadetPeer *destination,
585                         const struct GNUNET_HashCode *port,
586                         uint32_t options)
587 {
588   struct CadetChannel *ch;
589   struct CadetChannelClient *ccco;
590
591   ccco = GNUNET_new (struct CadetChannelClient);
592   ccco->c = owner;
593   ccco->ccn = ccn;
594   ccco->client_ready = GNUNET_YES;
595
596   ch = GNUNET_new (struct CadetChannel);
597   ch->mid_recv.mid = htonl (1); /* The OPEN_ACK counts as message 0! */
598   ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
599   ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
600   ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
601   ch->max_pending_messages = (ch->nobuffer) ? 1 : 4; /* FIXME: 4!? Do not hardcode! */
602   ch->owner = ccco;
603   ch->port = *port;
604   if (0 == memcmp (&my_full_id,
605                    GCP_get_id (destination),
606                    sizeof (struct GNUNET_PeerIdentity)))
607   {
608     struct CadetClient *c;
609
610     ch->is_loopback = GNUNET_YES;
611     c = GNUNET_CONTAINER_multihashmap_get (open_ports,
612                                            port);
613     if (NULL == c)
614     {
615       /* port closed, wait for it to possibly open */
616       (void) GNUNET_CONTAINER_multihashmap_put (loose_channels,
617                                                 port,
618                                                 ch,
619                                                 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
620       LOG (GNUNET_ERROR_TYPE_DEBUG,
621            "Created loose incoming loopback channel to port %s\n",
622            GNUNET_h2s (&ch->port));
623     }
624     else
625     {
626       ch->dest = GNUNET_new (struct CadetChannelClient);
627       ch->dest->c = c;
628       ch->dest->client_ready = GNUNET_YES;
629       GCCH_bind (ch,
630                  ch->dest->c);
631     }
632   }
633   else
634   {
635     ch->t = GCP_get_tunnel (destination,
636                             GNUNET_YES);
637     ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME;
638     ch->ctn = GCT_add_channel (ch->t,
639                                ch);
640   }
641   GNUNET_STATISTICS_update (stats,
642                             "# channels",
643                             1,
644                             GNUNET_NO);
645   LOG (GNUNET_ERROR_TYPE_DEBUG,
646        "Created channel to port %s at peer %s for %s using %s\n",
647        GNUNET_h2s (port),
648        GCP_2s (destination),
649        GSC_2s (owner),
650        (GNUNET_YES == ch->is_loopback) ? "loopback" : GCT_2s (ch->t));
651   return ch;
652 }
653
654
655 /**
656  * We had an incoming channel to a port that is closed.
657  * It has not been opened for a while, drop it.
658  *
659  * @param cls the channel to drop
660  */
661 static void
662 timeout_closed_cb (void *cls)
663 {
664   struct CadetChannel *ch = cls;
665
666   ch->retry_control_task = NULL;
667   LOG (GNUNET_ERROR_TYPE_DEBUG,
668        "Closing incoming channel to port %s from peer %s due to timeout\n",
669        GNUNET_h2s (&ch->port),
670        GCP_2s (GCT_get_destination (ch->t)));
671   channel_destroy (ch);
672 }
673
674
675 /**
676  * Create a new channel based on a request coming in over the network.
677  *
678  * @param t tunnel to the remote peer
679  * @param ctn identifier of this channel in the tunnel
680  * @param port desired local port
681  * @param options options for the channel
682  * @return handle to the new channel
683  */
684 struct CadetChannel *
685 GCCH_channel_incoming_new (struct CadetTunnel *t,
686                            struct GNUNET_CADET_ChannelTunnelNumber ctn,
687                            const struct GNUNET_HashCode *port,
688                            uint32_t options)
689 {
690   struct CadetChannel *ch;
691   struct CadetClient *c;
692
693   ch = GNUNET_new (struct CadetChannel);
694   ch->port = *port;
695   ch->t = t;
696   ch->ctn = ctn;
697   ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME;
698   ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
699   ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
700   ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
701   ch->max_pending_messages = (ch->nobuffer) ? 1 : 4; /* FIXME: 4!? Do not hardcode! */
702   GNUNET_STATISTICS_update (stats,
703                             "# channels",
704                             1,
705                             GNUNET_NO);
706
707   c = GNUNET_CONTAINER_multihashmap_get (open_ports,
708                                          port);
709   if (NULL == c)
710   {
711     /* port closed, wait for it to possibly open */
712     (void) GNUNET_CONTAINER_multihashmap_put (loose_channels,
713                                               port,
714                                               ch,
715                                               GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
716     ch->retry_control_task
717       = GNUNET_SCHEDULER_add_delayed (TIMEOUT_CLOSED_PORT,
718                                       &timeout_closed_cb,
719                                       ch);
720     LOG (GNUNET_ERROR_TYPE_DEBUG,
721          "Created loose incoming channel to port %s from peer %s\n",
722          GNUNET_h2s (&ch->port),
723          GCP_2s (GCT_get_destination (ch->t)));
724   }
725   else
726   {
727     GCCH_bind (ch,
728                c);
729   }
730   GNUNET_STATISTICS_update (stats,
731                             "# channels",
732                             1,
733                             GNUNET_NO);
734   return ch;
735 }
736
737
738 /**
739  * Function called once the tunnel confirms that we sent the
740  * ACK message.  Just remembers it was sent, we do not expect
741  * ACKs for ACKs ;-).
742  *
743  * @param cls our `struct CadetChannel`.
744  */
745 static void
746 send_ack_cb (void *cls)
747 {
748   struct CadetChannel *ch = cls;
749
750   GNUNET_assert (NULL != ch->last_control_qe);
751   ch->last_control_qe = NULL;
752 }
753
754
755 /**
756  * Compute and send the current #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK to the other peer.
757  *
758  * @param ch channel to send the #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK for
759  */
760 static void
761 send_channel_data_ack (struct CadetChannel *ch)
762 {
763   struct GNUNET_CADET_ChannelDataAckMessage msg;
764
765   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK);
766   msg.header.size = htons (sizeof (msg));
767   msg.ctn = ch->ctn;
768   msg.mid.mid = htonl (ntohl (ch->mid_recv.mid) - 1);
769   msg.futures = GNUNET_htonll (ch->mid_futures);
770   if (NULL != ch->last_control_qe)
771     GCT_send_cancel (ch->last_control_qe);
772   ch->last_control_qe = GCT_send (ch->t,
773                                   &msg.header,
774                                   &send_ack_cb,
775                                   ch);
776 }
777
778
779 /**
780  * Send our initial #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK to the client confirming that the
781  * connection is up.
782  *
783  * @param cls the `struct CadetChannel`
784  */
785 static void
786 send_open_ack (void *cls)
787 {
788   struct CadetChannel *ch = cls;
789   struct GNUNET_CADET_ChannelManageMessage msg;
790
791   LOG (GNUNET_ERROR_TYPE_DEBUG,
792        "Sending CHANNEL_OPEN_ACK on %s\n",
793        GCCH_2s (ch));
794   ch->retry_control_task = NULL;
795   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK);
796   msg.header.size = htons (sizeof (msg));
797   msg.reserved = htonl (0);
798   msg.ctn = ch->ctn;
799   if (NULL != ch->last_control_qe)
800     GCT_send_cancel (ch->last_control_qe);
801   ch->last_control_qe = GCT_send (ch->t,
802                                   &msg.header,
803                                   &send_ack_cb,
804                                   ch);
805 }
806
807
808 /**
809  * We got a #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN message again for
810  * this channel.  If the binding was successful, (re)transmit the
811  * #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK.
812  *
813  * @param ch channel that got the duplicate open
814  */
815 void
816 GCCH_handle_duplicate_open (struct CadetChannel *ch)
817 {
818   if (NULL == ch->dest)
819   {
820     LOG (GNUNET_ERROR_TYPE_DEBUG,
821          "Ignoring duplicate channel OPEN on %s: port is closed\n",
822          GCCH_2s (ch));
823     return;
824   }
825   if (NULL != ch->retry_control_task)
826   {
827     LOG (GNUNET_ERROR_TYPE_DEBUG,
828          "Ignoring duplicate channel OPEN on %s: control message is pending\n",
829          GCCH_2s (ch));
830     return;
831   }
832   LOG (GNUNET_ERROR_TYPE_DEBUG,
833        "Retransmitting OPEN_ACK on %s\n",
834        GCCH_2s (ch));
835   ch->retry_control_task
836     = GNUNET_SCHEDULER_add_now (&send_open_ack,
837                                 ch);
838 }
839
840
841 /**
842  * Send a #GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK to the client to solicit more messages.
843  *
844  * @param ch channel the ack is for
845  * @param to_owner #GNUNET_YES to send to owner,
846  *                 #GNUNET_NO to send to dest
847  */
848 static void
849 send_ack_to_client (struct CadetChannel *ch,
850                     int to_owner)
851 {
852   struct GNUNET_MQ_Envelope *env;
853   struct GNUNET_CADET_LocalAck *ack;
854   struct CadetChannelClient *ccc;
855
856   env = GNUNET_MQ_msg (ack,
857                        GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK);
858   ccc = (GNUNET_YES == to_owner) ? ch->owner : ch->dest;
859   ack->ccn = ccc->ccn;
860   LOG (GNUNET_ERROR_TYPE_DEBUG,
861        "Sending CADET_LOCAL_ACK to %s (%s) at ccn %X (%u/%u pending)\n",
862        GSC_2s (ccc->c),
863        (GNUNET_YES == to_owner) ? "owner" : "dest",
864        ntohl (ack->ccn.channel_of_client),
865        ch->pending_messages,
866        ch->max_pending_messages);
867   GSC_send_to_client (ccc->c,
868                       env);
869 }
870
871
872 /**
873  * A client is bound to the port that we have a channel
874  * open to.  Send the acknowledgement for the connection
875  * request and establish the link with the client.
876  *
877  * @param ch open incoming channel
878  * @param c client listening on the respective port
879  */
880 void
881 GCCH_bind (struct CadetChannel *ch,
882            struct CadetClient *c)
883 {
884   uint32_t options;
885   struct CadetChannelClient *cccd;
886
887   LOG (GNUNET_ERROR_TYPE_DEBUG,
888        "Binding %s from %s to port %s of %s\n",
889        GCCH_2s (ch),
890        GCT_2s (ch->t),
891        GNUNET_h2s (&ch->port),
892        GSC_2s (c));
893   if (NULL != ch->retry_control_task)
894   {
895     /* there might be a timeout task here */
896     GNUNET_SCHEDULER_cancel (ch->retry_control_task);
897     ch->retry_control_task = NULL;
898   }
899   options = 0;
900   if (ch->nobuffer)
901     options |= GNUNET_CADET_OPTION_NOBUFFER;
902   if (ch->reliable)
903     options |= GNUNET_CADET_OPTION_RELIABLE;
904   if (ch->out_of_order)
905     options |= GNUNET_CADET_OPTION_OUT_OF_ORDER;
906   cccd = GNUNET_new (struct CadetChannelClient);
907   ch->dest = cccd;
908   cccd->c = c;
909   cccd->client_ready = GNUNET_YES;
910   cccd->ccn = GSC_bind (c,
911                         ch,
912                         (GNUNET_YES == ch->is_loopback)
913                         ? GCP_get (&my_full_id,
914                                    GNUNET_YES)
915                         : GCT_get_destination (ch->t),
916                         &ch->port,
917                         options);
918   GNUNET_assert (ntohl (cccd->ccn.channel_of_client) <
919                  GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
920   ch->mid_recv.mid = htonl (1); /* The OPEN counts as message 0! */
921   if (GNUNET_YES == ch->is_loopback)
922   {
923     ch->state = CADET_CHANNEL_OPEN_SENT;
924     GCCH_handle_channel_open_ack (ch);
925   }
926   else
927   {
928     /* notify other peer that we accepted the connection */
929     ch->retry_control_task
930       = GNUNET_SCHEDULER_add_now (&send_open_ack,
931                                   ch);
932   }
933   /* give client it's initial supply of ACKs */
934   GNUNET_assert (ntohl (cccd->ccn.channel_of_client) <
935                  GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
936   for (unsigned int i=0;i<ch->max_pending_messages;i++)
937     send_ack_to_client (ch,
938                         GNUNET_NO);
939 }
940
941
942 /**
943  * Destroy locally created channel.  Called by the local client, so no
944  * need to tell the client.
945  *
946  * @param ch channel to destroy
947  * @param c client that caused the destruction
948  * @param ccn client number of the client @a c
949  */
950 void
951 GCCH_channel_local_destroy (struct CadetChannel *ch,
952                             struct CadetClient *c,
953                             struct GNUNET_CADET_ClientChannelNumber ccn)
954 {
955   LOG (GNUNET_ERROR_TYPE_DEBUG,
956        "%s asks for destruction of %s\n",
957        GSC_2s (c),
958        GCCH_2s (ch));
959   GNUNET_assert (NULL != c);
960   if ( (NULL != ch->owner) &&
961        (c == ch->owner->c) &&
962        (ccn.channel_of_client == ch->owner->ccn.channel_of_client) )
963   {
964     free_channel_client (ch->owner);
965     ch->owner = NULL;
966   }
967   else if ( (NULL != ch->dest) &&
968             (c == ch->dest->c) &&
969             (ccn.channel_of_client == ch->dest->ccn.channel_of_client) )
970   {
971     free_channel_client (ch->dest);
972     ch->dest = NULL;
973   }
974   else
975   {
976     GNUNET_assert (0);
977   }
978
979   if (GNUNET_YES == ch->destroy)
980   {
981     /* other end already destroyed, with the local client gone, no need
982        to finish transmissions, just destroy immediately. */
983     channel_destroy (ch);
984     return;
985   }
986   if ( (NULL != ch->head_sent) ||
987        (NULL != ch->owner) ||
988        (NULL != ch->dest) )
989   {
990     /* Wait for other end to destroy us as well,
991        and otherwise allow send queue to be transmitted first */
992     ch->destroy = GNUNET_YES;
993     return;
994   }
995   /* If the we ever sent the CHANNEL_CREATE, we need to send a destroy message. */
996   if (CADET_CHANNEL_NEW != ch->state)
997     GCT_send_channel_destroy (ch->t,
998                               ch->ctn);
999   /* Nothing left to do, just finish destruction */
1000   channel_destroy (ch);
1001 }
1002
1003
1004 /**
1005  * We got an acknowledgement for the creation of the channel
1006  * (the port is open on the other side). Begin transmissions.
1007  *
1008  * @param ch channel to destroy
1009  */
1010 void
1011 GCCH_handle_channel_open_ack (struct CadetChannel *ch)
1012 {
1013   switch (ch->state)
1014   {
1015   case CADET_CHANNEL_NEW:
1016     /* this should be impossible */
1017     GNUNET_break (0);
1018     break;
1019   case CADET_CHANNEL_OPEN_SENT:
1020     if (NULL == ch->owner)
1021     {
1022       /* We're not the owner, wrong direction! */
1023       GNUNET_break_op (0);
1024       return;
1025     }
1026     LOG (GNUNET_ERROR_TYPE_DEBUG,
1027          "Received CHANNEL_OPEN_ACK for waiting %s, entering READY state\n",
1028          GCCH_2s (ch));
1029     if (NULL != ch->retry_control_task) /* can be NULL if ch->is_loopback */
1030     {
1031       GNUNET_SCHEDULER_cancel (ch->retry_control_task);
1032       ch->retry_control_task = NULL;
1033     }
1034     ch->state = CADET_CHANNEL_READY;
1035     /* On first connect, send client as many ACKs as we allow messages
1036        to be buffered! */
1037     for (unsigned int i=0;i<ch->max_pending_messages;i++)
1038       send_ack_to_client (ch,
1039                           GNUNET_YES);
1040     break;
1041   case CADET_CHANNEL_READY:
1042     /* duplicate ACK, maybe we retried the CREATE. Ignore. */
1043     LOG (GNUNET_ERROR_TYPE_DEBUG,
1044          "Received duplicate channel OPEN_ACK for %s\n",
1045          GCCH_2s (ch));
1046     GNUNET_STATISTICS_update (stats,
1047                               "# duplicate CREATE_ACKs",
1048                               1,
1049                               GNUNET_NO);
1050     break;
1051   }
1052 }
1053
1054
1055 /**
1056  * Test if element @a e1 comes before element @a e2.
1057  *
1058  * @param cls closure, to a flag where we indicate duplicate packets
1059  * @param e1 an element of to sort
1060  * @param e2 another element to sort
1061  * @return #GNUNET_YES if @e1 < @e2, otherwise #GNUNET_NO
1062  */
1063 static int
1064 is_before (void *cls,
1065            struct CadetOutOfOrderMessage *m1,
1066            struct CadetOutOfOrderMessage *m2)
1067 {
1068   int *duplicate = cls;
1069   uint32_t v1 = ntohl (m1->mid.mid);
1070   uint32_t v2 = ntohl (m2->mid.mid);
1071   uint32_t delta;
1072
1073   delta = v2 - v1;
1074   if (0 == delta)
1075     *duplicate = GNUNET_YES;
1076   if (delta > (uint32_t) INT_MAX)
1077   {
1078     /* in overflow range, we can safely assume we wrapped around */
1079     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1080                 "%u > %u => %p > %p\n",
1081                 (unsigned int) v1,
1082                 (unsigned int) v2,
1083                 m1,
1084                 m2);
1085     return GNUNET_NO;
1086   }
1087   else
1088   {
1089     /* result is small, thus v2 > v1, thus e1 < e2 */
1090     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1091                 "%u < %u => %p < %p\n",
1092                 (unsigned int) v1,
1093                 (unsigned int) v2,
1094                 m1,
1095                 m2);
1096     return GNUNET_YES;
1097   }
1098 }
1099
1100
1101 /**
1102  * We got payload data for a channel.  Pass it on to the client
1103  * and send an ACK to the other end (once flow control allows it!)
1104  *
1105  * @param ch channel that got data
1106  * @param msg message that was received
1107  */
1108 void
1109 GCCH_handle_channel_plaintext_data (struct CadetChannel *ch,
1110                                     const struct GNUNET_CADET_ChannelAppDataMessage *msg)
1111 {
1112   struct GNUNET_MQ_Envelope *env;
1113   struct GNUNET_CADET_LocalData *ld;
1114   struct CadetChannelClient *ccc;
1115   size_t payload_size;
1116
1117   GNUNET_assert (GNUNET_NO == ch->is_loopback);
1118   if ( (GNUNET_YES == ch->destroy) &&
1119        (NULL == ch->owner) &&
1120        (NULL == ch->dest) )
1121   {
1122     /* This client is gone, but we still have messages to send to
1123        the other end (which is why @a ch is not yet dead).  However,
1124        we cannot pass messages to our client anymore. */
1125     LOG (GNUNET_ERROR_TYPE_DEBUG,
1126          "Dropping incoming payload on %s as this end is already closed\n",
1127          GCCH_2s (ch));
1128     /* FIXME: send back ACK/NACK/Closed notification
1129        to stop retransmissions! */
1130     return;
1131   }
1132   payload_size = ntohs (msg->header.size) - sizeof (*msg);
1133   env = GNUNET_MQ_msg_extra (ld,
1134                              payload_size,
1135                              GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
1136   ld->ccn = (NULL == ch->dest) ? ch->owner->ccn : ch->dest->ccn;
1137   GNUNET_memcpy (&ld[1],
1138                  &msg[1],
1139                  payload_size);
1140   ccc = (NULL != ch->owner) ? ch->owner : ch->dest;
1141   if ( (GNUNET_YES == ccc->client_ready) &&
1142        ( (GNUNET_YES == ch->out_of_order) ||
1143          (msg->mid.mid == ch->mid_recv.mid) ) )
1144   {
1145     LOG (GNUNET_ERROR_TYPE_DEBUG,
1146          "Giving %u bytes of payload with MID %u from %s to client %s\n",
1147          (unsigned int) payload_size,
1148          ntohl (msg->mid.mid),
1149          GCCH_2s (ch),
1150          GSC_2s (ccc->c));
1151     ccc->client_ready = GNUNET_NO;
1152     GSC_send_to_client (ccc->c,
1153                         env);
1154     ch->mid_recv.mid = htonl (1 + ntohl (ch->mid_recv.mid));
1155     ch->mid_futures >>= 1;
1156   }
1157   else
1158   {
1159     struct CadetOutOfOrderMessage *com;
1160     int duplicate;
1161     uint32_t mid_min;
1162     uint32_t mid_max;
1163     uint32_t mid_msg;
1164
1165     mid_min = ntohl (ch->mid_recv.mid);
1166     mid_max = mid_min + MAX_OUT_OF_ORDER_DISTANCE;
1167     mid_msg = ntohl (msg->mid.mid);
1168     if ( ( (uint32_t) (mid_msg - mid_min) > MAX_OUT_OF_ORDER_DISTANCE) ||
1169          ( (uint32_t) (mid_max - mid_msg) > MAX_OUT_OF_ORDER_DISTANCE) )
1170     {
1171       LOG (GNUNET_ERROR_TYPE_DEBUG,
1172            "Duplicate ancient or future payload of %u bytes on %s (mid %u) dropped\n",
1173            (unsigned int) payload_size,
1174            GCCH_2s (ch),
1175            ntohl (msg->mid.mid));
1176       GNUNET_STATISTICS_update (stats,
1177                                 "# duplicate DATA (ancient or future)",
1178                                 1,
1179                                 GNUNET_NO);
1180       GNUNET_MQ_discard (env);
1181       return;
1182     }
1183
1184     com = GNUNET_new (struct CadetOutOfOrderMessage);
1185     com->mid = msg->mid;
1186     com->env = env;
1187     duplicate = GNUNET_NO;
1188     GNUNET_CONTAINER_DLL_insert_sorted (struct CadetOutOfOrderMessage,
1189                                         is_before,
1190                                         &duplicate,
1191                                         ccc->head_recv,
1192                                         ccc->tail_recv,
1193                                         com);
1194     if (GNUNET_YES == duplicate)
1195     {
1196       LOG (GNUNET_ERROR_TYPE_DEBUG,
1197            "Duplicate payload of %u bytes on %s (mid %u) dropped\n",
1198            (unsigned int) payload_size,
1199            GCCH_2s (ch),
1200            ntohl (msg->mid.mid));
1201       GNUNET_STATISTICS_update (stats,
1202                                 "# duplicate DATA",
1203                                 1,
1204                                 GNUNET_NO);
1205       GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
1206                                    ccc->tail_recv,
1207                                    com);
1208       GNUNET_MQ_discard (com->env);
1209       GNUNET_free (com);
1210       return;
1211     }
1212     LOG (GNUNET_ERROR_TYPE_DEBUG,
1213          "Queued %s payload of %u bytes on %s (mid %u, need %u first)\n",
1214          (GNUNET_YES == ccc->client_ready)
1215          ? "out-of-order"
1216          : "client-not-ready",
1217          (unsigned int) payload_size,
1218          GCCH_2s (ch),
1219          ntohl (msg->mid.mid),
1220          ntohl (ch->mid_recv.mid));
1221   }
1222 }
1223
1224
1225 /**
1226  * Function called once the tunnel has sent one of our messages.
1227  * If the message is unreliable, simply frees the `crm`. If the
1228  * message was reliable, calculate retransmission time and
1229  * wait for ACK (or retransmit).
1230  *
1231  * @param cls the `struct CadetReliableMessage` that was sent
1232  */
1233 static void
1234 data_sent_cb (void *cls);
1235
1236
1237 /**
1238  * We need to retry a transmission, the last one took too long to
1239  * be acknowledged.
1240  *
1241  * @param cls the `struct CadetChannel` where we need to retransmit
1242  */
1243 static void
1244 retry_transmission (void *cls)
1245 {
1246   struct CadetChannel *ch = cls;
1247   struct CadetReliableMessage *crm = ch->head_sent;
1248
1249   ch->retry_data_task = NULL;
1250   GNUNET_assert (NULL == crm->qe);
1251   LOG (GNUNET_ERROR_TYPE_DEBUG,
1252        "Retrying transmission on %s of message %u\n",
1253        GCCH_2s (ch),
1254        (unsigned int) ntohl (crm->data_message->mid.mid));
1255   crm->qe = GCT_send (ch->t,
1256                       &crm->data_message->header,
1257                       &data_sent_cb,
1258                       crm);
1259   GNUNET_assert (NULL == ch->retry_data_task);
1260 }
1261
1262
1263 /**
1264  * We got an acknowledgement for payload data for a channel.
1265  * Possibly resume transmissions.
1266  *
1267  * @param ch channel that got the ack
1268  * @param ack details about what was received
1269  */
1270 void
1271 GCCH_handle_channel_plaintext_data_ack (struct CadetChannel *ch,
1272                                         const struct GNUNET_CADET_ChannelDataAckMessage *ack)
1273 {
1274   struct CadetReliableMessage *crm;
1275   int was_head;
1276
1277   GNUNET_break (GNUNET_NO == ch->is_loopback);
1278   if (GNUNET_NO == ch->reliable)
1279   {
1280     /* not expecting ACKs on unreliable channel, odd */
1281     GNUNET_break_op (0);
1282     return;
1283   }
1284   for (crm = ch->head_sent;
1285         NULL != crm;
1286        crm = crm->next)
1287     if (ack->mid.mid == crm->data_message->mid.mid)
1288       break;
1289   if (NULL == crm)
1290   {
1291     /* ACK for message we already dropped, might have been a
1292        duplicate ACK? Ignore. */
1293     LOG (GNUNET_ERROR_TYPE_DEBUG,
1294          "Duplicate DATA_ACK on %s, ignoring\n",
1295          GCCH_2s (ch));
1296     GNUNET_STATISTICS_update (stats,
1297                               "# duplicate DATA_ACKs",
1298                               1,
1299                               GNUNET_NO);
1300     return;
1301   }
1302   was_head = (crm == ch->head_sent);
1303   GNUNET_CONTAINER_DLL_remove (ch->head_sent,
1304                                ch->tail_sent,
1305                                crm);
1306   GNUNET_free (crm->data_message);
1307   GNUNET_free (crm);
1308   ch->pending_messages--;
1309   GNUNET_assert (ch->pending_messages < ch->max_pending_messages);
1310   LOG (GNUNET_ERROR_TYPE_DEBUG,
1311        "Received DATA_ACK on %s for message %u (%u ACKs pending)\n",
1312        GCCH_2s (ch),
1313        (unsigned int) ntohl (ack->mid.mid),
1314        ch->pending_messages);
1315   send_ack_to_client (ch,
1316                       (NULL == ch->owner)
1317                       ? GNUNET_NO
1318                       : GNUNET_YES);
1319   if (was_head)
1320   {
1321     if (NULL != ch->retry_data_task)
1322     {
1323       GNUNET_SCHEDULER_cancel (ch->retry_data_task);
1324       ch->retry_data_task = NULL;
1325     }
1326     if (NULL != ch->head_sent)
1327       ch->retry_data_task
1328         = GNUNET_SCHEDULER_add_at (ch->head_sent->next_retry,
1329                                    &retry_transmission,
1330                                    ch);
1331   }
1332 }
1333
1334
1335 /**
1336  * Destroy channel, based on the other peer closing the
1337  * connection.  Also needs to remove this channel from
1338  * the tunnel.
1339  *
1340  * @param ch channel to destroy
1341  */
1342 void
1343 GCCH_handle_remote_destroy (struct CadetChannel *ch)
1344 {
1345   struct CadetChannelClient *ccc;
1346
1347   GNUNET_assert (GNUNET_NO == ch->is_loopback);
1348   LOG (GNUNET_ERROR_TYPE_DEBUG,
1349        "Received remote channel DESTROY for %s\n",
1350        GCCH_2s (ch));
1351   if (GNUNET_YES == ch->destroy)
1352   {
1353     /* Local client already gone, this is instant-death. */
1354     channel_destroy (ch);
1355     return;
1356   }
1357   ccc = (NULL != ch->owner) ? ch->owner : ch->dest;
1358   if (NULL != ccc->head_recv)
1359   {
1360     LOG (GNUNET_ERROR_TYPE_WARNING,
1361          "Lost end of transmission due to remote shutdown on %s\n",
1362          GCCH_2s (ch));
1363     /* FIXME: change API to notify client about truncated transmission! */
1364   }
1365   ch->destroy = GNUNET_YES;
1366   GSC_handle_remote_channel_destroy (ccc->c,
1367                                      ccc->ccn,
1368                                      ch);
1369   channel_destroy (ch);
1370 }
1371
1372
1373 /**
1374  * Test if element @a e1 comes before element @a e2.
1375  *
1376  * @param cls closure, to a flag where we indicate duplicate packets
1377  * @param crm1 an element of to sort
1378  * @param crm2 another element to sort
1379  * @return #GNUNET_YES if @e1 < @e2, otherwise #GNUNET_NO
1380  */
1381 static int
1382 cmp_crm_by_next_retry (void *cls,
1383                        struct CadetReliableMessage *crm1,
1384                        struct CadetReliableMessage *crm2)
1385 {
1386   if (crm1->next_retry.abs_value_us <
1387       crm2->next_retry.abs_value_us)
1388     return GNUNET_YES;
1389   return GNUNET_NO;
1390 }
1391
1392 /**
1393  * Function called once the tunnel has sent one of our messages.
1394  * If the message is unreliable, simply frees the `crm`. If the
1395  * message was reliable, calculate retransmission time and
1396  * wait for ACK (or retransmit).
1397  *
1398  * @param cls the `struct CadetReliableMessage` that was sent
1399  */
1400 static void
1401 data_sent_cb (void *cls)
1402 {
1403   struct CadetReliableMessage *crm = cls;
1404   struct CadetChannel *ch = crm->ch;
1405
1406   GNUNET_assert (GNUNET_NO == ch->is_loopback);
1407   GNUNET_assert (NULL != crm->qe);
1408   crm->qe = NULL;
1409   GNUNET_CONTAINER_DLL_remove (ch->head_sent,
1410                                ch->tail_sent,
1411                                crm);
1412   if (GNUNET_NO == ch->reliable)
1413   {
1414     GNUNET_free (crm->data_message);
1415     GNUNET_free (crm);
1416     ch->pending_messages--;
1417     send_ack_to_client (ch,
1418                         (NULL == ch->owner)
1419                         ? GNUNET_NO
1420                         : GNUNET_YES);
1421     return;
1422   }
1423   if (0 == crm->retry_delay.rel_value_us)
1424     crm->retry_delay = ch->expected_delay;
1425   else
1426     crm->retry_delay = GNUNET_TIME_STD_BACKOFF (crm->retry_delay);
1427   crm->retry_delay = GNUNET_TIME_relative_max (crm->retry_delay,
1428                                                MIN_RTT_DELAY);
1429   crm->next_retry = GNUNET_TIME_relative_to_absolute (crm->retry_delay);
1430
1431   GNUNET_CONTAINER_DLL_insert_sorted (struct CadetReliableMessage,
1432                                       cmp_crm_by_next_retry,
1433                                       NULL,
1434                                       ch->head_sent,
1435                                       ch->tail_sent,
1436                                       crm);
1437   LOG (GNUNET_ERROR_TYPE_DEBUG,
1438        "Message %u sent, next transmission on %s in %s\n",
1439        (unsigned int) ntohl (crm->data_message->mid.mid),
1440        GCCH_2s (ch),
1441        GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (ch->head_sent->next_retry),
1442                                                GNUNET_YES));
1443   if (crm == ch->head_sent)
1444   {
1445     /* We are the new head, need to reschedule retry task */
1446     if (NULL != ch->retry_data_task)
1447       GNUNET_SCHEDULER_cancel (ch->retry_data_task);
1448     ch->retry_data_task
1449       = GNUNET_SCHEDULER_add_at (ch->head_sent->next_retry,
1450                                  &retry_transmission,
1451                                  ch);
1452   }
1453 }
1454
1455
1456 /**
1457  * Handle data given by a client.
1458  *
1459  * Check whether the client is allowed to send in this tunnel, save if
1460  * channel is reliable and send an ACK to the client if there is still
1461  * buffer space in the tunnel.
1462  *
1463  * @param ch Channel.
1464  * @param sender_ccn ccn of the sender
1465  * @param buf payload to transmit.
1466  * @param buf_len number of bytes in @a buf
1467  * @return #GNUNET_OK if everything goes well,
1468  *         #GNUNET_SYSERR in case of an error.
1469  */
1470 int
1471 GCCH_handle_local_data (struct CadetChannel *ch,
1472                         struct GNUNET_CADET_ClientChannelNumber sender_ccn,
1473                         const char *buf,
1474                         size_t buf_len)
1475 {
1476   struct CadetReliableMessage *crm;
1477
1478   if (ch->pending_messages > ch->max_pending_messages)
1479   {
1480     GNUNET_break (0);
1481     return GNUNET_SYSERR;
1482   }
1483   ch->pending_messages++;
1484
1485   if (GNUNET_YES == ch->is_loopback)
1486   {
1487     struct CadetChannelClient *receiver;
1488     struct GNUNET_MQ_Envelope *env;
1489     struct GNUNET_CADET_LocalData *ld;
1490     int to_owner;
1491
1492     env = GNUNET_MQ_msg_extra (ld,
1493                                buf_len,
1494                                GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
1495     if (sender_ccn.channel_of_client ==
1496         ch->owner->ccn.channel_of_client)
1497     {
1498       receiver = ch->dest;
1499       to_owner = GNUNET_NO;
1500     }
1501     else
1502     {
1503       GNUNET_assert (sender_ccn.channel_of_client ==
1504                      ch->dest->ccn.channel_of_client);
1505       receiver = ch->owner;
1506       to_owner = GNUNET_YES;
1507     }
1508     ld->ccn = receiver->ccn;
1509     GNUNET_memcpy (&ld[1],
1510                    buf,
1511                    buf_len);
1512     if (GNUNET_YES == receiver->client_ready)
1513     {
1514       GSC_send_to_client (receiver->c,
1515                           env);
1516       send_ack_to_client (ch,
1517                           to_owner);
1518     }
1519     else
1520     {
1521       struct CadetOutOfOrderMessage *oom;
1522
1523       oom = GNUNET_new (struct CadetOutOfOrderMessage);
1524       oom->env = env;
1525       GNUNET_CONTAINER_DLL_insert_tail (receiver->head_recv,
1526                                         receiver->tail_recv,
1527                                         oom);
1528     }
1529     return GNUNET_OK;
1530   }
1531
1532   /* Everything is correct, send the message. */
1533   crm = GNUNET_malloc (sizeof (*crm));
1534   crm->ch = ch;
1535   crm->data_message = GNUNET_malloc (sizeof (struct GNUNET_CADET_ChannelAppDataMessage)
1536                                      + buf_len);
1537   crm->data_message->header.size = htons (sizeof (struct GNUNET_CADET_ChannelAppDataMessage) + buf_len);
1538   crm->data_message->header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA);
1539   ch->mid_send.mid = htonl (ntohl (ch->mid_send.mid) + 1);
1540   crm->data_message->mid = ch->mid_send;
1541   crm->data_message->ctn = ch->ctn;
1542   GNUNET_memcpy (&crm->data_message[1],
1543                  buf,
1544                  buf_len);
1545   GNUNET_CONTAINER_DLL_insert (ch->head_sent,
1546                                ch->tail_sent,
1547                                crm);
1548   LOG (GNUNET_ERROR_TYPE_DEBUG,
1549        "Sending %u bytes from local client to %s with MID %u\n",
1550        buf_len,
1551        GCCH_2s (ch),
1552        ntohl (crm->data_message->mid.mid));
1553   if (NULL != ch->retry_data_task)
1554   {
1555     GNUNET_SCHEDULER_cancel (ch->retry_data_task);
1556     ch->retry_data_task = NULL;
1557   }
1558   crm->qe = GCT_send (ch->t,
1559                       &crm->data_message->header,
1560                       &data_sent_cb,
1561                       crm);
1562   GNUNET_assert (NULL == ch->retry_data_task);
1563   return GNUNET_OK;
1564 }
1565
1566
1567 /**
1568  * Handle ACK from client on local channel.  Means the client is ready
1569  * for more data, see if we have any for it.
1570  *
1571  * @param ch channel to destroy
1572  * @param client_ccn ccn of the client sending the ack
1573  */
1574 void
1575 GCCH_handle_local_ack (struct CadetChannel *ch,
1576                        struct GNUNET_CADET_ClientChannelNumber client_ccn)
1577 {
1578   struct CadetChannelClient *ccc;
1579   struct CadetOutOfOrderMessage *com;
1580
1581   if ( (NULL != ch->owner) &&
1582        (ch->owner->ccn.channel_of_client == client_ccn.channel_of_client) )
1583     ccc = ch->owner;
1584   else if ( (NULL != ch->dest) &&
1585             (ch->dest->ccn.channel_of_client == client_ccn.channel_of_client) )
1586     ccc = ch->dest;
1587   else
1588     GNUNET_assert (0);
1589   ccc->client_ready = GNUNET_YES;
1590   com = ccc->head_recv;
1591   if (NULL == com)
1592   {
1593     LOG (GNUNET_ERROR_TYPE_DEBUG,
1594          "Got LOCAL_ACK, %s-%X ready to receive more data (but none pending on %s)!\n",
1595          GSC_2s (ccc->c),
1596          ntohl (ccc->ccn.channel_of_client),
1597          GCCH_2s (ch));
1598     return; /* none pending */
1599   }
1600   if (GNUNET_YES == ch->is_loopback)
1601   {
1602     int to_owner;
1603
1604     /* Messages are always in-order, just send */
1605     GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
1606                                  ccc->tail_recv,
1607                                  com);
1608     GSC_send_to_client (ccc->c,
1609                         com->env);
1610     /* Notify sender that we can receive more */
1611     if (ccc->ccn.channel_of_client ==
1612         ch->owner->ccn.channel_of_client)
1613     {
1614       to_owner = GNUNET_NO;
1615     }
1616     else
1617     {
1618       GNUNET_assert (ccc->ccn.channel_of_client ==
1619                      ch->dest->ccn.channel_of_client);
1620       to_owner = GNUNET_YES;
1621     }
1622     send_ack_to_client (ch,
1623                         to_owner);
1624     GNUNET_free (com);
1625     return;
1626   }
1627
1628   if ( (com->mid.mid != ch->mid_recv.mid) &&
1629        (GNUNET_NO == ch->out_of_order) )
1630   {
1631     LOG (GNUNET_ERROR_TYPE_DEBUG,
1632          "Got LOCAL_ACK, %s-%X ready to receive more data (but next one is out-of-order %u vs. %u)!\n",
1633          GSC_2s (ccc->c),
1634          ntohl (ccc->ccn.channel_of_client),
1635          ntohl (com->mid.mid),
1636          ntohl (ch->mid_recv.mid));
1637     return; /* missing next one in-order */
1638   }
1639
1640   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1641               "Got LOCAL ACK, passing payload message to %s-%X on %s\n",
1642               GSC_2s (ccc->c),
1643               ntohl (ccc->ccn.channel_of_client),
1644               GCCH_2s (ch));
1645
1646   /* all good, pass next message to client */
1647   GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
1648                                ccc->tail_recv,
1649                                com);
1650   /* FIXME: if unreliable, this is not aggressive
1651      enough, as it would be OK to have lost some! */
1652   ch->mid_recv.mid = htonl (1 + ntohl (com->mid.mid));
1653   ch->mid_futures >>= 1; /* equivalent to division by 2 */
1654   ccc->client_ready = GNUNET_NO;
1655   GSC_send_to_client (ccc->c,
1656                       com->env);
1657   GNUNET_free (com);
1658   if ( (0xFFULL == (ch->mid_futures & 0xFFULL)) &&
1659        (GNUNET_YES == ch->reliable) )
1660   {
1661     /* The next 15 messages were also already received (0xFF), this
1662        suggests that the sender may be blocked on flow control
1663        urgently waiting for an ACK from us. (As we have an inherent
1664        maximum of 64 bits, and 15 is getting too close for comfort.)
1665        So we should send one now. */
1666     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1667                 "Sender on %s likely blocked on flow-control, sending ACK now.\n",
1668                 GCCH_2s (ch));
1669     if (GNUNET_YES == ch->reliable)
1670       send_channel_data_ack (ch);
1671   }
1672
1673   if (NULL != ccc->head_recv)
1674     return;
1675   if (GNUNET_NO == ch->destroy)
1676     return;
1677   GCT_send_channel_destroy (ch->t,
1678                             ch->ctn);
1679   channel_destroy (ch);
1680 }
1681
1682
1683 #define LOG2(level, ...) GNUNET_log_from_nocheck(level,"cadet-chn",__VA_ARGS__)
1684
1685
1686 /**
1687  * Log channel info.
1688  *
1689  * @param ch Channel.
1690  * @param level Debug level to use.
1691  */
1692 void
1693 GCCH_debug (struct CadetChannel *ch,
1694             enum GNUNET_ErrorType level)
1695 {
1696   int do_log;
1697
1698   do_log = GNUNET_get_log_call_status (level & (~GNUNET_ERROR_TYPE_BULK),
1699                                        "cadet-chn",
1700                                        __FILE__, __FUNCTION__, __LINE__);
1701   if (0 == do_log)
1702     return;
1703
1704   if (NULL == ch)
1705   {
1706     LOG2 (level, "CHN *** DEBUG NULL CHANNEL ***\n");
1707     return;
1708   }
1709   LOG2 (level,
1710         "CHN %s:%X (%p)\n",
1711         GCT_2s (ch->t),
1712         ch->ctn,
1713         ch);
1714   if (NULL != ch->owner)
1715   {
1716     LOG2 (level,
1717           "CHN origin %s ready %s local-id: %u\n",
1718           GSC_2s (ch->owner->c),
1719           ch->owner->client_ready ? "YES" : "NO",
1720           ntohl (ch->owner->ccn.channel_of_client));
1721   }
1722   if (NULL != ch->dest)
1723   {
1724     LOG2 (level,
1725           "CHN destination %s ready %s local-id: %u\n",
1726           GSC_2s (ch->dest->c),
1727           ch->dest->client_ready ? "YES" : "NO",
1728           ntohl (ch->dest->ccn.channel_of_client));
1729   }
1730   LOG2 (level,
1731         "CHN  Message IDs recv: %d (%LLX), send: %d\n",
1732         ntohl (ch->mid_recv.mid),
1733         (unsigned long long) ch->mid_futures,
1734         ntohl (ch->mid_send.mid));
1735 }
1736
1737
1738
1739 /* end of gnunet-service-cadet-new_channel.c */