fix double-transmission scheduling, implement client-client loopback flow control
[oweals/gnunet.git] / src / cadet / gnunet-service-cadet-new_channel.c
1
2 /*
3      This file is part of GNUnet.
4      Copyright (C) 2001-2017 GNUnet e.V.
5
6      GNUnet is free software; you can redistribute it and/or modify
7      it under the terms of the GNU General Public License as published
8      by the Free Software Foundation; either version 3, or (at your
9      option) any later version.
10
11      GNUnet is distributed in the hope that it will be useful, but
12      WITHOUT ANY WARRANTY; without even the implied warranty of
13      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14      General Public License for more details.
15
16      You should have received a copy of the GNU General Public License
17      along with GNUnet; see the file COPYING.  If not, write to the
18      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
19      Boston, MA 02110-1301, USA.
20 */
21 /**
22  * @file cadet/gnunet-service-cadet-new_channel.c
23  * @brief logical links between CADET clients
24  * @author Bartlomiej Polot
25  * @author Christian Grothoff
26  *
27  * TODO:
28  * - FIXME: send ACKs back to loopback clients!
29  *
30  * - introduce shutdown so we can have half-closed channels, modify
31  *   destroy to include MID to have FIN-ACK equivalents, etc.
32  * - estimate max bandwidth using bursts and use to for CONGESTION CONTROL!
33  * - check that '0xFFULL' really is sufficient for flow control!
34  * - revisit handling of 'unreliable' traffic!
35  * - revisit handling of 'out-of-order' option, especially in combination with/without 'reliable'.
36  * - figure out flow control without ACKs (unreliable traffic!)
37  */
38 #include "platform.h"
39 #include "gnunet_util_lib.h"
40 #include "cadet.h"
41 #include "gnunet_statistics_service.h"
42 #include "gnunet-service-cadet-new.h"
43 #include "gnunet-service-cadet-new_channel.h"
44 #include "gnunet-service-cadet-new_connection.h"
45 #include "gnunet-service-cadet-new_tunnels.h"
46 #include "gnunet-service-cadet-new_peer.h"
47 #include "gnunet-service-cadet-new_paths.h"
48
49 #define LOG(level,...) GNUNET_log_from (level,"cadet-chn",__VA_ARGS__)
50
51 /**
52  * How long do we initially wait before retransmitting?
53  */
54 #define CADET_INITIAL_RETRANSMIT_TIME GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 250)
55
56 /**
57  * How long do we wait before dropping state about incoming
58  * connection to closed port?
59  */
60 #define TIMEOUT_CLOSED_PORT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30)
61
62
63 /**
64  * All the states a connection can be in.
65  */
66 enum CadetChannelState
67 {
68   /**
69    * Uninitialized status, should never appear in operation.
70    */
71   CADET_CHANNEL_NEW,
72
73   /**
74    * Connection create message sent, waiting for ACK.
75    */
76   CADET_CHANNEL_OPEN_SENT,
77
78   /**
79    * Connection confirmed, ready to carry traffic.
80    */
81   CADET_CHANNEL_READY
82 };
83
84
85 /**
86  * Info needed to retry a message in case it gets lost.
87  * Note that we DO use this structure also for unreliable
88  * messages.
89  */
90 struct CadetReliableMessage
91 {
92   /**
93    * Double linked list, FIFO style
94    */
95   struct CadetReliableMessage *next;
96
97   /**
98    * Double linked list, FIFO style
99    */
100   struct CadetReliableMessage *prev;
101
102   /**
103    * Which channel is this message in?
104    */
105   struct CadetChannel *ch;
106
107   /**
108    * Entry in the tunnels queue for this message, NULL if it has left
109    * the tunnel.  Used to cancel transmission in case we receive an
110    * ACK in time.
111    */
112   struct CadetTunnelQueueEntry *qe;
113
114   /**
115    * How soon should we retry if we fail to get an ACK?
116    * Messages in the queue are sorted by this value.
117    */
118   struct GNUNET_TIME_Absolute next_retry;
119
120   /**
121    * How long do we wait for an ACK after transmission?
122    * Use for the back-off calculation.
123    */
124   struct GNUNET_TIME_Relative retry_delay;
125
126   /**
127    * Data message we are trying to send.
128    */
129   struct GNUNET_CADET_ChannelAppDataMessage *data_message;
130
131 };
132
133
134 /**
135  * List of received out-of-order data messages.
136  */
137 struct CadetOutOfOrderMessage
138 {
139   /**
140    * Double linked list, FIFO style
141    */
142   struct CadetOutOfOrderMessage *next;
143
144   /**
145    * Double linked list, FIFO style
146    */
147   struct CadetOutOfOrderMessage *prev;
148
149   /**
150    * ID of the message (messages up to this point needed
151    * before we give this one to the client).
152    */
153   struct ChannelMessageIdentifier mid;
154
155   /**
156    * The envelope with the payload of the out-of-order message
157    */
158   struct GNUNET_MQ_Envelope *env;
159
160 };
161
162
163 /**
164  * Client endpoint of a `struct CadetChannel`.  A channel may be a
165  * loopback channel, in which case it has two of these endpoints.
166  * Note that flow control also is required in both directions.
167  */
168 struct CadetChannelClient
169 {
170   /**
171    * Client handle.  Not by itself sufficient to designate
172    * the client endpoint, as the same client handle may
173    * be used for both the owner and the destination, and
174    * we thus also need the channel ID to identify the client.
175    */
176   struct CadetClient *c;
177
178   /**
179    * Head of DLL of messages received out of order or while client was unready.
180    */
181   struct CadetOutOfOrderMessage *head_recv;
182
183   /**
184    * Tail DLL of messages received out of order or while client was unready.
185    */
186   struct CadetOutOfOrderMessage *tail_recv;
187
188   /**
189    * Local tunnel number for this client.
190    * (if owner >= #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI,
191    *  otherwise < #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI)
192    */
193   struct GNUNET_CADET_ClientChannelNumber ccn;
194
195   /**
196    * Can we send data to the client?
197    */
198   int client_ready;
199
200 };
201
202
203 /**
204  * Struct containing all information regarding a channel to a remote client.
205  */
206 struct CadetChannel
207 {
208   /**
209    * Tunnel this channel is in.
210    */
211   struct CadetTunnel *t;
212
213   /**
214    * Client owner of the tunnel, if any.
215    * (Used if this channel represends the initiating end of the tunnel.)
216    */
217   struct CadetChannelClient *owner;
218
219   /**
220    * Client destination of the tunnel, if any.
221    * (Used if this channel represents the listening end of the tunnel.)
222    */
223   struct CadetChannelClient *dest;
224
225   /**
226    * Last entry in the tunnel's queue relating to control messages
227    * (#GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN or
228    * #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK).  Used to cancel
229    * transmission in case we receive updated information.
230    */
231   struct CadetTunnelQueueEntry *last_control_qe;
232
233   /**
234    * Head of DLL of messages sent and not yet ACK'd.
235    */
236   struct CadetReliableMessage *head_sent;
237
238   /**
239    * Tail of DLL of messages sent and not yet ACK'd.
240    */
241   struct CadetReliableMessage *tail_sent;
242
243   /**
244    * Task to resend/poll in case no ACK is received.
245    */
246   struct GNUNET_SCHEDULER_Task *retry_control_task;
247
248   /**
249    * Task to resend/poll in case no ACK is received.
250    */
251   struct GNUNET_SCHEDULER_Task *retry_data_task;
252
253   /**
254    * Last time the channel was used
255    */
256   struct GNUNET_TIME_Absolute timestamp;
257
258   /**
259    * Destination port of the channel.
260    */
261   struct GNUNET_HashCode port;
262
263   /**
264    * Counter for exponential backoff.
265    */
266   struct GNUNET_TIME_Relative retry_time;
267
268   /**
269    * How long does it usually take to get an ACK.
270    */
271   struct GNUNET_TIME_Relative expected_delay;
272
273   /**
274    * Bitfield of already-received messages past @e mid_recv.
275    */
276   uint64_t mid_futures;
277
278   /**
279    * Next MID expected for incoming traffic.
280    */
281   struct ChannelMessageIdentifier mid_recv;
282
283   /**
284    * Next MID to use for outgoing traffic.
285    */
286   struct ChannelMessageIdentifier mid_send;
287
288   /**
289    * Total (reliable) messages pending ACK for this channel.
290    */
291   unsigned int pending_messages;
292
293   /**
294    * Maximum (reliable) messages pending ACK for this channel
295    * before we throttle the client.
296    */
297   unsigned int max_pending_messages;
298
299   /**
300    * Number identifying this channel in its tunnel.
301    */
302   struct GNUNET_CADET_ChannelTunnelNumber ctn;
303
304   /**
305    * Channel state.
306    */
307   enum CadetChannelState state;
308
309   /**
310    * Is the tunnel bufferless (minimum latency)?
311    */
312   int nobuffer;
313
314   /**
315    * Is the tunnel reliable?
316    */
317   int reliable;
318
319   /**
320    * Is the tunnel out-of-order?
321    */
322   int out_of_order;
323
324   /**
325    * Is this channel a loopback channel, where the destination is us again?
326    */
327   int is_loopback;
328
329   /**
330    * Flag to signal the destruction of the channel.  If this is set to
331    * #GNUNET_YES the channel will be destroyed once the queue is
332    * empty.
333    */
334   int destroy;
335
336 };
337
338
339 /**
340  * Get the static string for identification of the channel.
341  *
342  * @param ch Channel.
343  *
344  * @return Static string with the channel IDs.
345  */
346 const char *
347 GCCH_2s (const struct CadetChannel *ch)
348 {
349   static char buf[128];
350
351   GNUNET_snprintf (buf,
352                    sizeof (buf),
353                    "Channel %s:%s ctn:%X(%X/%X)",
354                    (GNUNET_YES == ch->is_loopback)
355                    ? "loopback"
356                    : GNUNET_i2s (GCP_get_id (GCT_get_destination (ch->t))),
357                    GNUNET_h2s (&ch->port),
358                    ch->ctn,
359                    (NULL == ch->owner) ? 0 : ntohl (ch->owner->ccn.channel_of_client),
360                    (NULL == ch->dest) ? 0 : ntohl (ch->dest->ccn.channel_of_client));
361   return buf;
362 }
363
364
365 /**
366  * Get the channel's public ID.
367  *
368  * @param ch Channel.
369  *
370  * @return ID used to identify the channel with the remote peer.
371  */
372 struct GNUNET_CADET_ChannelTunnelNumber
373 GCCH_get_id (const struct CadetChannel *ch)
374 {
375   return ch->ctn;
376 }
377
378
379 /**
380  * Release memory associated with @a ccc
381  *
382  * @param ccc data structure to clean up
383  */
384 static void
385 free_channel_client (struct CadetChannelClient *ccc)
386 {
387   struct CadetOutOfOrderMessage *com;
388
389   while (NULL != (com = ccc->head_recv))
390   {
391     GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
392                                  ccc->tail_recv,
393                                  com);
394     GNUNET_MQ_discard (com->env);
395     GNUNET_free (com);
396   }
397   GNUNET_free (ccc);
398 }
399
400
401 /**
402  * Destroy the given channel.
403  *
404  * @param ch channel to destroy
405  */
406 static void
407 channel_destroy (struct CadetChannel *ch)
408 {
409   struct CadetReliableMessage *crm;
410
411   while (NULL != (crm = ch->head_sent))
412   {
413     GNUNET_assert (ch == crm->ch);
414     if (NULL != crm->qe)
415     {
416       GCT_send_cancel (crm->qe);
417       crm->qe = NULL;
418     }
419     GNUNET_CONTAINER_DLL_remove (ch->head_sent,
420                                  ch->tail_sent,
421                                  crm);
422     GNUNET_free (crm->data_message);
423     GNUNET_free (crm);
424   }
425   if (NULL != ch->owner)
426   {
427     free_channel_client (ch->owner);
428     ch->owner = NULL;
429   }
430   if (NULL != ch->dest)
431   {
432     free_channel_client (ch->dest);
433     ch->dest = NULL;
434   }
435   if (NULL != ch->last_control_qe)
436   {
437     GCT_send_cancel (ch->last_control_qe);
438     ch->last_control_qe = NULL;
439   }
440   if (NULL != ch->retry_data_task)
441   {
442     GNUNET_SCHEDULER_cancel (ch->retry_data_task);
443     ch->retry_data_task = NULL;
444   }
445   if (NULL != ch->retry_control_task)
446   {
447     GNUNET_SCHEDULER_cancel (ch->retry_control_task);
448     ch->retry_control_task = NULL;
449   }
450   if (GNUNET_NO == ch->is_loopback)
451   {
452     GCT_remove_channel (ch->t,
453                         ch,
454                         ch->ctn);
455     ch->t = NULL;
456   }
457   GNUNET_free (ch);
458 }
459
460
461 /**
462  * Send a channel create message.
463  *
464  * @param cls Channel for which to send.
465  */
466 static void
467 send_channel_open (void *cls);
468
469
470 /**
471  * Function called once the tunnel confirms that we sent the
472  * create message.  Delays for a bit until we retry.
473  *
474  * @param cls our `struct CadetChannel`.
475  */
476 static void
477 channel_open_sent_cb (void *cls)
478 {
479   struct CadetChannel *ch = cls;
480
481   GNUNET_assert (NULL != ch->last_control_qe);
482   ch->last_control_qe = NULL;
483   ch->retry_time = GNUNET_TIME_STD_BACKOFF (ch->retry_time);
484   LOG (GNUNET_ERROR_TYPE_DEBUG,
485        "Sent CADET_CHANNEL_OPEN on %s, retrying in %s\n",
486        GCCH_2s (ch),
487        GNUNET_STRINGS_relative_time_to_string (ch->retry_time,
488                                                GNUNET_YES));
489   ch->retry_control_task
490     = GNUNET_SCHEDULER_add_delayed (ch->retry_time,
491                                     &send_channel_open,
492                                     ch);
493 }
494
495
496 /**
497  * Send a channel open message.
498  *
499  * @param cls Channel for which to send.
500  */
501 static void
502 send_channel_open (void *cls)
503 {
504   struct CadetChannel *ch = cls;
505   struct GNUNET_CADET_ChannelOpenMessage msgcc;
506   uint32_t options;
507
508   ch->retry_control_task = NULL;
509   LOG (GNUNET_ERROR_TYPE_DEBUG,
510        "Sending CHANNEL_OPEN message for %s\n",
511        GCCH_2s (ch));
512   options = 0;
513   if (ch->nobuffer)
514     options |= GNUNET_CADET_OPTION_NOBUFFER;
515   if (ch->reliable)
516     options |= GNUNET_CADET_OPTION_RELIABLE;
517   if (ch->out_of_order)
518     options |= GNUNET_CADET_OPTION_OUT_OF_ORDER;
519   msgcc.header.size = htons (sizeof (msgcc));
520   msgcc.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN);
521   msgcc.opt = htonl (options);
522   msgcc.port = ch->port;
523   msgcc.ctn = ch->ctn;
524   ch->state = CADET_CHANNEL_OPEN_SENT;
525   ch->last_control_qe = GCT_send (ch->t,
526                                   &msgcc.header,
527                                   &channel_open_sent_cb,
528                                   ch);
529   GNUNET_assert (NULL == ch->retry_control_task);
530 }
531
532
533 /**
534  * Function called once and only once after a channel was bound
535  * to its tunnel via #GCT_add_channel() is ready for transmission.
536  * Note that this is only the case for channels that this peer
537  * initiates, as for incoming channels we assume that they are
538  * ready for transmission immediately upon receiving the open
539  * message.  Used to bootstrap the #GCT_send() process.
540  *
541  * @param ch the channel for which the tunnel is now ready
542  */
543 void
544 GCCH_tunnel_up (struct CadetChannel *ch)
545 {
546   GNUNET_assert (NULL == ch->retry_control_task);
547   LOG (GNUNET_ERROR_TYPE_DEBUG,
548        "Tunnel up, sending CHANNEL_OPEN on %s now\n",
549        GCCH_2s (ch));
550   ch->retry_control_task
551     = GNUNET_SCHEDULER_add_now (&send_channel_open,
552                                 ch);
553 }
554
555
556 /**
557  * Create a new channel.
558  *
559  * @param owner local client owning the channel
560  * @param ccn local number of this channel at the @a owner
561  * @param destination peer to which we should build the channel
562  * @param port desired port at @a destination
563  * @param options options for the channel
564  * @return handle to the new channel
565  */
566 struct CadetChannel *
567 GCCH_channel_local_new (struct CadetClient *owner,
568                         struct GNUNET_CADET_ClientChannelNumber ccn,
569                         struct CadetPeer *destination,
570                         const struct GNUNET_HashCode *port,
571                         uint32_t options)
572 {
573   struct CadetChannel *ch;
574   struct CadetChannelClient *ccco;
575
576   ccco = GNUNET_new (struct CadetChannelClient);
577   ccco->c = owner;
578   ccco->ccn = ccn;
579   ccco->client_ready = GNUNET_YES;
580
581   ch = GNUNET_new (struct CadetChannel);
582   ch->mid_recv.mid = htonl (1); /* The OPEN_ACK counts as message 0! */
583   ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
584   ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
585   ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
586   ch->max_pending_messages = (ch->nobuffer) ? 1 : 4; /* FIXME: 4!? Do not hardcode! */
587   ch->owner = ccco;
588   ch->port = *port;
589   if (0 == memcmp (&my_full_id,
590                    GCP_get_id (destination),
591                    sizeof (struct GNUNET_PeerIdentity)))
592   {
593     struct CadetClient *c;
594
595     ch->is_loopback = GNUNET_YES;
596     c = GNUNET_CONTAINER_multihashmap_get (open_ports,
597                                            port);
598     if (NULL == c)
599     {
600       /* port closed, wait for it to possibly open */
601       (void) GNUNET_CONTAINER_multihashmap_put (loose_channels,
602                                                 port,
603                                                 ch,
604                                                 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
605       LOG (GNUNET_ERROR_TYPE_DEBUG,
606            "Created loose incoming loopback channel to port %s\n",
607            GNUNET_h2s (&ch->port));
608     }
609     else
610     {
611       ch->dest = GNUNET_new (struct CadetChannelClient);
612       ch->dest->c = c;
613       ch->dest->client_ready = GNUNET_YES;
614       GCCH_bind (ch,
615                  ch->dest->c);
616     }
617   }
618   else
619   {
620     ch->t = GCP_get_tunnel (destination,
621                             GNUNET_YES);
622     ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME;
623     ch->ctn = GCT_add_channel (ch->t,
624                                ch);
625   }
626   GNUNET_STATISTICS_update (stats,
627                             "# channels",
628                             1,
629                             GNUNET_NO);
630   LOG (GNUNET_ERROR_TYPE_DEBUG,
631        "Created channel to port %s at peer %s for %s using %s\n",
632        GNUNET_h2s (port),
633        GCP_2s (destination),
634        GSC_2s (owner),
635        (GNUNET_YES == ch->is_loopback) ? "loopback" : GCT_2s (ch->t));
636   return ch;
637 }
638
639
640 /**
641  * We had an incoming channel to a port that is closed.
642  * It has not been opened for a while, drop it.
643  *
644  * @param cls the channel to drop
645  */
646 static void
647 timeout_closed_cb (void *cls)
648 {
649   struct CadetChannel *ch = cls;
650
651   ch->retry_control_task = NULL;
652   LOG (GNUNET_ERROR_TYPE_DEBUG,
653        "Closing incoming channel to port %s from peer %s due to timeout\n",
654        GNUNET_h2s (&ch->port),
655        GCP_2s (GCT_get_destination (ch->t)));
656   channel_destroy (ch);
657 }
658
659
660 /**
661  * Create a new channel based on a request coming in over the network.
662  *
663  * @param t tunnel to the remote peer
664  * @param ctn identifier of this channel in the tunnel
665  * @param port desired local port
666  * @param options options for the channel
667  * @return handle to the new channel
668  */
669 struct CadetChannel *
670 GCCH_channel_incoming_new (struct CadetTunnel *t,
671                            struct GNUNET_CADET_ChannelTunnelNumber ctn,
672                            const struct GNUNET_HashCode *port,
673                            uint32_t options)
674 {
675   struct CadetChannel *ch;
676   struct CadetClient *c;
677
678   ch = GNUNET_new (struct CadetChannel);
679   ch->port = *port;
680   ch->t = t;
681   ch->ctn = ctn;
682   ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME;
683   ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
684   ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
685   ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
686   ch->max_pending_messages = (ch->nobuffer) ? 1 : 4; /* FIXME: 4!? Do not hardcode! */
687   GNUNET_STATISTICS_update (stats,
688                             "# channels",
689                             1,
690                             GNUNET_NO);
691
692   c = GNUNET_CONTAINER_multihashmap_get (open_ports,
693                                          port);
694   if (NULL == c)
695   {
696     /* port closed, wait for it to possibly open */
697     (void) GNUNET_CONTAINER_multihashmap_put (loose_channels,
698                                               port,
699                                               ch,
700                                               GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
701     ch->retry_control_task
702       = GNUNET_SCHEDULER_add_delayed (TIMEOUT_CLOSED_PORT,
703                                       &timeout_closed_cb,
704                                       ch);
705     LOG (GNUNET_ERROR_TYPE_DEBUG,
706          "Created loose incoming channel to port %s from peer %s\n",
707          GNUNET_h2s (&ch->port),
708          GCP_2s (GCT_get_destination (ch->t)));
709   }
710   else
711   {
712     GCCH_bind (ch,
713                c);
714   }
715   GNUNET_STATISTICS_update (stats,
716                             "# channels",
717                             1,
718                             GNUNET_NO);
719   return ch;
720 }
721
722
723 /**
724  * Function called once the tunnel confirms that we sent the
725  * ACK message.  Just remembers it was sent, we do not expect
726  * ACKs for ACKs ;-).
727  *
728  * @param cls our `struct CadetChannel`.
729  */
730 static void
731 send_ack_cb (void *cls)
732 {
733   struct CadetChannel *ch = cls;
734
735   GNUNET_assert (NULL != ch->last_control_qe);
736   ch->last_control_qe = NULL;
737 }
738
739
740 /**
741  * Compute and send the current #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK to the other peer.
742  *
743  * @param ch channel to send the #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK for
744  */
745 static void
746 send_channel_data_ack (struct CadetChannel *ch)
747 {
748   struct GNUNET_CADET_ChannelDataAckMessage msg;
749
750   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK);
751   msg.header.size = htons (sizeof (msg));
752   msg.ctn = ch->ctn;
753   msg.mid.mid = htonl (ntohl (ch->mid_recv.mid) - 1);
754   msg.futures = GNUNET_htonll (ch->mid_futures);
755   if (NULL != ch->last_control_qe)
756     GCT_send_cancel (ch->last_control_qe);
757   ch->last_control_qe = GCT_send (ch->t,
758                                   &msg.header,
759                                   &send_ack_cb,
760                                   ch);
761 }
762
763
764 /**
765  * Send our initial #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK to the client confirming that the
766  * connection is up.
767  *
768  * @param cls the `struct CadetChannel`
769  */
770 static void
771 send_open_ack (void *cls)
772 {
773   struct CadetChannel *ch = cls;
774   struct GNUNET_CADET_ChannelManageMessage msg;
775
776   LOG (GNUNET_ERROR_TYPE_DEBUG,
777        "Sending CHANNEL_OPEN_ACK on %s\n",
778        GCCH_2s (ch));
779   ch->retry_control_task = NULL;
780   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK);
781   msg.header.size = htons (sizeof (msg));
782   msg.reserved = htonl (0);
783   msg.ctn = ch->ctn;
784   if (NULL != ch->last_control_qe)
785     GCT_send_cancel (ch->last_control_qe);
786   ch->last_control_qe = GCT_send (ch->t,
787                                   &msg.header,
788                                   &send_ack_cb,
789                                   ch);
790 }
791
792
793 /**
794  * We got a #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN message again for
795  * this channel.  If the binding was successful, (re)transmit the
796  * #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK.
797  *
798  * @param ch channel that got the duplicate open
799  */
800 void
801 GCCH_handle_duplicate_open (struct CadetChannel *ch)
802 {
803   if (NULL == ch->dest)
804   {
805     LOG (GNUNET_ERROR_TYPE_DEBUG,
806          "Ignoring duplicate channel OPEN on %s: port is closed\n",
807          GCCH_2s (ch));
808     return;
809   }
810   if (NULL != ch->retry_control_task)
811   {
812     LOG (GNUNET_ERROR_TYPE_DEBUG,
813          "Ignoring duplicate channel OPEN on %s: control message is pending\n",
814          GCCH_2s (ch));
815     return;
816   }
817   LOG (GNUNET_ERROR_TYPE_DEBUG,
818        "Retransmitting OPEN_ACK on %s\n",
819        GCCH_2s (ch));
820   ch->retry_control_task
821     = GNUNET_SCHEDULER_add_now (&send_open_ack,
822                                 ch);
823 }
824
825
826 /**
827  * Send a #GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK to the client to solicit more messages.
828  *
829  * @param ch channel the ack is for
830  * @param to_owner #GNUNET_YES to send to owner,
831  *                 #GNUNET_NO to send to dest
832  */
833 static void
834 send_ack_to_client (struct CadetChannel *ch,
835                     int to_owner)
836 {
837   struct GNUNET_MQ_Envelope *env;
838   struct GNUNET_CADET_LocalAck *ack;
839   struct CadetChannelClient *ccc;
840
841   env = GNUNET_MQ_msg (ack,
842                        GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK);
843   ccc = (GNUNET_YES == to_owner) ? ch->owner : ch->dest;
844   ack->ccn = ccc->ccn;
845   LOG (GNUNET_ERROR_TYPE_DEBUG,
846        "Sending CADET_LOCAL_ACK to %s (%s) at ccn %X (%u/%u pending)\n",
847        GSC_2s (ccc->c),
848        (GNUNET_YES == to_owner) ? "owner" : "dest",
849        ntohl (ack->ccn.channel_of_client),
850        ch->pending_messages,
851        ch->max_pending_messages);
852   GSC_send_to_client (ccc->c,
853                       env);
854 }
855
856
857 /**
858  * A client is bound to the port that we have a channel
859  * open to.  Send the acknowledgement for the connection
860  * request and establish the link with the client.
861  *
862  * @param ch open incoming channel
863  * @param c client listening on the respective port
864  */
865 void
866 GCCH_bind (struct CadetChannel *ch,
867            struct CadetClient *c)
868 {
869   uint32_t options;
870   struct CadetChannelClient *cccd;
871
872   LOG (GNUNET_ERROR_TYPE_DEBUG,
873        "Binding %s from %s to port %s of %s\n",
874        GCCH_2s (ch),
875        GCT_2s (ch->t),
876        GNUNET_h2s (&ch->port),
877        GSC_2s (c));
878   if (NULL != ch->retry_control_task)
879   {
880     /* there might be a timeout task here */
881     GNUNET_SCHEDULER_cancel (ch->retry_control_task);
882     ch->retry_control_task = NULL;
883   }
884   options = 0;
885   if (ch->nobuffer)
886     options |= GNUNET_CADET_OPTION_NOBUFFER;
887   if (ch->reliable)
888     options |= GNUNET_CADET_OPTION_RELIABLE;
889   if (ch->out_of_order)
890     options |= GNUNET_CADET_OPTION_OUT_OF_ORDER;
891   cccd = GNUNET_new (struct CadetChannelClient);
892   ch->dest = cccd;
893   cccd->c = c;
894   cccd->client_ready = GNUNET_YES;
895   cccd->ccn = GSC_bind (c,
896                         ch,
897                         (GNUNET_YES == ch->is_loopback)
898                         ? GCP_get (&my_full_id,
899                                    GNUNET_YES)
900                         : GCT_get_destination (ch->t),
901                         &ch->port,
902                         options);
903   GNUNET_assert (ntohl (cccd->ccn.channel_of_client) <
904                  GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
905   ch->mid_recv.mid = htonl (1); /* The OPEN counts as message 0! */
906   if (GNUNET_YES == ch->is_loopback)
907   {
908     ch->state = CADET_CHANNEL_OPEN_SENT;
909     GCCH_handle_channel_open_ack (ch);
910   }
911   else
912   {
913     /* notify other peer that we accepted the connection */
914     ch->retry_control_task
915       = GNUNET_SCHEDULER_add_now (&send_open_ack,
916                                   ch);
917   }
918   /* give client it's initial supply of ACKs */
919   GNUNET_assert (ntohl (cccd->ccn.channel_of_client) <
920                  GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
921   for (unsigned int i=0;i<ch->max_pending_messages;i++)
922     send_ack_to_client (ch,
923                         GNUNET_NO);
924 }
925
926
927 /**
928  * Destroy locally created channel.  Called by the local client, so no
929  * need to tell the client.
930  *
931  * @param ch channel to destroy
932  * @param c client that caused the destruction
933  * @param ccn client number of the client @a c
934  */
935 void
936 GCCH_channel_local_destroy (struct CadetChannel *ch,
937                             struct CadetClient *c,
938                             struct GNUNET_CADET_ClientChannelNumber ccn)
939 {
940   LOG (GNUNET_ERROR_TYPE_DEBUG,
941        "%s asks for destruction of %s\n",
942        GSC_2s (c),
943        GCCH_2s (ch));
944   GNUNET_assert (NULL != c);
945   if ( (NULL != ch->owner) &&
946        (c == ch->owner->c) &&
947        (ccn.channel_of_client == ch->owner->ccn.channel_of_client) )
948   {
949     free_channel_client (ch->owner);
950     ch->owner = NULL;
951   }
952   else if ( (NULL != ch->dest) &&
953             (c == ch->dest->c) &&
954             (ccn.channel_of_client == ch->dest->ccn.channel_of_client) )
955   {
956     free_channel_client (ch->dest);
957     ch->dest = NULL;
958   }
959   else
960   {
961     GNUNET_assert (0);
962   }
963
964   if (GNUNET_YES == ch->destroy)
965   {
966     /* other end already destroyed, with the local client gone, no need
967        to finish transmissions, just destroy immediately. */
968     channel_destroy (ch);
969     return;
970   }
971   if ( (NULL != ch->head_sent) ||
972        (NULL != ch->owner) ||
973        (NULL != ch->dest) )
974   {
975     /* Wait for other end to destroy us as well,
976        and otherwise allow send queue to be transmitted first */
977     ch->destroy = GNUNET_YES;
978     return;
979   }
980   /* If the we ever sent the CHANNEL_CREATE, we need to send a destroy message. */
981   if (CADET_CHANNEL_NEW != ch->state)
982     GCT_send_channel_destroy (ch->t,
983                               ch->ctn);
984   /* Nothing left to do, just finish destruction */
985   channel_destroy (ch);
986 }
987
988
989 /**
990  * We got an acknowledgement for the creation of the channel
991  * (the port is open on the other side). Begin transmissions.
992  *
993  * @param ch channel to destroy
994  */
995 void
996 GCCH_handle_channel_open_ack (struct CadetChannel *ch)
997 {
998   switch (ch->state)
999   {
1000   case CADET_CHANNEL_NEW:
1001     /* this should be impossible */
1002     GNUNET_break (0);
1003     break;
1004   case CADET_CHANNEL_OPEN_SENT:
1005     if (NULL == ch->owner)
1006     {
1007       /* We're not the owner, wrong direction! */
1008       GNUNET_break_op (0);
1009       return;
1010     }
1011     LOG (GNUNET_ERROR_TYPE_DEBUG,
1012          "Received CHANNEL_OPEN_ACK for waiting %s, entering READY state\n",
1013          GCCH_2s (ch));
1014     if (NULL != ch->retry_control_task) /* can be NULL if ch->is_loopback */
1015     {
1016       GNUNET_SCHEDULER_cancel (ch->retry_control_task);
1017       ch->retry_control_task = NULL;
1018     }
1019     ch->state = CADET_CHANNEL_READY;
1020     /* On first connect, send client as many ACKs as we allow messages
1021        to be buffered! */
1022     for (unsigned int i=0;i<ch->max_pending_messages;i++)
1023       send_ack_to_client (ch,
1024                           GNUNET_YES);
1025     break;
1026   case CADET_CHANNEL_READY:
1027     /* duplicate ACK, maybe we retried the CREATE. Ignore. */
1028     LOG (GNUNET_ERROR_TYPE_DEBUG,
1029          "Received duplicate channel OPEN_ACK for %s\n",
1030          GCCH_2s (ch));
1031     GNUNET_STATISTICS_update (stats,
1032                               "# duplicate CREATE_ACKs",
1033                               1,
1034                               GNUNET_NO);
1035     break;
1036   }
1037 }
1038
1039
1040 /**
1041  * Test if element @a e1 comes before element @a e2.
1042  *
1043  * @param cls closure, to a flag where we indicate duplicate packets
1044  * @param e1 an element of to sort
1045  * @param e2 another element to sort
1046  * @return #GNUNET_YES if @e1 < @e2, otherwise #GNUNET_NO
1047  */
1048 static int
1049 is_before (void *cls,
1050            struct CadetOutOfOrderMessage *m1,
1051            struct CadetOutOfOrderMessage *m2)
1052 {
1053   int *duplicate = cls;
1054   uint32_t v1 = ntohl (m1->mid.mid);
1055   uint32_t v2 = ntohl (m2->mid.mid);
1056   uint32_t delta;
1057
1058   delta = v2 - v1;
1059   if (0 == delta)
1060     *duplicate = GNUNET_YES;
1061   if (delta > (uint32_t) INT_MAX)
1062   {
1063     /* in overflow range, we can safely assume we wrapped around */
1064     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1065                 "%u > %u => %p > %p\n",
1066                 (unsigned int) v1,
1067                 (unsigned int) v2,
1068                 m1,
1069                 m2);
1070     return GNUNET_NO;
1071   }
1072   else
1073   {
1074     /* result is small, thus v2 > v1, thus e1 < e2 */
1075     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1076                 "%u < %u => %p < %p\n",
1077                 (unsigned int) v1,
1078                 (unsigned int) v2,
1079                 m1,
1080                 m2);
1081     return GNUNET_YES;
1082   }
1083 }
1084
1085
1086 /**
1087  * We got payload data for a channel.  Pass it on to the client
1088  * and send an ACK to the other end (once flow control allows it!)
1089  *
1090  * @param ch channel that got data
1091  * @param msg message that was received
1092  */
1093 void
1094 GCCH_handle_channel_plaintext_data (struct CadetChannel *ch,
1095                                     const struct GNUNET_CADET_ChannelAppDataMessage *msg)
1096 {
1097   struct GNUNET_MQ_Envelope *env;
1098   struct GNUNET_CADET_LocalData *ld;
1099   struct CadetChannelClient *ccc;
1100   size_t payload_size;
1101
1102   GNUNET_assert (GNUNET_NO == ch->is_loopback);
1103   if ( (GNUNET_YES == ch->destroy) &&
1104        (NULL == ch->owner) &&
1105        (NULL == ch->dest) )
1106   {
1107     /* This client is gone, but we still have messages to send to
1108        the other end (which is why @a ch is not yet dead).  However,
1109        we cannot pass messages to our client anymore. */
1110     LOG (GNUNET_ERROR_TYPE_DEBUG,
1111          "Dropping incoming payload on %s as this end is already closed\n",
1112          GCCH_2s (ch));
1113     /* FIXME: send back ACK/NACK/Closed notification
1114        to stop retransmissions! */
1115     return;
1116   }
1117   payload_size = ntohs (msg->header.size) - sizeof (*msg);
1118   env = GNUNET_MQ_msg_extra (ld,
1119                              payload_size,
1120                              GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
1121   ld->ccn = (NULL == ch->dest) ? ch->owner->ccn : ch->dest->ccn;
1122   GNUNET_memcpy (&ld[1],
1123                  &msg[1],
1124                  payload_size);
1125   ccc = (NULL != ch->owner) ? ch->owner : ch->dest;
1126   if ( (GNUNET_YES == ccc->client_ready) &&
1127        ( (GNUNET_YES == ch->out_of_order) ||
1128          (msg->mid.mid == ch->mid_recv.mid) ) )
1129   {
1130     LOG (GNUNET_ERROR_TYPE_DEBUG,
1131          "Giving %u bytes of payload from %s to client %s\n",
1132          (unsigned int) payload_size,
1133          GCCH_2s (ch),
1134          GSC_2s (ccc->c));
1135     ccc->client_ready = GNUNET_NO;
1136     GSC_send_to_client (ccc->c,
1137                         env);
1138     ch->mid_recv.mid = htonl (1 + ntohl (ch->mid_recv.mid));
1139     ch->mid_futures >>= 1;
1140   }
1141   else
1142   {
1143     struct CadetOutOfOrderMessage *com;
1144     int duplicate;
1145
1146     /* FIXME-SECURITY: if the element is WAY too far ahead,
1147        drop it (can't buffer too much!) */
1148
1149     com = GNUNET_new (struct CadetOutOfOrderMessage);
1150     com->mid = msg->mid;
1151     com->env = env;
1152     duplicate = GNUNET_NO;
1153     GNUNET_CONTAINER_DLL_insert_sorted (struct CadetOutOfOrderMessage,
1154                                         is_before,
1155                                         &duplicate,
1156                                         ccc->head_recv,
1157                                         ccc->tail_recv,
1158                                         com);
1159     if (GNUNET_YES == duplicate)
1160     {
1161       LOG (GNUNET_ERROR_TYPE_DEBUG,
1162            "Duplicate payload of %u bytes on %s (mid %u) dropped\n",
1163            (unsigned int) payload_size,
1164            GCCH_2s (ch),
1165            ntohl (msg->mid.mid));
1166       GNUNET_STATISTICS_update (stats,
1167                                 "# duplicate DATA",
1168                                 1,
1169                                 GNUNET_NO);
1170       GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
1171                                    ccc->tail_recv,
1172                                    com);
1173       GNUNET_MQ_discard (com->env);
1174       GNUNET_free (com);
1175       return;
1176     }
1177     LOG (GNUNET_ERROR_TYPE_DEBUG,
1178          "Queued %s payload of %u bytes on %s (mid %u, need %u first)\n",
1179          (GNUNET_YES == ccc->client_ready)
1180          ? "out-of-order"
1181          : "client-not-ready",
1182          (unsigned int) payload_size,
1183          GCCH_2s (ch),
1184          ntohl (msg->mid.mid),
1185          ntohl (ch->mid_recv.mid));
1186   }
1187 }
1188
1189
1190 /**
1191  * We got an acknowledgement for payload data for a channel.
1192  * Possibly resume transmissions.
1193  *
1194  * @param ch channel that got the ack
1195  * @param ack details about what was received
1196  */
1197 void
1198 GCCH_handle_channel_plaintext_data_ack (struct CadetChannel *ch,
1199                                         const struct GNUNET_CADET_ChannelDataAckMessage *ack)
1200 {
1201   struct CadetReliableMessage *crm;
1202
1203   GNUNET_break (GNUNET_NO == ch->is_loopback);
1204   if (GNUNET_NO == ch->reliable)
1205   {
1206     /* not expecting ACKs on unreliable channel, odd */
1207     GNUNET_break_op (0);
1208     return;
1209   }
1210   for (crm = ch->head_sent;
1211         NULL != crm;
1212        crm = crm->next)
1213     if (ack->mid.mid == crm->data_message->mid.mid)
1214       break;
1215   if (NULL == crm)
1216   {
1217     /* ACK for message we already dropped, might have been a
1218        duplicate ACK? Ignore. */
1219     LOG (GNUNET_ERROR_TYPE_DEBUG,
1220          "Duplicate DATA_ACK on %s, ignoring\n",
1221          GCCH_2s (ch));
1222     GNUNET_STATISTICS_update (stats,
1223                               "# duplicate DATA_ACKs",
1224                               1,
1225                               GNUNET_NO);
1226     return;
1227   }
1228   GNUNET_CONTAINER_DLL_remove (ch->head_sent,
1229                                ch->tail_sent,
1230                                crm);
1231   GNUNET_free (crm->data_message);
1232   GNUNET_free (crm);
1233   ch->pending_messages--;
1234   send_ack_to_client (ch,
1235                       (NULL == ch->owner)
1236                       ? GNUNET_NO
1237                       : GNUNET_YES);
1238   GNUNET_assert (ch->pending_messages < ch->max_pending_messages);
1239   LOG (GNUNET_ERROR_TYPE_DEBUG,
1240        "Received DATA_ACK on %s for message %u (%u ACKs pending)\n",
1241        GCCH_2s (ch),
1242        (unsigned int) ntohl (ack->mid.mid),
1243        ch->pending_messages);
1244   send_ack_to_client (ch,
1245                       (NULL == ch->owner)
1246                       ? GNUNET_NO
1247                       : GNUNET_YES);
1248 }
1249
1250
1251 /**
1252  * Destroy channel, based on the other peer closing the
1253  * connection.  Also needs to remove this channel from
1254  * the tunnel.
1255  *
1256  * @param ch channel to destroy
1257  */
1258 void
1259 GCCH_handle_remote_destroy (struct CadetChannel *ch)
1260 {
1261   struct CadetChannelClient *ccc;
1262
1263   GNUNET_assert (GNUNET_NO == ch->is_loopback);
1264   LOG (GNUNET_ERROR_TYPE_DEBUG,
1265        "Received remote channel DESTROY for %s\n",
1266        GCCH_2s (ch));
1267   if (GNUNET_YES == ch->destroy)
1268   {
1269     /* Local client already gone, this is instant-death. */
1270     channel_destroy (ch);
1271     return;
1272   }
1273   ccc = (NULL != ch->owner) ? ch->owner : ch->dest;
1274   if (NULL != ccc->head_recv)
1275   {
1276     LOG (GNUNET_ERROR_TYPE_WARNING,
1277          "Lost end of transmission due to remote shutdown on %s\n",
1278          GCCH_2s (ch));
1279     /* FIXME: change API to notify client about truncated transmission! */
1280   }
1281   ch->destroy = GNUNET_YES;
1282   GSC_handle_remote_channel_destroy (ccc->c,
1283                                      ccc->ccn,
1284                                      ch);
1285   channel_destroy (ch);
1286 }
1287
1288
1289 /**
1290  * Function called once the tunnel has sent one of our messages.
1291  * If the message is unreliable, simply frees the `crm`. If the
1292  * message was reliable, calculate retransmission time and
1293  * wait for ACK (or retransmit).
1294  *
1295  * @param cls the `struct CadetReliableMessage` that was sent
1296  */
1297 static void
1298 data_sent_cb (void *cls);
1299
1300
1301 /**
1302  * We need to retry a transmission, the last one took too long to
1303  * be acknowledged.
1304  *
1305  * @param cls the `struct CadetChannel` where we need to retransmit
1306  */
1307 static void
1308 retry_transmission (void *cls)
1309 {
1310   struct CadetChannel *ch = cls;
1311   struct CadetReliableMessage *crm = ch->head_sent;
1312
1313   ch->retry_data_task = NULL;
1314   GNUNET_assert (NULL == crm->qe);
1315   crm->qe = GCT_send (ch->t,
1316                       &crm->data_message->header,
1317                       &data_sent_cb,
1318                       crm);
1319   GNUNET_assert (NULL == ch->retry_data_task);
1320 }
1321
1322
1323 /**
1324  * Function called once the tunnel has sent one of our messages.
1325  * If the message is unreliable, simply frees the `crm`. If the
1326  * message was reliable, calculate retransmission time and
1327  * wait for ACK (or retransmit).
1328  *
1329  * @param cls the `struct CadetReliableMessage` that was sent
1330  */
1331 static void
1332 data_sent_cb (void *cls)
1333 {
1334   struct CadetReliableMessage *crm = cls;
1335   struct CadetChannel *ch = crm->ch;
1336   struct CadetReliableMessage *off;
1337
1338   GNUNET_assert (GNUNET_NO == ch->is_loopback);
1339   GNUNET_assert (NULL != crm->qe);
1340   crm->qe = NULL;
1341   GNUNET_assert (NULL == ch->retry_data_task);
1342   GNUNET_CONTAINER_DLL_remove (ch->head_sent,
1343                                ch->tail_sent,
1344                                crm);
1345   if (GNUNET_NO == ch->reliable)
1346   {
1347     GNUNET_free (crm->data_message);
1348     GNUNET_free (crm);
1349     ch->pending_messages--;
1350     send_ack_to_client (ch,
1351                         (NULL == ch->owner)
1352                         ? GNUNET_NO
1353                         : GNUNET_YES);
1354     return;
1355   }
1356   if (0 == crm->retry_delay.rel_value_us)
1357     crm->retry_delay = ch->expected_delay;
1358   crm->next_retry = GNUNET_TIME_relative_to_absolute (crm->retry_delay);
1359
1360   /* find position for re-insertion into the DLL */
1361   if ( (NULL == ch->head_sent) ||
1362        (crm->next_retry.abs_value_us < ch->head_sent->next_retry.abs_value_us) )
1363   {
1364     /* insert at HEAD, also (re)schedule retry task! */
1365     GNUNET_CONTAINER_DLL_insert (ch->head_sent,
1366                                  ch->tail_sent,
1367                                  crm);
1368     GNUNET_assert (NULL == crm->qe);
1369     ch->retry_data_task
1370       = GNUNET_SCHEDULER_add_delayed (crm->retry_delay,
1371                                       &retry_transmission,
1372                                       ch);
1373     return;
1374   }
1375   for (off = ch->head_sent; NULL != off; off = off->next)
1376     if (crm->next_retry.abs_value_us < off->next_retry.abs_value_us)
1377       break;
1378   if (NULL == off)
1379   {
1380     /* insert at tail */
1381     GNUNET_CONTAINER_DLL_insert_tail (ch->head_sent,
1382                                       ch->tail_sent,
1383                                       crm);
1384   }
1385   else
1386   {
1387     /* insert before off */
1388     GNUNET_CONTAINER_DLL_insert_after (ch->head_sent,
1389                                        ch->tail_sent,
1390                                        off->prev,
1391                                        crm);
1392   }
1393 }
1394
1395
1396 /**
1397  * Handle data given by a client.
1398  *
1399  * Check whether the client is allowed to send in this tunnel, save if
1400  * channel is reliable and send an ACK to the client if there is still
1401  * buffer space in the tunnel.
1402  *
1403  * @param ch Channel.
1404  * @param sender_ccn ccn of the sender
1405  * @param buf payload to transmit.
1406  * @param buf_len number of bytes in @a buf
1407  * @return #GNUNET_OK if everything goes well,
1408  *         #GNUNET_SYSERR in case of an error.
1409  */
1410 int
1411 GCCH_handle_local_data (struct CadetChannel *ch,
1412                         struct GNUNET_CADET_ClientChannelNumber sender_ccn,
1413                         const char *buf,
1414                         size_t buf_len)
1415 {
1416   struct CadetReliableMessage *crm;
1417
1418   if (ch->pending_messages > ch->max_pending_messages)
1419   {
1420     GNUNET_break (0);
1421     return GNUNET_SYSERR;
1422   }
1423   ch->pending_messages++;
1424
1425   if (GNUNET_YES == ch->is_loopback)
1426   {
1427     struct CadetChannelClient *receiver;
1428     struct GNUNET_MQ_Envelope *env;
1429     struct GNUNET_CADET_LocalData *ld;
1430     int to_owner;
1431
1432     env = GNUNET_MQ_msg_extra (ld,
1433                                buf_len,
1434                                GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
1435     if (sender_ccn.channel_of_client ==
1436         ch->owner->ccn.channel_of_client)
1437     {
1438       receiver = ch->dest;
1439       to_owner = GNUNET_NO;
1440     }
1441     else
1442     {
1443       GNUNET_assert (sender_ccn.channel_of_client ==
1444                      ch->dest->ccn.channel_of_client);
1445       receiver = ch->owner;
1446       to_owner = GNUNET_YES;
1447     }
1448     ld->ccn = receiver->ccn;
1449     GNUNET_memcpy (&ld[1],
1450                    buf,
1451                    buf_len);
1452     if (GNUNET_YES == receiver->client_ready)
1453     {
1454       /* FIXME: this does not provide for flow control! */
1455       GSC_send_to_client (receiver->c,
1456                           env);
1457       send_ack_to_client (ch,
1458                           to_owner);
1459     }
1460     else
1461     {
1462       struct CadetOutOfOrderMessage *oom;
1463
1464       oom = GNUNET_new (struct CadetOutOfOrderMessage);
1465       oom->env = env;
1466       GNUNET_CONTAINER_DLL_insert_tail (receiver->head_recv,
1467                                         receiver->tail_recv,
1468                                         oom);
1469     }
1470     return GNUNET_OK;
1471   }
1472
1473   /* Everything is correct, send the message. */
1474   crm = GNUNET_malloc (sizeof (*crm));
1475   crm->ch = ch;
1476   crm->data_message = GNUNET_malloc (sizeof (struct GNUNET_CADET_ChannelAppDataMessage)
1477                                      + buf_len);
1478   crm->data_message->header.size = htons (sizeof (struct GNUNET_CADET_ChannelAppDataMessage) + buf_len);
1479   crm->data_message->header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA);
1480   ch->mid_send.mid = htonl (ntohl (ch->mid_send.mid) + 1);
1481   crm->data_message->mid = ch->mid_send;
1482   crm->data_message->ctn = ch->ctn;
1483   GNUNET_memcpy (&crm->data_message[1],
1484                  buf,
1485                  buf_len);
1486   GNUNET_CONTAINER_DLL_insert (ch->head_sent,
1487                                ch->tail_sent,
1488                                crm);
1489   LOG (GNUNET_ERROR_TYPE_DEBUG,
1490        "Sending %u bytes from local client to %s\n",
1491        buf_len,
1492        GCCH_2s (ch));
1493   if (NULL != ch->retry_data_task)
1494   {
1495     GNUNET_SCHEDULER_cancel (ch->retry_data_task);
1496     ch->retry_data_task = NULL;
1497   }
1498   crm->qe = GCT_send (ch->t,
1499                       &crm->data_message->header,
1500                       &data_sent_cb,
1501                       crm);
1502   GNUNET_assert (NULL == ch->retry_data_task);
1503   return GNUNET_OK;
1504 }
1505
1506
1507 /**
1508  * Handle ACK from client on local channel.  Means the client is ready
1509  * for more data, see if we have any for it.
1510  *
1511  * @param ch channel to destroy
1512  * @param client_ccn ccn of the client sending the ack
1513  */
1514 void
1515 GCCH_handle_local_ack (struct CadetChannel *ch,
1516                        struct GNUNET_CADET_ClientChannelNumber client_ccn)
1517 {
1518   struct CadetChannelClient *ccc;
1519   struct CadetOutOfOrderMessage *com;
1520
1521   if ( (NULL != ch->owner) &&
1522        (ch->owner->ccn.channel_of_client == client_ccn.channel_of_client) )
1523     ccc = ch->owner;
1524   else if ( (NULL != ch->dest) &&
1525             (ch->dest->ccn.channel_of_client == client_ccn.channel_of_client) )
1526     ccc = ch->dest;
1527   else
1528     GNUNET_assert (0);
1529   ccc->client_ready = GNUNET_YES;
1530   com = ccc->head_recv;
1531   if (NULL == com)
1532   {
1533     LOG (GNUNET_ERROR_TYPE_DEBUG,
1534          "Got LOCAL_ACK, %s-%X ready to receive more data (but none pending)!\n",
1535          GSC_2s (ccc->c),
1536          ntohl (ccc->ccn.channel_of_client));
1537     return; /* none pending */
1538   }
1539   if (GNUNET_YES == ch->is_loopback)
1540   {
1541     int to_owner;
1542
1543     /* Messages are always in-order, just send */
1544     GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
1545                                  ccc->tail_recv,
1546                                  com);
1547     GSC_send_to_client (ccc->c,
1548                         com->env);
1549     /* Notify sender that we can receive more */
1550     if (ccc->ccn.channel_of_client ==
1551         ch->owner->ccn.channel_of_client)
1552     {
1553       to_owner = GNUNET_NO;
1554     }
1555     else
1556     {
1557       GNUNET_assert (ccc->ccn.channel_of_client ==
1558                      ch->dest->ccn.channel_of_client);
1559       to_owner = GNUNET_YES;
1560     }
1561     send_ack_to_client (ch,
1562                         to_owner);
1563     GNUNET_free (com);
1564     return;
1565   }
1566
1567   if ( (com->mid.mid != ch->mid_recv.mid) &&
1568        (GNUNET_NO == ch->out_of_order) )
1569   {
1570     LOG (GNUNET_ERROR_TYPE_DEBUG,
1571          "Got LOCAL_ACK, %s-%X ready to receive more data (but next one is out-of-order %u vs. %u)!\n",
1572          GSC_2s (ccc->c),
1573          ntohl (ccc->ccn.channel_of_client),
1574          ntohl (com->mid.mid),
1575          ntohl (ch->mid_recv.mid));
1576     return; /* missing next one in-order */
1577   }
1578
1579   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1580               "Got LOCAL ACK, passing payload message to %s-%X on %s\n",
1581               GSC_2s (ccc->c),
1582               ntohl (ccc->ccn.channel_of_client),
1583               GCCH_2s (ch));
1584
1585   /* all good, pass next message to client */
1586   GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
1587                                ccc->tail_recv,
1588                                com);
1589   /* FIXME: if unreliable, this is not aggressive
1590      enough, as it would be OK to have lost some! */
1591   ch->mid_recv.mid = htonl (1 + ntohl (com->mid.mid));
1592   ch->mid_futures >>= 1; /* equivalent to division by 2 */
1593   ccc->client_ready = GNUNET_NO;
1594   GSC_send_to_client (ccc->c,
1595                       com->env);
1596   GNUNET_free (com);
1597   if ( (0xFFULL == (ch->mid_futures & 0xFFULL)) &&
1598        (GNUNET_YES == ch->reliable) )
1599   {
1600     /* The next 15 messages were also already received (0xFF), this
1601        suggests that the sender may be blocked on flow control
1602        urgently waiting for an ACK from us. (As we have an inherent
1603        maximum of 64 bits, and 15 is getting too close for comfort.)
1604        So we should send one now. */
1605     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1606                 "Sender on %s likely blocked on flow-control, sending ACK now.\n",
1607                 GCCH_2s (ch));
1608     if (GNUNET_YES == ch->reliable)
1609       send_channel_data_ack (ch);
1610   }
1611
1612   if (NULL != ccc->head_recv)
1613     return;
1614   if (GNUNET_NO == ch->destroy)
1615     return;
1616   GCT_send_channel_destroy (ch->t,
1617                             ch->ctn);
1618   channel_destroy (ch);
1619 }
1620
1621
1622 #define LOG2(level, ...) GNUNET_log_from_nocheck(level,"cadet-chn",__VA_ARGS__)
1623
1624
1625 /**
1626  * Log channel info.
1627  *
1628  * @param ch Channel.
1629  * @param level Debug level to use.
1630  */
1631 void
1632 GCCH_debug (struct CadetChannel *ch,
1633             enum GNUNET_ErrorType level)
1634 {
1635   int do_log;
1636
1637   do_log = GNUNET_get_log_call_status (level & (~GNUNET_ERROR_TYPE_BULK),
1638                                        "cadet-chn",
1639                                        __FILE__, __FUNCTION__, __LINE__);
1640   if (0 == do_log)
1641     return;
1642
1643   if (NULL == ch)
1644   {
1645     LOG2 (level, "CHN *** DEBUG NULL CHANNEL ***\n");
1646     return;
1647   }
1648   LOG2 (level,
1649         "CHN %s:%X (%p)\n",
1650         GCT_2s (ch->t),
1651         ch->ctn,
1652         ch);
1653   if (NULL != ch->owner)
1654   {
1655     LOG2 (level,
1656           "CHN origin %s ready %s local-id: %u\n",
1657           GSC_2s (ch->owner->c),
1658           ch->owner->client_ready ? "YES" : "NO",
1659           ntohl (ch->owner->ccn.channel_of_client));
1660   }
1661   if (NULL != ch->dest)
1662   {
1663     LOG2 (level,
1664           "CHN destination %s ready %s local-id: %u\n",
1665           GSC_2s (ch->dest->c),
1666           ch->dest->client_ready ? "YES" : "NO",
1667           ntohl (ch->dest->ccn.channel_of_client));
1668   }
1669   LOG2 (level,
1670         "CHN  Message IDs recv: %d (%LLX), send: %d\n",
1671         ntohl (ch->mid_recv.mid),
1672         (unsigned long long) ch->mid_futures,
1673         ntohl (ch->mid_send.mid));
1674 }
1675
1676
1677
1678 /* end of gnunet-service-cadet-new_channel.c */