fix client-client loopback flow control
[oweals/gnunet.git] / src / cadet / gnunet-service-cadet-new_channel.c
1
2 /*
3      This file is part of GNUnet.
4      Copyright (C) 2001-2017 GNUnet e.V.
5
6      GNUnet is free software; you can redistribute it and/or modify
7      it under the terms of the GNU General Public License as published
8      by the Free Software Foundation; either version 3, or (at your
9      option) any later version.
10
11      GNUnet is distributed in the hope that it will be useful, but
12      WITHOUT ANY WARRANTY; without even the implied warranty of
13      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14      General Public License for more details.
15
16      You should have received a copy of the GNU General Public License
17      along with GNUnet; see the file COPYING.  If not, write to the
18      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
19      Boston, MA 02110-1301, USA.
20 */
21 /**
22  * @file cadet/gnunet-service-cadet-new_channel.c
23  * @brief logical links between CADET clients
24  * @author Bartlomiej Polot
25  * @author Christian Grothoff
26  *
27  * TODO:
28  * - FIXME: send ACKs back to loopback clients!
29  *
30  * - introduce shutdown so we can have half-closed channels, modify
31  *   destroy to include MID to have FIN-ACK equivalents, etc.
32  * - estimate max bandwidth using bursts and use to for CONGESTION CONTROL!
33  * - check that '0xFFULL' really is sufficient for flow control!
34  * - revisit handling of 'unreliable' traffic!
35  * - revisit handling of 'out-of-order' option, especially in combination with/without 'reliable'.
36  * - figure out flow control without ACKs (unreliable traffic!)
37  */
38 #include "platform.h"
39 #include "gnunet_util_lib.h"
40 #include "cadet.h"
41 #include "gnunet_statistics_service.h"
42 #include "gnunet-service-cadet-new.h"
43 #include "gnunet-service-cadet-new_channel.h"
44 #include "gnunet-service-cadet-new_connection.h"
45 #include "gnunet-service-cadet-new_tunnels.h"
46 #include "gnunet-service-cadet-new_peer.h"
47 #include "gnunet-service-cadet-new_paths.h"
48
49 #define LOG(level,...) GNUNET_log_from (level,"cadet-chn",__VA_ARGS__)
50
51 /**
52  * How long do we initially wait before retransmitting?
53  */
54 #define CADET_INITIAL_RETRANSMIT_TIME GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 250)
55
56 /**
57  * How long do we wait before dropping state about incoming
58  * connection to closed port?
59  */
60 #define TIMEOUT_CLOSED_PORT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30)
61
62
63 /**
64  * All the states a connection can be in.
65  */
66 enum CadetChannelState
67 {
68   /**
69    * Uninitialized status, should never appear in operation.
70    */
71   CADET_CHANNEL_NEW,
72
73   /**
74    * Connection create message sent, waiting for ACK.
75    */
76   CADET_CHANNEL_OPEN_SENT,
77
78   /**
79    * Connection confirmed, ready to carry traffic.
80    */
81   CADET_CHANNEL_READY
82 };
83
84
85 /**
86  * Info needed to retry a message in case it gets lost.
87  * Note that we DO use this structure also for unreliable
88  * messages.
89  */
90 struct CadetReliableMessage
91 {
92   /**
93    * Double linked list, FIFO style
94    */
95   struct CadetReliableMessage *next;
96
97   /**
98    * Double linked list, FIFO style
99    */
100   struct CadetReliableMessage *prev;
101
102   /**
103    * Which channel is this message in?
104    */
105   struct CadetChannel *ch;
106
107   /**
108    * Entry in the tunnels queue for this message, NULL if it has left
109    * the tunnel.  Used to cancel transmission in case we receive an
110    * ACK in time.
111    */
112   struct CadetTunnelQueueEntry *qe;
113
114   /**
115    * How soon should we retry if we fail to get an ACK?
116    * Messages in the queue are sorted by this value.
117    */
118   struct GNUNET_TIME_Absolute next_retry;
119
120   /**
121    * How long do we wait for an ACK after transmission?
122    * Use for the back-off calculation.
123    */
124   struct GNUNET_TIME_Relative retry_delay;
125
126   /**
127    * Data message we are trying to send.
128    */
129   struct GNUNET_CADET_ChannelAppDataMessage data_message;
130
131   /* followed by variable-size payload */
132 };
133
134
135 /**
136  * List of received out-of-order data messages.
137  */
138 struct CadetOutOfOrderMessage
139 {
140   /**
141    * Double linked list, FIFO style
142    */
143   struct CadetOutOfOrderMessage *next;
144
145   /**
146    * Double linked list, FIFO style
147    */
148   struct CadetOutOfOrderMessage *prev;
149
150   /**
151    * ID of the message (messages up to this point needed
152    * before we give this one to the client).
153    */
154   struct ChannelMessageIdentifier mid;
155
156   /**
157    * The envelope with the payload of the out-of-order message
158    */
159   struct GNUNET_MQ_Envelope *env;
160
161 };
162
163
164 /**
165  * Client endpoint of a `struct CadetChannel`.  A channel may be a
166  * loopback channel, in which case it has two of these endpoints.
167  * Note that flow control also is required in both directions.
168  */
169 struct CadetChannelClient
170 {
171   /**
172    * Client handle.  Not by itself sufficient to designate
173    * the client endpoint, as the same client handle may
174    * be used for both the owner and the destination, and
175    * we thus also need the channel ID to identify the client.
176    */
177   struct CadetClient *c;
178
179   /**
180    * Head of DLL of messages received out of order or while client was unready.
181    */
182   struct CadetOutOfOrderMessage *head_recv;
183
184   /**
185    * Tail DLL of messages received out of order or while client was unready.
186    */
187   struct CadetOutOfOrderMessage *tail_recv;
188
189   /**
190    * Local tunnel number for this client.
191    * (if owner >= #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI,
192    *  otherwise < #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI)
193    */
194   struct GNUNET_CADET_ClientChannelNumber ccn;
195
196   /**
197    * Can we send data to the client?
198    */
199   int client_ready;
200
201 };
202
203
204 /**
205  * Struct containing all information regarding a channel to a remote client.
206  */
207 struct CadetChannel
208 {
209   /**
210    * Tunnel this channel is in.
211    */
212   struct CadetTunnel *t;
213
214   /**
215    * Client owner of the tunnel, if any.
216    * (Used if this channel represends the initiating end of the tunnel.)
217    */
218   struct CadetChannelClient *owner;
219
220   /**
221    * Client destination of the tunnel, if any.
222    * (Used if this channel represents the listening end of the tunnel.)
223    */
224   struct CadetChannelClient *dest;
225
226   /**
227    * Last entry in the tunnel's queue relating to control messages
228    * (#GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN or
229    * #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK).  Used to cancel
230    * transmission in case we receive updated information.
231    */
232   struct CadetTunnelQueueEntry *last_control_qe;
233
234   /**
235    * Head of DLL of messages sent and not yet ACK'd.
236    */
237   struct CadetReliableMessage *head_sent;
238
239   /**
240    * Tail of DLL of messages sent and not yet ACK'd.
241    */
242   struct CadetReliableMessage *tail_sent;
243
244   /**
245    * Task to resend/poll in case no ACK is received.
246    */
247   struct GNUNET_SCHEDULER_Task *retry_control_task;
248
249   /**
250    * Task to resend/poll in case no ACK is received.
251    */
252   struct GNUNET_SCHEDULER_Task *retry_data_task;
253
254   /**
255    * Last time the channel was used
256    */
257   struct GNUNET_TIME_Absolute timestamp;
258
259   /**
260    * Destination port of the channel.
261    */
262   struct GNUNET_HashCode port;
263
264   /**
265    * Counter for exponential backoff.
266    */
267   struct GNUNET_TIME_Relative retry_time;
268
269   /**
270    * How long does it usually take to get an ACK.
271    */
272   struct GNUNET_TIME_Relative expected_delay;
273
274   /**
275    * Bitfield of already-received messages past @e mid_recv.
276    */
277   uint64_t mid_futures;
278
279   /**
280    * Next MID expected for incoming traffic.
281    */
282   struct ChannelMessageIdentifier mid_recv;
283
284   /**
285    * Next MID to use for outgoing traffic.
286    */
287   struct ChannelMessageIdentifier mid_send;
288
289   /**
290    * Total (reliable) messages pending ACK for this channel.
291    */
292   unsigned int pending_messages;
293
294   /**
295    * Maximum (reliable) messages pending ACK for this channel
296    * before we throttle the client.
297    */
298   unsigned int max_pending_messages;
299
300   /**
301    * Number identifying this channel in its tunnel.
302    */
303   struct GNUNET_CADET_ChannelTunnelNumber ctn;
304
305   /**
306    * Channel state.
307    */
308   enum CadetChannelState state;
309
310   /**
311    * Is the tunnel bufferless (minimum latency)?
312    */
313   int nobuffer;
314
315   /**
316    * Is the tunnel reliable?
317    */
318   int reliable;
319
320   /**
321    * Is the tunnel out-of-order?
322    */
323   int out_of_order;
324
325   /**
326    * Is this channel a loopback channel, where the destination is us again?
327    */
328   int is_loopback;
329
330   /**
331    * Flag to signal the destruction of the channel.  If this is set to
332    * #GNUNET_YES the channel will be destroyed once the queue is
333    * empty.
334    */
335   int destroy;
336
337 };
338
339
340 /**
341  * Get the static string for identification of the channel.
342  *
343  * @param ch Channel.
344  *
345  * @return Static string with the channel IDs.
346  */
347 const char *
348 GCCH_2s (const struct CadetChannel *ch)
349 {
350   static char buf[128];
351
352   GNUNET_snprintf (buf,
353                    sizeof (buf),
354                    "Channel %s:%s ctn:%X(%X/%X)",
355                    (GNUNET_YES == ch->is_loopback)
356                    ? "loopback"
357                    : GNUNET_i2s (GCP_get_id (GCT_get_destination (ch->t))),
358                    GNUNET_h2s (&ch->port),
359                    ch->ctn,
360                    (NULL == ch->owner) ? 0 : ntohl (ch->owner->ccn.channel_of_client),
361                    (NULL == ch->dest) ? 0 : ntohl (ch->dest->ccn.channel_of_client));
362   return buf;
363 }
364
365
366 /**
367  * Get the channel's public ID.
368  *
369  * @param ch Channel.
370  *
371  * @return ID used to identify the channel with the remote peer.
372  */
373 struct GNUNET_CADET_ChannelTunnelNumber
374 GCCH_get_id (const struct CadetChannel *ch)
375 {
376   return ch->ctn;
377 }
378
379
380 /**
381  * Release memory associated with @a ccc
382  *
383  * @param ccc data structure to clean up
384  */
385 static void
386 free_channel_client (struct CadetChannelClient *ccc)
387 {
388   struct CadetOutOfOrderMessage *com;
389
390   while (NULL != (com = ccc->head_recv))
391   {
392     GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
393                                  ccc->tail_recv,
394                                  com);
395     GNUNET_MQ_discard (com->env);
396     GNUNET_free (com);
397   }
398   GNUNET_free (ccc);
399 }
400
401
402 /**
403  * Destroy the given channel.
404  *
405  * @param ch channel to destroy
406  */
407 static void
408 channel_destroy (struct CadetChannel *ch)
409 {
410   struct CadetReliableMessage *crm;
411
412   while (NULL != (crm = ch->head_sent))
413   {
414     GNUNET_assert (ch == crm->ch);
415     if (NULL != crm->qe)
416     {
417       GCT_send_cancel (crm->qe);
418       crm->qe = NULL;
419     }
420     GNUNET_CONTAINER_DLL_remove (ch->head_sent,
421                                  ch->tail_sent,
422                                  crm);
423     GNUNET_free (crm);
424   }
425   if (NULL != ch->owner)
426   {
427     free_channel_client (ch->owner);
428     ch->owner = NULL;
429   }
430   if (NULL != ch->dest)
431   {
432     free_channel_client (ch->dest);
433     ch->dest = NULL;
434   }
435   if (NULL != ch->last_control_qe)
436   {
437     GCT_send_cancel (ch->last_control_qe);
438     ch->last_control_qe = NULL;
439   }
440   if (NULL != ch->retry_data_task)
441   {
442     GNUNET_SCHEDULER_cancel (ch->retry_data_task);
443     ch->retry_data_task = NULL;
444   }
445   if (NULL != ch->retry_control_task)
446   {
447     GNUNET_SCHEDULER_cancel (ch->retry_control_task);
448     ch->retry_control_task = NULL;
449   }
450   if (GNUNET_NO == ch->is_loopback)
451   {
452     GCT_remove_channel (ch->t,
453                         ch,
454                         ch->ctn);
455     ch->t = NULL;
456   }
457   GNUNET_free (ch);
458 }
459
460
461 /**
462  * Send a channel create message.
463  *
464  * @param cls Channel for which to send.
465  */
466 static void
467 send_channel_open (void *cls);
468
469
470 /**
471  * Function called once the tunnel confirms that we sent the
472  * create message.  Delays for a bit until we retry.
473  *
474  * @param cls our `struct CadetChannel`.
475  */
476 static void
477 channel_open_sent_cb (void *cls)
478 {
479   struct CadetChannel *ch = cls;
480
481   GNUNET_assert (NULL != ch->last_control_qe);
482   ch->last_control_qe = NULL;
483   ch->retry_time = GNUNET_TIME_STD_BACKOFF (ch->retry_time);
484   LOG (GNUNET_ERROR_TYPE_DEBUG,
485        "Sent CADET_CHANNEL_OPEN on %s, retrying in %s\n",
486        GCCH_2s (ch),
487        GNUNET_STRINGS_relative_time_to_string (ch->retry_time,
488                                                GNUNET_YES));
489   ch->retry_control_task
490     = GNUNET_SCHEDULER_add_delayed (ch->retry_time,
491                                     &send_channel_open,
492                                     ch);
493 }
494
495
496 /**
497  * Send a channel open message.
498  *
499  * @param cls Channel for which to send.
500  */
501 static void
502 send_channel_open (void *cls)
503 {
504   struct CadetChannel *ch = cls;
505   struct GNUNET_CADET_ChannelOpenMessage msgcc;
506   uint32_t options;
507
508   ch->retry_control_task = NULL;
509   LOG (GNUNET_ERROR_TYPE_DEBUG,
510        "Sending CHANNEL_OPEN message for %s\n",
511        GCCH_2s (ch));
512   options = 0;
513   if (ch->nobuffer)
514     options |= GNUNET_CADET_OPTION_NOBUFFER;
515   if (ch->reliable)
516     options |= GNUNET_CADET_OPTION_RELIABLE;
517   if (ch->out_of_order)
518     options |= GNUNET_CADET_OPTION_OUT_OF_ORDER;
519   msgcc.header.size = htons (sizeof (msgcc));
520   msgcc.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN);
521   msgcc.opt = htonl (options);
522   msgcc.port = ch->port;
523   msgcc.ctn = ch->ctn;
524   ch->state = CADET_CHANNEL_OPEN_SENT;
525   ch->last_control_qe = GCT_send (ch->t,
526                                   &msgcc.header,
527                                   &channel_open_sent_cb,
528                                   ch);
529 }
530
531
532 /**
533  * Function called once and only once after a channel was bound
534  * to its tunnel via #GCT_add_channel() is ready for transmission.
535  * Note that this is only the case for channels that this peer
536  * initiates, as for incoming channels we assume that they are
537  * ready for transmission immediately upon receiving the open
538  * message.  Used to bootstrap the #GCT_send() process.
539  *
540  * @param ch the channel for which the tunnel is now ready
541  */
542 void
543 GCCH_tunnel_up (struct CadetChannel *ch)
544 {
545   GNUNET_assert (NULL == ch->retry_control_task);
546   LOG (GNUNET_ERROR_TYPE_DEBUG,
547        "Tunnel up, sending CHANNEL_OPEN on %s now\n",
548        GCCH_2s (ch));
549   ch->retry_control_task
550     = GNUNET_SCHEDULER_add_now (&send_channel_open,
551                                 ch);
552 }
553
554
555 /**
556  * Create a new channel.
557  *
558  * @param owner local client owning the channel
559  * @param ccn local number of this channel at the @a owner
560  * @param destination peer to which we should build the channel
561  * @param port desired port at @a destination
562  * @param options options for the channel
563  * @return handle to the new channel
564  */
565 struct CadetChannel *
566 GCCH_channel_local_new (struct CadetClient *owner,
567                         struct GNUNET_CADET_ClientChannelNumber ccn,
568                         struct CadetPeer *destination,
569                         const struct GNUNET_HashCode *port,
570                         uint32_t options)
571 {
572   struct CadetChannel *ch;
573   struct CadetChannelClient *ccco;
574
575   ccco = GNUNET_new (struct CadetChannelClient);
576   ccco->c = owner;
577   ccco->ccn = ccn;
578   ccco->client_ready = GNUNET_YES;
579
580   ch = GNUNET_new (struct CadetChannel);
581   ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
582   ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
583   ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
584   ch->max_pending_messages = (ch->nobuffer) ? 1 : 4; /* FIXME: 4!? Do not hardcode! */
585   ch->owner = ccco;
586   ch->port = *port;
587   if (0 == memcmp (&my_full_id,
588                    GCP_get_id (destination),
589                    sizeof (struct GNUNET_PeerIdentity)))
590   {
591     struct CadetClient *c;
592
593     ch->is_loopback = GNUNET_YES;
594     c = GNUNET_CONTAINER_multihashmap_get (open_ports,
595                                            port);
596     if (NULL == c)
597     {
598       /* port closed, wait for it to possibly open */
599       (void) GNUNET_CONTAINER_multihashmap_put (loose_channels,
600                                                 port,
601                                                 ch,
602                                                 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
603       LOG (GNUNET_ERROR_TYPE_DEBUG,
604            "Created loose incoming loopback channel to port %s\n",
605            GNUNET_h2s (&ch->port));
606     }
607     else
608     {
609       ch->dest = GNUNET_new (struct CadetChannelClient);
610       ch->dest->c = c;
611       ch->dest->client_ready = GNUNET_YES;
612       GCCH_bind (ch,
613                  ch->dest->c);
614     }
615   }
616   else
617   {
618     ch->t = GCP_get_tunnel (destination,
619                             GNUNET_YES);
620     ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME;
621     ch->ctn = GCT_add_channel (ch->t,
622                                ch);
623   }
624   GNUNET_STATISTICS_update (stats,
625                             "# channels",
626                             1,
627                             GNUNET_NO);
628   LOG (GNUNET_ERROR_TYPE_DEBUG,
629        "Created channel to port %s at peer %s for %s using %s\n",
630        GNUNET_h2s (port),
631        GCP_2s (destination),
632        GSC_2s (owner),
633        (GNUNET_YES == ch->is_loopback) ? "loopback" : GCT_2s (ch->t));
634   return ch;
635 }
636
637
638 /**
639  * We had an incoming channel to a port that is closed.
640  * It has not been opened for a while, drop it.
641  *
642  * @param cls the channel to drop
643  */
644 static void
645 timeout_closed_cb (void *cls)
646 {
647   struct CadetChannel *ch = cls;
648
649   ch->retry_control_task = NULL;
650   LOG (GNUNET_ERROR_TYPE_DEBUG,
651        "Closing incoming channel to port %s from peer %s due to timeout\n",
652        GNUNET_h2s (&ch->port),
653        GCP_2s (GCT_get_destination (ch->t)));
654   channel_destroy (ch);
655 }
656
657
658 /**
659  * Create a new channel based on a request coming in over the network.
660  *
661  * @param t tunnel to the remote peer
662  * @param ctn identifier of this channel in the tunnel
663  * @param port desired local port
664  * @param options options for the channel
665  * @return handle to the new channel
666  */
667 struct CadetChannel *
668 GCCH_channel_incoming_new (struct CadetTunnel *t,
669                            struct GNUNET_CADET_ChannelTunnelNumber ctn,
670                            const struct GNUNET_HashCode *port,
671                            uint32_t options)
672 {
673   struct CadetChannel *ch;
674   struct CadetClient *c;
675
676   ch = GNUNET_new (struct CadetChannel);
677   ch->port = *port;
678   ch->t = t;
679   ch->ctn = ctn;
680   ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME;
681   ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
682   ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
683   ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
684   ch->max_pending_messages = (ch->nobuffer) ? 1 : 4; /* FIXME: 4!? Do not hardcode! */
685   GNUNET_STATISTICS_update (stats,
686                             "# channels",
687                             1,
688                             GNUNET_NO);
689
690   c = GNUNET_CONTAINER_multihashmap_get (open_ports,
691                                          port);
692   if (NULL == c)
693   {
694     /* port closed, wait for it to possibly open */
695     (void) GNUNET_CONTAINER_multihashmap_put (loose_channels,
696                                               port,
697                                               ch,
698                                               GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
699     ch->retry_control_task
700       = GNUNET_SCHEDULER_add_delayed (TIMEOUT_CLOSED_PORT,
701                                       &timeout_closed_cb,
702                                       ch);
703     LOG (GNUNET_ERROR_TYPE_DEBUG,
704          "Created loose incoming channel to port %s from peer %s\n",
705          GNUNET_h2s (&ch->port),
706          GCP_2s (GCT_get_destination (ch->t)));
707   }
708   else
709   {
710     GCCH_bind (ch,
711                c);
712   }
713   GNUNET_STATISTICS_update (stats,
714                             "# channels",
715                             1,
716                             GNUNET_NO);
717   return ch;
718 }
719
720
721 /**
722  * Function called once the tunnel confirms that we sent the
723  * ACK message.  Just remembers it was sent, we do not expect
724  * ACKs for ACKs ;-).
725  *
726  * @param cls our `struct CadetChannel`.
727  */
728 static void
729 send_ack_cb (void *cls)
730 {
731   struct CadetChannel *ch = cls;
732
733   GNUNET_assert (NULL != ch->last_control_qe);
734   ch->last_control_qe = NULL;
735 }
736
737
738 /**
739  * Compute and send the current #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK to the other peer.
740  *
741  * @param ch channel to send the #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK for
742  */
743 static void
744 send_channel_data_ack (struct CadetChannel *ch)
745 {
746   struct GNUNET_CADET_ChannelDataAckMessage msg;
747
748   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK);
749   msg.header.size = htons (sizeof (msg));
750   msg.ctn = ch->ctn;
751   msg.mid.mid = htonl (ntohl (ch->mid_recv.mid) - 1);
752   msg.futures = GNUNET_htonll (ch->mid_futures);
753   if (NULL != ch->last_control_qe)
754     GCT_send_cancel (ch->last_control_qe);
755   ch->last_control_qe = GCT_send (ch->t,
756                                   &msg.header,
757                                   &send_ack_cb,
758                                   ch);
759 }
760
761
762 /**
763  * Send our initial #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK to the client confirming that the
764  * connection is up.
765  *
766  * @param cls the `struct CadetChannel`
767  */
768 static void
769 send_open_ack (void *cls)
770 {
771   struct CadetChannel *ch = cls;
772   struct GNUNET_CADET_ChannelManageMessage msg;
773
774   LOG (GNUNET_ERROR_TYPE_DEBUG,
775        "Sending CHANNEL_OPEN_ACK on %s\n",
776        GCCH_2s (ch));
777   ch->retry_control_task = NULL;
778   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK);
779   msg.header.size = htons (sizeof (msg));
780   msg.reserved = htonl (0);
781   msg.ctn = ch->ctn;
782   if (NULL != ch->last_control_qe)
783     GCT_send_cancel (ch->last_control_qe);
784   ch->last_control_qe = GCT_send (ch->t,
785                                   &msg.header,
786                                   &send_ack_cb,
787                                   ch);
788 }
789
790
791 /**
792  * We got a #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN message again for
793  * this channel.  If the binding was successful, (re)transmit the
794  * #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK.
795  *
796  * @param ch channel that got the duplicate open
797  */
798 void
799 GCCH_handle_duplicate_open (struct CadetChannel *ch)
800 {
801   if (NULL == ch->dest)
802   {
803     LOG (GNUNET_ERROR_TYPE_DEBUG,
804          "Ignoring duplicate channel OPEN on %s: port is closed\n",
805          GCCH_2s (ch));
806     return;
807   }
808   if (NULL != ch->retry_control_task)
809   {
810     LOG (GNUNET_ERROR_TYPE_DEBUG,
811          "Ignoring duplicate channel OPEN on %s: control message is pending\n",
812          GCCH_2s (ch));
813     return;
814   }
815   LOG (GNUNET_ERROR_TYPE_DEBUG,
816        "Retransmitting OPEN_ACK on %s\n",
817        GCCH_2s (ch));
818   ch->retry_control_task
819     = GNUNET_SCHEDULER_add_now (&send_open_ack,
820                                 ch);
821 }
822
823
824 /**
825  * Send a #GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK to the client to solicit more messages.
826  *
827  * @param ch channel the ack is for
828  * @param to_owner #GNUNET_YES to send to owner,
829  *                 #GNUNET_NO to send to dest
830  */
831 static void
832 send_ack_to_client (struct CadetChannel *ch,
833                     int to_owner)
834 {
835   struct GNUNET_MQ_Envelope *env;
836   struct GNUNET_CADET_LocalAck *ack;
837   struct CadetChannelClient *ccc;
838
839   env = GNUNET_MQ_msg (ack,
840                        GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK);
841   ccc = (GNUNET_YES == to_owner) ? ch->owner : ch->dest;
842   ack->ccn = ccc->ccn;
843   LOG (GNUNET_ERROR_TYPE_DEBUG,
844        "Sending CADET_LOCAL_ACK to %s (%s) at ccn %X\n",
845        GSC_2s (ccc->c),
846        (GNUNET_YES == to_owner) ? "owner" : "dest",
847        ntohl (ack->ccn.channel_of_client));
848   GSC_send_to_client (ccc->c,
849                       env);
850 }
851
852
853 /**
854  * A client is bound to the port that we have a channel
855  * open to.  Send the acknowledgement for the connection
856  * request and establish the link with the client.
857  *
858  * @param ch open incoming channel
859  * @param c client listening on the respective port
860  */
861 void
862 GCCH_bind (struct CadetChannel *ch,
863            struct CadetClient *c)
864 {
865   uint32_t options;
866   struct CadetChannelClient *cccd;
867
868   LOG (GNUNET_ERROR_TYPE_DEBUG,
869        "Binding %s from %s to port %s of %s\n",
870        GCCH_2s (ch),
871        GCT_2s (ch->t),
872        GNUNET_h2s (&ch->port),
873        GSC_2s (c));
874   if (NULL != ch->retry_control_task)
875   {
876     /* there might be a timeout task here */
877     GNUNET_SCHEDULER_cancel (ch->retry_control_task);
878     ch->retry_control_task = NULL;
879   }
880   options = 0;
881   if (ch->nobuffer)
882     options |= GNUNET_CADET_OPTION_NOBUFFER;
883   if (ch->reliable)
884     options |= GNUNET_CADET_OPTION_RELIABLE;
885   if (ch->out_of_order)
886     options |= GNUNET_CADET_OPTION_OUT_OF_ORDER;
887   cccd = GNUNET_new (struct CadetChannelClient);
888   ch->dest = cccd;
889   cccd->c = c;
890   cccd->client_ready = GNUNET_YES;
891   cccd->ccn = GSC_bind (c,
892                         ch,
893                         (GNUNET_YES == ch->is_loopback)
894                         ? GCP_get (&my_full_id,
895                                    GNUNET_YES)
896                         : GCT_get_destination (ch->t),
897                         &ch->port,
898                         options);
899   GNUNET_assert (ntohl (cccd->ccn.channel_of_client) <
900                  GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
901   ch->mid_recv.mid = htonl (1); /* The CONNECT counts as message 0! */
902   if (GNUNET_YES == ch->is_loopback)
903   {
904     ch->state = CADET_CHANNEL_OPEN_SENT;
905     GCCH_handle_channel_open_ack (ch);
906   }
907   else
908   {
909     /* notify other peer that we accepted the connection */
910     ch->retry_control_task
911       = GNUNET_SCHEDULER_add_now (&send_open_ack,
912                                   ch);
913   }
914   /* give client it's initial supply of ACKs */
915   GNUNET_assert (ntohl (cccd->ccn.channel_of_client) <
916                  GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
917   for (unsigned int i=0;i<ch->max_pending_messages;i++)
918     send_ack_to_client (ch,
919                         GNUNET_NO);
920 }
921
922
923 /**
924  * Destroy locally created channel.  Called by the local client, so no
925  * need to tell the client.
926  *
927  * @param ch channel to destroy
928  * @param c client that caused the destruction
929  * @param ccn client number of the client @a c
930  */
931 void
932 GCCH_channel_local_destroy (struct CadetChannel *ch,
933                             struct CadetClient *c,
934                             struct GNUNET_CADET_ClientChannelNumber ccn)
935 {
936   LOG (GNUNET_ERROR_TYPE_DEBUG,
937        "%s asks for destruction of %s\n",
938        GSC_2s (c),
939        GCCH_2s (ch));
940   GNUNET_assert (NULL != c);
941   if ( (NULL != ch->owner) &&
942        (c == ch->owner->c) &&
943        (ccn.channel_of_client == ch->owner->ccn.channel_of_client) )
944   {
945     free_channel_client (ch->owner);
946     ch->owner = NULL;
947   }
948   else if ( (NULL != ch->dest) &&
949             (c == ch->dest->c) &&
950             (ccn.channel_of_client == ch->dest->ccn.channel_of_client) )
951   {
952     free_channel_client (ch->dest);
953     ch->dest = NULL;
954   }
955   else
956   {
957     GNUNET_assert (0);
958   }
959
960   if (GNUNET_YES == ch->destroy)
961   {
962     /* other end already destroyed, with the local client gone, no need
963        to finish transmissions, just destroy immediately. */
964     channel_destroy (ch);
965     return;
966   }
967   if ( (NULL != ch->head_sent) ||
968        (NULL != ch->owner) ||
969        (NULL != ch->dest) )
970   {
971     /* Wait for other end to destroy us as well,
972        and otherwise allow send queue to be transmitted first */
973     ch->destroy = GNUNET_YES;
974     return;
975   }
976   /* If the we ever sent the CHANNEL_CREATE, we need to send a destroy message. */
977   if (CADET_CHANNEL_NEW != ch->state)
978     GCT_send_channel_destroy (ch->t,
979                               ch->ctn);
980   /* Nothing left to do, just finish destruction */
981   channel_destroy (ch);
982 }
983
984
985 /**
986  * We got an acknowledgement for the creation of the channel
987  * (the port is open on the other side). Begin transmissions.
988  *
989  * @param ch channel to destroy
990  */
991 void
992 GCCH_handle_channel_open_ack (struct CadetChannel *ch)
993 {
994   switch (ch->state)
995   {
996   case CADET_CHANNEL_NEW:
997     /* this should be impossible */
998     GNUNET_break (0);
999     break;
1000   case CADET_CHANNEL_OPEN_SENT:
1001     if (NULL == ch->owner)
1002     {
1003       /* We're not the owner, wrong direction! */
1004       GNUNET_break_op (0);
1005       return;
1006     }
1007     LOG (GNUNET_ERROR_TYPE_DEBUG,
1008          "Received CHANNEL_OPEN_ACK for waiting %s, entering READY state\n",
1009          GCCH_2s (ch));
1010     if (NULL != ch->retry_control_task) /* can be NULL if ch->is_loopback */
1011     {
1012       GNUNET_SCHEDULER_cancel (ch->retry_control_task);
1013       ch->retry_control_task = NULL;
1014     }
1015     ch->state = CADET_CHANNEL_READY;
1016     /* On first connect, send client as many ACKs as we allow messages
1017        to be buffered! */
1018     for (unsigned int i=0;i<ch->max_pending_messages;i++)
1019       send_ack_to_client (ch,
1020                           GNUNET_YES);
1021     break;
1022   case CADET_CHANNEL_READY:
1023     /* duplicate ACK, maybe we retried the CREATE. Ignore. */
1024     LOG (GNUNET_ERROR_TYPE_DEBUG,
1025          "Received duplicate channel OPEN_ACK for %s\n",
1026          GCCH_2s (ch));
1027     GNUNET_STATISTICS_update (stats,
1028                               "# duplicate CREATE_ACKs",
1029                               1,
1030                               GNUNET_NO);
1031     break;
1032   }
1033 }
1034
1035
1036 /**
1037  * Test if element @a e1 comes before element @a e2.
1038  *
1039  * TODO: use opportunity to create generic list insertion sort
1040  * logic in container!
1041  *
1042  * @param cls closure, our `struct CadetChannel`
1043  * @param e1 an element of to sort
1044  * @param e2 another element to sort
1045  * @return #GNUNET_YES if @e1 < @e2, otherwise #GNUNET_NO
1046  */
1047 static int
1048 is_before (void *cls,
1049            void *e1,
1050            void *e2)
1051 {
1052   struct CadetOutOfOrderMessage *m1 = e1;
1053   struct CadetOutOfOrderMessage *m2 = e2;
1054   uint32_t v1 = ntohl (m1->mid.mid);
1055   uint32_t v2 = ntohl (m2->mid.mid);
1056   uint32_t delta;
1057
1058   delta = v1 - v2;
1059   if (delta > (uint32_t) INT_MAX)
1060   {
1061     /* in overflow range, we can safely assume we wrapped around */
1062     return GNUNET_NO;
1063   }
1064   else
1065   {
1066     return GNUNET_YES;
1067   }
1068 }
1069
1070
1071 /**
1072  * We got payload data for a channel.  Pass it on to the client
1073  * and send an ACK to the other end (once flow control allows it!)
1074  *
1075  * @param ch channel that got data
1076  * @param msg message that was received
1077  */
1078 void
1079 GCCH_handle_channel_plaintext_data (struct CadetChannel *ch,
1080                                     const struct GNUNET_CADET_ChannelAppDataMessage *msg)
1081 {
1082   struct GNUNET_MQ_Envelope *env;
1083   struct GNUNET_CADET_LocalData *ld;
1084   struct CadetChannelClient *ccc;
1085   struct CadetOutOfOrderMessage *com;
1086   size_t payload_size;
1087
1088   GNUNET_assert (GNUNET_NO == ch->is_loopback);
1089   payload_size = ntohs (msg->header.size) - sizeof (*msg);
1090   env = GNUNET_MQ_msg_extra (ld,
1091                              payload_size,
1092                              GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
1093   ld->ccn = (NULL == ch->dest) ? ch->owner->ccn : ch->dest->ccn;
1094   GNUNET_memcpy (&ld[1],
1095                  &msg[1],
1096                  payload_size);
1097   ccc = (NULL != ch->owner) ? ch->owner : ch->dest;
1098   if ( (GNUNET_YES == ccc->client_ready) &&
1099        ( (GNUNET_YES == ch->out_of_order) ||
1100          (msg->mid.mid == ch->mid_recv.mid) ) )
1101   {
1102     LOG (GNUNET_ERROR_TYPE_DEBUG,
1103          "Giving %u bytes of payload from %s to client %s\n",
1104          (unsigned int) payload_size,
1105          GCCH_2s (ch),
1106          GSC_2s (ccc->c));
1107     ccc->client_ready = GNUNET_NO;
1108     GSC_send_to_client (ccc->c,
1109                         env);
1110     ch->mid_recv.mid = htonl (1 + ntohl (ch->mid_recv.mid));
1111     ch->mid_futures >>= 1;
1112   }
1113   else
1114   {
1115     /* FIXME-SECURITY: if the element is WAY too far ahead,
1116        drop it (can't buffer too much!) */
1117     LOG (GNUNET_ERROR_TYPE_DEBUG,
1118          "Queuing %s payload of %u bytes on %s (mid %u, need %u first)\n",
1119          (GNUNET_YES == ccc->client_ready)
1120          ? "out-of-order"
1121          : "client-not-ready",
1122          (unsigned int) payload_size,
1123          GCCH_2s (ch),
1124          ntohl (msg->mid.mid),
1125          ntohl (ch->mid_recv.mid));
1126
1127     com = GNUNET_new (struct CadetOutOfOrderMessage);
1128     com->mid = msg->mid;
1129     com->env = env;
1130     /* sort into list ordered by "is_before" */
1131     if ( (NULL == ccc->head_recv) ||
1132          (GNUNET_YES == is_before (ch,
1133                                    com,
1134                                    ccc->head_recv)) )
1135     {
1136       GNUNET_CONTAINER_DLL_insert (ccc->head_recv,
1137                                    ccc->tail_recv,
1138                                    com);
1139     }
1140     else
1141     {
1142       struct CadetOutOfOrderMessage *pos;
1143
1144       for (pos = ccc->head_recv;
1145            NULL != pos;
1146            pos = pos->next)
1147       {
1148         if (GNUNET_YES !=
1149             is_before (NULL,
1150                        pos,
1151                        com))
1152           break;
1153       }
1154       if (NULL == pos)
1155         GNUNET_CONTAINER_DLL_insert_tail (ccc->head_recv,
1156                                           ccc->tail_recv,
1157                                           com);
1158       else
1159         GNUNET_CONTAINER_DLL_insert_after (ccc->head_recv,
1160                                            ccc->tail_recv,
1161                                            com,
1162                                            pos->prev);
1163     }
1164   }
1165 }
1166
1167
1168 /**
1169  * We got an acknowledgement for payload data for a channel.
1170  * Possibly resume transmissions.
1171  *
1172  * @param ch channel that got the ack
1173  * @param ack details about what was received
1174  */
1175 void
1176 GCCH_handle_channel_plaintext_data_ack (struct CadetChannel *ch,
1177                                         const struct GNUNET_CADET_ChannelDataAckMessage *ack)
1178 {
1179   struct CadetReliableMessage *crm;
1180
1181   GNUNET_break (GNUNET_NO == ch->is_loopback);
1182   if (GNUNET_NO == ch->reliable)
1183   {
1184     /* not expecting ACKs on unreliable channel, odd */
1185     GNUNET_break_op (0);
1186     return;
1187   }
1188   for (crm = ch->head_sent;
1189         NULL != crm;
1190        crm = crm->next)
1191     if (ack->mid.mid == crm->data_message.mid.mid)
1192       break;
1193   if (NULL == crm)
1194   {
1195     /* ACK for message we already dropped, might have been a
1196        duplicate ACK? Ignore. */
1197     LOG (GNUNET_ERROR_TYPE_DEBUG,
1198          "Duplicate DATA_ACK on %s, ignoring\n",
1199          GCCH_2s (ch));
1200     GNUNET_STATISTICS_update (stats,
1201                               "# duplicate DATA_ACKs",
1202                               1,
1203                               GNUNET_NO);
1204     return;
1205   }
1206   GNUNET_CONTAINER_DLL_remove (ch->head_sent,
1207                                ch->tail_sent,
1208                                crm);
1209   ch->pending_messages--;
1210   send_ack_to_client (ch,
1211                       (NULL == ch->owner)
1212                       ? GNUNET_NO
1213                       : GNUNET_YES);
1214   GNUNET_free (crm);
1215   GNUNET_assert (ch->pending_messages < ch->max_pending_messages);
1216   LOG (GNUNET_ERROR_TYPE_DEBUG,
1217        "Received DATA_ACK on %s for message %u (%u ACKs pending)\n",
1218        GCCH_2s (ch),
1219        (unsigned int) ntohl (ack->mid.mid),
1220        ch->pending_messages);
1221   send_ack_to_client (ch,
1222                       (NULL == ch->owner)
1223                       ? GNUNET_NO
1224                       : GNUNET_YES);
1225 }
1226
1227
1228 /**
1229  * Destroy channel, based on the other peer closing the
1230  * connection.  Also needs to remove this channel from
1231  * the tunnel.
1232  *
1233  * @param ch channel to destroy
1234  */
1235 void
1236 GCCH_handle_remote_destroy (struct CadetChannel *ch)
1237 {
1238   struct CadetChannelClient *ccc;
1239
1240   GNUNET_assert (GNUNET_NO == ch->is_loopback);
1241   LOG (GNUNET_ERROR_TYPE_DEBUG,
1242        "Received remote channel DESTROY for %s\n",
1243        GCCH_2s (ch));
1244   if (GNUNET_YES == ch->destroy)
1245   {
1246     /* Local client already gone, this is instant-death. */
1247     channel_destroy (ch);
1248     return;
1249   }
1250   ccc = (NULL != ch->owner) ? ch->owner : ch->dest;
1251   if (NULL != ccc->head_recv)
1252   {
1253     LOG (GNUNET_ERROR_TYPE_WARNING,
1254          "Lost end of transmission due to remote shutdown on %s\n",
1255          GCCH_2s (ch));
1256     /* FIXME: change API to notify client about truncated transmission! */
1257   }
1258   ch->destroy = GNUNET_YES;
1259   GSC_handle_remote_channel_destroy (ccc->c,
1260                                      ccc->ccn,
1261                                      ch);
1262   channel_destroy (ch);
1263 }
1264
1265
1266 /**
1267  * Function called once the tunnel has sent one of our messages.
1268  * If the message is unreliable, simply frees the `crm`. If the
1269  * message was reliable, calculate retransmission time and
1270  * wait for ACK (or retransmit).
1271  *
1272  * @param cls the `struct CadetReliableMessage` that was sent
1273  */
1274 static void
1275 data_sent_cb (void *cls);
1276
1277
1278 /**
1279  * We need to retry a transmission, the last one took too long to
1280  * be acknowledged.
1281  *
1282  * @param cls the `struct CadetChannel` where we need to retransmit
1283  */
1284 static void
1285 retry_transmission (void *cls)
1286 {
1287   struct CadetChannel *ch = cls;
1288   struct CadetReliableMessage *crm = ch->head_sent;
1289
1290   ch->retry_data_task = NULL;
1291   GNUNET_assert (NULL == crm->qe);
1292   crm->qe = GCT_send (ch->t,
1293                       &crm->data_message.header,
1294                       &data_sent_cb,
1295                       crm);
1296 }
1297
1298
1299 /**
1300  * Function called once the tunnel has sent one of our messages.
1301  * If the message is unreliable, simply frees the `crm`. If the
1302  * message was reliable, calculate retransmission time and
1303  * wait for ACK (or retransmit).
1304  *
1305  * @param cls the `struct CadetReliableMessage` that was sent
1306  */
1307 static void
1308 data_sent_cb (void *cls)
1309 {
1310   struct CadetReliableMessage *crm = cls;
1311   struct CadetChannel *ch = crm->ch;
1312   struct CadetReliableMessage *off;
1313
1314   GNUNET_assert (GNUNET_NO == ch->is_loopback);
1315   crm->qe = NULL;
1316   GNUNET_CONTAINER_DLL_remove (ch->head_sent,
1317                                ch->tail_sent,
1318                                crm);
1319   if (GNUNET_NO == ch->reliable)
1320   {
1321     GNUNET_free (crm);
1322     ch->pending_messages--;
1323     send_ack_to_client (ch,
1324                         (NULL == ch->owner)
1325                         ? GNUNET_NO
1326                         : GNUNET_YES);
1327     return;
1328   }
1329   if (0 == crm->retry_delay.rel_value_us)
1330     crm->retry_delay = ch->expected_delay;
1331   crm->next_retry = GNUNET_TIME_relative_to_absolute (crm->retry_delay);
1332
1333   /* find position for re-insertion into the DLL */
1334   if ( (NULL == ch->head_sent) ||
1335        (crm->next_retry.abs_value_us < ch->head_sent->next_retry.abs_value_us) )
1336   {
1337     /* insert at HEAD, also (re)schedule retry task! */
1338     GNUNET_CONTAINER_DLL_insert (ch->head_sent,
1339                                  ch->tail_sent,
1340                                  crm);
1341     if (NULL != ch->retry_data_task)
1342       GNUNET_SCHEDULER_cancel (ch->retry_data_task);
1343     ch->retry_data_task
1344       = GNUNET_SCHEDULER_add_delayed (crm->retry_delay,
1345                                       &retry_transmission,
1346                                       ch);
1347     return;
1348   }
1349   for (off = ch->head_sent; NULL != off; off = off->next)
1350     if (crm->next_retry.abs_value_us < off->next_retry.abs_value_us)
1351       break;
1352   if (NULL == off)
1353   {
1354     /* insert at tail */
1355     GNUNET_CONTAINER_DLL_insert_tail (ch->head_sent,
1356                                       ch->tail_sent,
1357                                       crm);
1358   }
1359   else
1360   {
1361     /* insert before off */
1362     GNUNET_CONTAINER_DLL_insert_after (ch->head_sent,
1363                                        ch->tail_sent,
1364                                        off->prev,
1365                                        crm);
1366   }
1367 }
1368
1369
1370 /**
1371  * Handle data given by a client.
1372  *
1373  * Check whether the client is allowed to send in this tunnel, save if
1374  * channel is reliable and send an ACK to the client if there is still
1375  * buffer space in the tunnel.
1376  *
1377  * @param ch Channel.
1378  * @param sender_ccn ccn of the sender
1379  * @param buf payload to transmit.
1380  * @param buf_len number of bytes in @a buf
1381  * @return #GNUNET_OK if everything goes well,
1382  *         #GNUNET_SYSERR in case of an error.
1383  */
1384 int
1385 GCCH_handle_local_data (struct CadetChannel *ch,
1386                         struct GNUNET_CADET_ClientChannelNumber sender_ccn,
1387                         const char *buf,
1388                         size_t buf_len)
1389 {
1390   struct CadetReliableMessage *crm;
1391
1392   if (ch->pending_messages > ch->max_pending_messages)
1393   {
1394     GNUNET_break (0);
1395     return GNUNET_SYSERR;
1396   }
1397   ch->pending_messages++;
1398
1399   if (GNUNET_YES == ch->is_loopback)
1400   {
1401     struct CadetChannelClient *receiver;
1402     struct GNUNET_MQ_Envelope *env;
1403     struct GNUNET_CADET_LocalData *ld;
1404     int to_owner;
1405
1406     env = GNUNET_MQ_msg_extra (ld,
1407                                buf_len,
1408                                GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
1409     if (sender_ccn.channel_of_client ==
1410         ch->owner->ccn.channel_of_client)
1411     {
1412       receiver = ch->dest;
1413       to_owner = GNUNET_NO;
1414     }
1415     else
1416     {
1417       GNUNET_assert (sender_ccn.channel_of_client ==
1418                      ch->dest->ccn.channel_of_client);
1419       receiver = ch->owner;
1420       to_owner = GNUNET_YES;
1421     }
1422     ld->ccn = receiver->ccn;
1423     GNUNET_memcpy (&ld[1],
1424                    buf,
1425                    buf_len);
1426     /* FIXME: this does not provide for flow control! */
1427     GSC_send_to_client (receiver->c,
1428                         env);
1429     send_ack_to_client (ch,
1430                         to_owner);
1431     return GNUNET_OK;
1432   }
1433
1434   /* Everything is correct, send the message. */
1435   crm = GNUNET_malloc (sizeof (*crm) + buf_len);
1436   crm->ch = ch;
1437   crm->data_message.header.size = htons (sizeof (struct GNUNET_CADET_ChannelAppDataMessage) + buf_len);
1438   crm->data_message.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA);
1439   ch->mid_send.mid = htonl (ntohl (ch->mid_send.mid) + 1);
1440   crm->data_message.mid = ch->mid_send;
1441   crm->data_message.ctn = ch->ctn;
1442   GNUNET_memcpy (&crm[1],
1443                  buf,
1444                  buf_len);
1445   GNUNET_CONTAINER_DLL_insert (ch->head_sent,
1446                                ch->tail_sent,
1447                                crm);
1448   LOG (GNUNET_ERROR_TYPE_DEBUG,
1449        "Sending %u bytes from local client to %s\n",
1450        buf_len,
1451        GCCH_2s (ch));
1452   crm->qe = GCT_send (ch->t,
1453                       &crm->data_message.header,
1454                       &data_sent_cb,
1455                       crm);
1456   return GNUNET_OK;
1457 }
1458
1459
1460 /**
1461  * Handle ACK from client on local channel.  Means the client is ready
1462  * for more data, see if we have any for it.
1463  *
1464  * @param ch channel to destroy
1465  * @param client_ccn ccn of the client sending the ack
1466  */
1467 void
1468 GCCH_handle_local_ack (struct CadetChannel *ch,
1469                        struct GNUNET_CADET_ClientChannelNumber client_ccn)
1470 {
1471   struct CadetChannelClient *ccc;
1472   struct CadetOutOfOrderMessage *com;
1473
1474   if ( (NULL != ch->owner) &&
1475        (ch->owner->ccn.channel_of_client == client_ccn.channel_of_client) )
1476     ccc = ch->owner;
1477   else if ( (NULL != ch->dest) &&
1478             (ch->dest->ccn.channel_of_client == client_ccn.channel_of_client) )
1479     ccc = ch->dest;
1480   else
1481     GNUNET_assert (0);
1482   ccc->client_ready = GNUNET_YES;
1483   LOG (GNUNET_ERROR_TYPE_DEBUG,
1484        "Got LOCAL_ACK, client ready to receive more data!\n");
1485   com = ccc->head_recv;
1486   if (NULL == com)
1487     return; /* none pending */
1488   if ( (com->mid.mid != ch->mid_recv.mid) &&
1489        (GNUNET_NO == ch->out_of_order) )
1490     return; /* missing next one in-order */
1491
1492   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1493               "Passing payload message to client on %s\n",
1494               GCCH_2s (ch));
1495
1496   /* all good, pass next message to client */
1497   GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
1498                                ccc->tail_recv,
1499                                com);
1500   /* FIXME: if unreliable, this is not aggressive
1501      enough, as it would be OK to have lost some! */
1502   ch->mid_recv.mid = htonl (1 + ntohl (com->mid.mid));
1503   ch->mid_futures >>= 1; /* equivalent to division by 2 */
1504   ccc->client_ready = GNUNET_NO;
1505   GSC_send_to_client (ccc->c,
1506                       com->env);
1507   GNUNET_free (com);
1508   if ( (0xFFULL == (ch->mid_futures & 0xFFULL)) &&
1509        (GNUNET_YES == ch->reliable) )
1510   {
1511     /* The next 15 messages were also already received (0xFF), this
1512        suggests that the sender may be blocked on flow control
1513        urgently waiting for an ACK from us. (As we have an inherent
1514        maximum of 64 bits, and 15 is getting too close for comfort.)
1515        So we should send one now. */
1516     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1517                 "Sender on %s likely blocked on flow-control, sending ACK now.\n",
1518                 GCCH_2s (ch));
1519     if (GNUNET_YES == ch->reliable)
1520       send_channel_data_ack (ch);
1521   }
1522
1523   if (NULL != ccc->head_recv)
1524     return;
1525   if (GNUNET_NO == ch->destroy)
1526     return;
1527   GCT_send_channel_destroy (ch->t,
1528                             ch->ctn);
1529   channel_destroy (ch);
1530 }
1531
1532
1533 #define LOG2(level, ...) GNUNET_log_from_nocheck(level,"cadet-chn",__VA_ARGS__)
1534
1535
1536 /**
1537  * Log channel info.
1538  *
1539  * @param ch Channel.
1540  * @param level Debug level to use.
1541  */
1542 void
1543 GCCH_debug (struct CadetChannel *ch,
1544             enum GNUNET_ErrorType level)
1545 {
1546   int do_log;
1547
1548   do_log = GNUNET_get_log_call_status (level & (~GNUNET_ERROR_TYPE_BULK),
1549                                        "cadet-chn",
1550                                        __FILE__, __FUNCTION__, __LINE__);
1551   if (0 == do_log)
1552     return;
1553
1554   if (NULL == ch)
1555   {
1556     LOG2 (level, "CHN *** DEBUG NULL CHANNEL ***\n");
1557     return;
1558   }
1559   LOG2 (level,
1560         "CHN %s:%X (%p)\n",
1561         GCT_2s (ch->t),
1562         ch->ctn,
1563         ch);
1564   if (NULL != ch->owner)
1565   {
1566     LOG2 (level,
1567           "CHN origin %s ready %s local-id: %u\n",
1568           GSC_2s (ch->owner->c),
1569           ch->owner->client_ready ? "YES" : "NO",
1570           ntohl (ch->owner->ccn.channel_of_client));
1571   }
1572   if (NULL != ch->dest)
1573   {
1574     LOG2 (level,
1575           "CHN destination %s ready %s local-id: %u\n",
1576           GSC_2s (ch->dest->c),
1577           ch->dest->client_ready ? "YES" : "NO",
1578           ntohl (ch->dest->ccn.channel_of_client));
1579   }
1580   LOG2 (level,
1581         "CHN  Message IDs recv: %d (%LLX), send: %d\n",
1582         ntohl (ch->mid_recv.mid),
1583         (unsigned long long) ch->mid_futures,
1584         ntohl (ch->mid_send.mid));
1585 }
1586
1587
1588
1589 /* end of gnunet-service-cadet-new_channel.c */