fix crash if this end closed connection and other still sends data
[oweals/gnunet.git] / src / cadet / gnunet-service-cadet-new_channel.c
1
2 /*
3      This file is part of GNUnet.
4      Copyright (C) 2001-2017 GNUnet e.V.
5
6      GNUnet is free software; you can redistribute it and/or modify
7      it under the terms of the GNU General Public License as published
8      by the Free Software Foundation; either version 3, or (at your
9      option) any later version.
10
11      GNUnet is distributed in the hope that it will be useful, but
12      WITHOUT ANY WARRANTY; without even the implied warranty of
13      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14      General Public License for more details.
15
16      You should have received a copy of the GNU General Public License
17      along with GNUnet; see the file COPYING.  If not, write to the
18      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
19      Boston, MA 02110-1301, USA.
20 */
21 /**
22  * @file cadet/gnunet-service-cadet-new_channel.c
23  * @brief logical links between CADET clients
24  * @author Bartlomiej Polot
25  * @author Christian Grothoff
26  *
27  * TODO:
28  * - FIXME: send ACKs back to loopback clients!
29  *
30  * - introduce shutdown so we can have half-closed channels, modify
31  *   destroy to include MID to have FIN-ACK equivalents, etc.
32  * - estimate max bandwidth using bursts and use to for CONGESTION CONTROL!
33  * - check that '0xFFULL' really is sufficient for flow control!
34  * - revisit handling of 'unreliable' traffic!
35  * - revisit handling of 'out-of-order' option, especially in combination with/without 'reliable'.
36  * - figure out flow control without ACKs (unreliable traffic!)
37  */
38 #include "platform.h"
39 #include "gnunet_util_lib.h"
40 #include "cadet.h"
41 #include "gnunet_statistics_service.h"
42 #include "gnunet-service-cadet-new.h"
43 #include "gnunet-service-cadet-new_channel.h"
44 #include "gnunet-service-cadet-new_connection.h"
45 #include "gnunet-service-cadet-new_tunnels.h"
46 #include "gnunet-service-cadet-new_peer.h"
47 #include "gnunet-service-cadet-new_paths.h"
48
49 #define LOG(level,...) GNUNET_log_from (level,"cadet-chn",__VA_ARGS__)
50
51 /**
52  * How long do we initially wait before retransmitting?
53  */
54 #define CADET_INITIAL_RETRANSMIT_TIME GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 250)
55
56 /**
57  * How long do we wait before dropping state about incoming
58  * connection to closed port?
59  */
60 #define TIMEOUT_CLOSED_PORT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30)
61
62
63 /**
64  * All the states a connection can be in.
65  */
66 enum CadetChannelState
67 {
68   /**
69    * Uninitialized status, should never appear in operation.
70    */
71   CADET_CHANNEL_NEW,
72
73   /**
74    * Connection create message sent, waiting for ACK.
75    */
76   CADET_CHANNEL_OPEN_SENT,
77
78   /**
79    * Connection confirmed, ready to carry traffic.
80    */
81   CADET_CHANNEL_READY
82 };
83
84
85 /**
86  * Info needed to retry a message in case it gets lost.
87  * Note that we DO use this structure also for unreliable
88  * messages.
89  */
90 struct CadetReliableMessage
91 {
92   /**
93    * Double linked list, FIFO style
94    */
95   struct CadetReliableMessage *next;
96
97   /**
98    * Double linked list, FIFO style
99    */
100   struct CadetReliableMessage *prev;
101
102   /**
103    * Which channel is this message in?
104    */
105   struct CadetChannel *ch;
106
107   /**
108    * Entry in the tunnels queue for this message, NULL if it has left
109    * the tunnel.  Used to cancel transmission in case we receive an
110    * ACK in time.
111    */
112   struct CadetTunnelQueueEntry *qe;
113
114   /**
115    * How soon should we retry if we fail to get an ACK?
116    * Messages in the queue are sorted by this value.
117    */
118   struct GNUNET_TIME_Absolute next_retry;
119
120   /**
121    * How long do we wait for an ACK after transmission?
122    * Use for the back-off calculation.
123    */
124   struct GNUNET_TIME_Relative retry_delay;
125
126   /**
127    * Data message we are trying to send.
128    */
129   struct GNUNET_CADET_ChannelAppDataMessage *data_message;
130
131 };
132
133
134 /**
135  * List of received out-of-order data messages.
136  */
137 struct CadetOutOfOrderMessage
138 {
139   /**
140    * Double linked list, FIFO style
141    */
142   struct CadetOutOfOrderMessage *next;
143
144   /**
145    * Double linked list, FIFO style
146    */
147   struct CadetOutOfOrderMessage *prev;
148
149   /**
150    * ID of the message (messages up to this point needed
151    * before we give this one to the client).
152    */
153   struct ChannelMessageIdentifier mid;
154
155   /**
156    * The envelope with the payload of the out-of-order message
157    */
158   struct GNUNET_MQ_Envelope *env;
159
160 };
161
162
163 /**
164  * Client endpoint of a `struct CadetChannel`.  A channel may be a
165  * loopback channel, in which case it has two of these endpoints.
166  * Note that flow control also is required in both directions.
167  */
168 struct CadetChannelClient
169 {
170   /**
171    * Client handle.  Not by itself sufficient to designate
172    * the client endpoint, as the same client handle may
173    * be used for both the owner and the destination, and
174    * we thus also need the channel ID to identify the client.
175    */
176   struct CadetClient *c;
177
178   /**
179    * Head of DLL of messages received out of order or while client was unready.
180    */
181   struct CadetOutOfOrderMessage *head_recv;
182
183   /**
184    * Tail DLL of messages received out of order or while client was unready.
185    */
186   struct CadetOutOfOrderMessage *tail_recv;
187
188   /**
189    * Local tunnel number for this client.
190    * (if owner >= #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI,
191    *  otherwise < #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI)
192    */
193   struct GNUNET_CADET_ClientChannelNumber ccn;
194
195   /**
196    * Can we send data to the client?
197    */
198   int client_ready;
199
200 };
201
202
203 /**
204  * Struct containing all information regarding a channel to a remote client.
205  */
206 struct CadetChannel
207 {
208   /**
209    * Tunnel this channel is in.
210    */
211   struct CadetTunnel *t;
212
213   /**
214    * Client owner of the tunnel, if any.
215    * (Used if this channel represends the initiating end of the tunnel.)
216    */
217   struct CadetChannelClient *owner;
218
219   /**
220    * Client destination of the tunnel, if any.
221    * (Used if this channel represents the listening end of the tunnel.)
222    */
223   struct CadetChannelClient *dest;
224
225   /**
226    * Last entry in the tunnel's queue relating to control messages
227    * (#GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN or
228    * #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK).  Used to cancel
229    * transmission in case we receive updated information.
230    */
231   struct CadetTunnelQueueEntry *last_control_qe;
232
233   /**
234    * Head of DLL of messages sent and not yet ACK'd.
235    */
236   struct CadetReliableMessage *head_sent;
237
238   /**
239    * Tail of DLL of messages sent and not yet ACK'd.
240    */
241   struct CadetReliableMessage *tail_sent;
242
243   /**
244    * Task to resend/poll in case no ACK is received.
245    */
246   struct GNUNET_SCHEDULER_Task *retry_control_task;
247
248   /**
249    * Task to resend/poll in case no ACK is received.
250    */
251   struct GNUNET_SCHEDULER_Task *retry_data_task;
252
253   /**
254    * Last time the channel was used
255    */
256   struct GNUNET_TIME_Absolute timestamp;
257
258   /**
259    * Destination port of the channel.
260    */
261   struct GNUNET_HashCode port;
262
263   /**
264    * Counter for exponential backoff.
265    */
266   struct GNUNET_TIME_Relative retry_time;
267
268   /**
269    * How long does it usually take to get an ACK.
270    */
271   struct GNUNET_TIME_Relative expected_delay;
272
273   /**
274    * Bitfield of already-received messages past @e mid_recv.
275    */
276   uint64_t mid_futures;
277
278   /**
279    * Next MID expected for incoming traffic.
280    */
281   struct ChannelMessageIdentifier mid_recv;
282
283   /**
284    * Next MID to use for outgoing traffic.
285    */
286   struct ChannelMessageIdentifier mid_send;
287
288   /**
289    * Total (reliable) messages pending ACK for this channel.
290    */
291   unsigned int pending_messages;
292
293   /**
294    * Maximum (reliable) messages pending ACK for this channel
295    * before we throttle the client.
296    */
297   unsigned int max_pending_messages;
298
299   /**
300    * Number identifying this channel in its tunnel.
301    */
302   struct GNUNET_CADET_ChannelTunnelNumber ctn;
303
304   /**
305    * Channel state.
306    */
307   enum CadetChannelState state;
308
309   /**
310    * Is the tunnel bufferless (minimum latency)?
311    */
312   int nobuffer;
313
314   /**
315    * Is the tunnel reliable?
316    */
317   int reliable;
318
319   /**
320    * Is the tunnel out-of-order?
321    */
322   int out_of_order;
323
324   /**
325    * Is this channel a loopback channel, where the destination is us again?
326    */
327   int is_loopback;
328
329   /**
330    * Flag to signal the destruction of the channel.  If this is set to
331    * #GNUNET_YES the channel will be destroyed once the queue is
332    * empty.
333    */
334   int destroy;
335
336 };
337
338
339 /**
340  * Get the static string for identification of the channel.
341  *
342  * @param ch Channel.
343  *
344  * @return Static string with the channel IDs.
345  */
346 const char *
347 GCCH_2s (const struct CadetChannel *ch)
348 {
349   static char buf[128];
350
351   GNUNET_snprintf (buf,
352                    sizeof (buf),
353                    "Channel %s:%s ctn:%X(%X/%X)",
354                    (GNUNET_YES == ch->is_loopback)
355                    ? "loopback"
356                    : GNUNET_i2s (GCP_get_id (GCT_get_destination (ch->t))),
357                    GNUNET_h2s (&ch->port),
358                    ch->ctn,
359                    (NULL == ch->owner) ? 0 : ntohl (ch->owner->ccn.channel_of_client),
360                    (NULL == ch->dest) ? 0 : ntohl (ch->dest->ccn.channel_of_client));
361   return buf;
362 }
363
364
365 /**
366  * Get the channel's public ID.
367  *
368  * @param ch Channel.
369  *
370  * @return ID used to identify the channel with the remote peer.
371  */
372 struct GNUNET_CADET_ChannelTunnelNumber
373 GCCH_get_id (const struct CadetChannel *ch)
374 {
375   return ch->ctn;
376 }
377
378
379 /**
380  * Release memory associated with @a ccc
381  *
382  * @param ccc data structure to clean up
383  */
384 static void
385 free_channel_client (struct CadetChannelClient *ccc)
386 {
387   struct CadetOutOfOrderMessage *com;
388
389   while (NULL != (com = ccc->head_recv))
390   {
391     GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
392                                  ccc->tail_recv,
393                                  com);
394     GNUNET_MQ_discard (com->env);
395     GNUNET_free (com);
396   }
397   GNUNET_free (ccc);
398 }
399
400
401 /**
402  * Destroy the given channel.
403  *
404  * @param ch channel to destroy
405  */
406 static void
407 channel_destroy (struct CadetChannel *ch)
408 {
409   struct CadetReliableMessage *crm;
410
411   while (NULL != (crm = ch->head_sent))
412   {
413     GNUNET_assert (ch == crm->ch);
414     if (NULL != crm->qe)
415     {
416       GCT_send_cancel (crm->qe);
417       crm->qe = NULL;
418     }
419     GNUNET_CONTAINER_DLL_remove (ch->head_sent,
420                                  ch->tail_sent,
421                                  crm);
422     GNUNET_free (crm->data_message);
423     GNUNET_free (crm);
424   }
425   if (NULL != ch->owner)
426   {
427     free_channel_client (ch->owner);
428     ch->owner = NULL;
429   }
430   if (NULL != ch->dest)
431   {
432     free_channel_client (ch->dest);
433     ch->dest = NULL;
434   }
435   if (NULL != ch->last_control_qe)
436   {
437     GCT_send_cancel (ch->last_control_qe);
438     ch->last_control_qe = NULL;
439   }
440   if (NULL != ch->retry_data_task)
441   {
442     GNUNET_SCHEDULER_cancel (ch->retry_data_task);
443     ch->retry_data_task = NULL;
444   }
445   if (NULL != ch->retry_control_task)
446   {
447     GNUNET_SCHEDULER_cancel (ch->retry_control_task);
448     ch->retry_control_task = NULL;
449   }
450   if (GNUNET_NO == ch->is_loopback)
451   {
452     GCT_remove_channel (ch->t,
453                         ch,
454                         ch->ctn);
455     ch->t = NULL;
456   }
457   GNUNET_free (ch);
458 }
459
460
461 /**
462  * Send a channel create message.
463  *
464  * @param cls Channel for which to send.
465  */
466 static void
467 send_channel_open (void *cls);
468
469
470 /**
471  * Function called once the tunnel confirms that we sent the
472  * create message.  Delays for a bit until we retry.
473  *
474  * @param cls our `struct CadetChannel`.
475  */
476 static void
477 channel_open_sent_cb (void *cls)
478 {
479   struct CadetChannel *ch = cls;
480
481   GNUNET_assert (NULL != ch->last_control_qe);
482   ch->last_control_qe = NULL;
483   ch->retry_time = GNUNET_TIME_STD_BACKOFF (ch->retry_time);
484   LOG (GNUNET_ERROR_TYPE_DEBUG,
485        "Sent CADET_CHANNEL_OPEN on %s, retrying in %s\n",
486        GCCH_2s (ch),
487        GNUNET_STRINGS_relative_time_to_string (ch->retry_time,
488                                                GNUNET_YES));
489   ch->retry_control_task
490     = GNUNET_SCHEDULER_add_delayed (ch->retry_time,
491                                     &send_channel_open,
492                                     ch);
493 }
494
495
496 /**
497  * Send a channel open message.
498  *
499  * @param cls Channel for which to send.
500  */
501 static void
502 send_channel_open (void *cls)
503 {
504   struct CadetChannel *ch = cls;
505   struct GNUNET_CADET_ChannelOpenMessage msgcc;
506   uint32_t options;
507
508   ch->retry_control_task = NULL;
509   LOG (GNUNET_ERROR_TYPE_DEBUG,
510        "Sending CHANNEL_OPEN message for %s\n",
511        GCCH_2s (ch));
512   options = 0;
513   if (ch->nobuffer)
514     options |= GNUNET_CADET_OPTION_NOBUFFER;
515   if (ch->reliable)
516     options |= GNUNET_CADET_OPTION_RELIABLE;
517   if (ch->out_of_order)
518     options |= GNUNET_CADET_OPTION_OUT_OF_ORDER;
519   msgcc.header.size = htons (sizeof (msgcc));
520   msgcc.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN);
521   msgcc.opt = htonl (options);
522   msgcc.port = ch->port;
523   msgcc.ctn = ch->ctn;
524   ch->state = CADET_CHANNEL_OPEN_SENT;
525   ch->last_control_qe = GCT_send (ch->t,
526                                   &msgcc.header,
527                                   &channel_open_sent_cb,
528                                   ch);
529 }
530
531
532 /**
533  * Function called once and only once after a channel was bound
534  * to its tunnel via #GCT_add_channel() is ready for transmission.
535  * Note that this is only the case for channels that this peer
536  * initiates, as for incoming channels we assume that they are
537  * ready for transmission immediately upon receiving the open
538  * message.  Used to bootstrap the #GCT_send() process.
539  *
540  * @param ch the channel for which the tunnel is now ready
541  */
542 void
543 GCCH_tunnel_up (struct CadetChannel *ch)
544 {
545   GNUNET_assert (NULL == ch->retry_control_task);
546   LOG (GNUNET_ERROR_TYPE_DEBUG,
547        "Tunnel up, sending CHANNEL_OPEN on %s now\n",
548        GCCH_2s (ch));
549   ch->retry_control_task
550     = GNUNET_SCHEDULER_add_now (&send_channel_open,
551                                 ch);
552 }
553
554
555 /**
556  * Create a new channel.
557  *
558  * @param owner local client owning the channel
559  * @param ccn local number of this channel at the @a owner
560  * @param destination peer to which we should build the channel
561  * @param port desired port at @a destination
562  * @param options options for the channel
563  * @return handle to the new channel
564  */
565 struct CadetChannel *
566 GCCH_channel_local_new (struct CadetClient *owner,
567                         struct GNUNET_CADET_ClientChannelNumber ccn,
568                         struct CadetPeer *destination,
569                         const struct GNUNET_HashCode *port,
570                         uint32_t options)
571 {
572   struct CadetChannel *ch;
573   struct CadetChannelClient *ccco;
574
575   ccco = GNUNET_new (struct CadetChannelClient);
576   ccco->c = owner;
577   ccco->ccn = ccn;
578   ccco->client_ready = GNUNET_YES;
579
580   ch = GNUNET_new (struct CadetChannel);
581   ch->mid_recv.mid = htonl (1); /* The OPEN_ACK counts as message 0! */
582   ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
583   ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
584   ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
585   ch->max_pending_messages = (ch->nobuffer) ? 1 : 4; /* FIXME: 4!? Do not hardcode! */
586   ch->owner = ccco;
587   ch->port = *port;
588   if (0 == memcmp (&my_full_id,
589                    GCP_get_id (destination),
590                    sizeof (struct GNUNET_PeerIdentity)))
591   {
592     struct CadetClient *c;
593
594     ch->is_loopback = GNUNET_YES;
595     c = GNUNET_CONTAINER_multihashmap_get (open_ports,
596                                            port);
597     if (NULL == c)
598     {
599       /* port closed, wait for it to possibly open */
600       (void) GNUNET_CONTAINER_multihashmap_put (loose_channels,
601                                                 port,
602                                                 ch,
603                                                 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
604       LOG (GNUNET_ERROR_TYPE_DEBUG,
605            "Created loose incoming loopback channel to port %s\n",
606            GNUNET_h2s (&ch->port));
607     }
608     else
609     {
610       ch->dest = GNUNET_new (struct CadetChannelClient);
611       ch->dest->c = c;
612       ch->dest->client_ready = GNUNET_YES;
613       GCCH_bind (ch,
614                  ch->dest->c);
615     }
616   }
617   else
618   {
619     ch->t = GCP_get_tunnel (destination,
620                             GNUNET_YES);
621     ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME;
622     ch->ctn = GCT_add_channel (ch->t,
623                                ch);
624   }
625   GNUNET_STATISTICS_update (stats,
626                             "# channels",
627                             1,
628                             GNUNET_NO);
629   LOG (GNUNET_ERROR_TYPE_DEBUG,
630        "Created channel to port %s at peer %s for %s using %s\n",
631        GNUNET_h2s (port),
632        GCP_2s (destination),
633        GSC_2s (owner),
634        (GNUNET_YES == ch->is_loopback) ? "loopback" : GCT_2s (ch->t));
635   return ch;
636 }
637
638
639 /**
640  * We had an incoming channel to a port that is closed.
641  * It has not been opened for a while, drop it.
642  *
643  * @param cls the channel to drop
644  */
645 static void
646 timeout_closed_cb (void *cls)
647 {
648   struct CadetChannel *ch = cls;
649
650   ch->retry_control_task = NULL;
651   LOG (GNUNET_ERROR_TYPE_DEBUG,
652        "Closing incoming channel to port %s from peer %s due to timeout\n",
653        GNUNET_h2s (&ch->port),
654        GCP_2s (GCT_get_destination (ch->t)));
655   channel_destroy (ch);
656 }
657
658
659 /**
660  * Create a new channel based on a request coming in over the network.
661  *
662  * @param t tunnel to the remote peer
663  * @param ctn identifier of this channel in the tunnel
664  * @param port desired local port
665  * @param options options for the channel
666  * @return handle to the new channel
667  */
668 struct CadetChannel *
669 GCCH_channel_incoming_new (struct CadetTunnel *t,
670                            struct GNUNET_CADET_ChannelTunnelNumber ctn,
671                            const struct GNUNET_HashCode *port,
672                            uint32_t options)
673 {
674   struct CadetChannel *ch;
675   struct CadetClient *c;
676
677   ch = GNUNET_new (struct CadetChannel);
678   ch->port = *port;
679   ch->t = t;
680   ch->ctn = ctn;
681   ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME;
682   ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
683   ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
684   ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
685   ch->max_pending_messages = (ch->nobuffer) ? 1 : 4; /* FIXME: 4!? Do not hardcode! */
686   GNUNET_STATISTICS_update (stats,
687                             "# channels",
688                             1,
689                             GNUNET_NO);
690
691   c = GNUNET_CONTAINER_multihashmap_get (open_ports,
692                                          port);
693   if (NULL == c)
694   {
695     /* port closed, wait for it to possibly open */
696     (void) GNUNET_CONTAINER_multihashmap_put (loose_channels,
697                                               port,
698                                               ch,
699                                               GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
700     ch->retry_control_task
701       = GNUNET_SCHEDULER_add_delayed (TIMEOUT_CLOSED_PORT,
702                                       &timeout_closed_cb,
703                                       ch);
704     LOG (GNUNET_ERROR_TYPE_DEBUG,
705          "Created loose incoming channel to port %s from peer %s\n",
706          GNUNET_h2s (&ch->port),
707          GCP_2s (GCT_get_destination (ch->t)));
708   }
709   else
710   {
711     GCCH_bind (ch,
712                c);
713   }
714   GNUNET_STATISTICS_update (stats,
715                             "# channels",
716                             1,
717                             GNUNET_NO);
718   return ch;
719 }
720
721
722 /**
723  * Function called once the tunnel confirms that we sent the
724  * ACK message.  Just remembers it was sent, we do not expect
725  * ACKs for ACKs ;-).
726  *
727  * @param cls our `struct CadetChannel`.
728  */
729 static void
730 send_ack_cb (void *cls)
731 {
732   struct CadetChannel *ch = cls;
733
734   GNUNET_assert (NULL != ch->last_control_qe);
735   ch->last_control_qe = NULL;
736 }
737
738
739 /**
740  * Compute and send the current #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK to the other peer.
741  *
742  * @param ch channel to send the #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK for
743  */
744 static void
745 send_channel_data_ack (struct CadetChannel *ch)
746 {
747   struct GNUNET_CADET_ChannelDataAckMessage msg;
748
749   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK);
750   msg.header.size = htons (sizeof (msg));
751   msg.ctn = ch->ctn;
752   msg.mid.mid = htonl (ntohl (ch->mid_recv.mid) - 1);
753   msg.futures = GNUNET_htonll (ch->mid_futures);
754   if (NULL != ch->last_control_qe)
755     GCT_send_cancel (ch->last_control_qe);
756   ch->last_control_qe = GCT_send (ch->t,
757                                   &msg.header,
758                                   &send_ack_cb,
759                                   ch);
760 }
761
762
763 /**
764  * Send our initial #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK to the client confirming that the
765  * connection is up.
766  *
767  * @param cls the `struct CadetChannel`
768  */
769 static void
770 send_open_ack (void *cls)
771 {
772   struct CadetChannel *ch = cls;
773   struct GNUNET_CADET_ChannelManageMessage msg;
774
775   LOG (GNUNET_ERROR_TYPE_DEBUG,
776        "Sending CHANNEL_OPEN_ACK on %s\n",
777        GCCH_2s (ch));
778   ch->retry_control_task = NULL;
779   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK);
780   msg.header.size = htons (sizeof (msg));
781   msg.reserved = htonl (0);
782   msg.ctn = ch->ctn;
783   if (NULL != ch->last_control_qe)
784     GCT_send_cancel (ch->last_control_qe);
785   ch->last_control_qe = GCT_send (ch->t,
786                                   &msg.header,
787                                   &send_ack_cb,
788                                   ch);
789 }
790
791
792 /**
793  * We got a #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN message again for
794  * this channel.  If the binding was successful, (re)transmit the
795  * #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK.
796  *
797  * @param ch channel that got the duplicate open
798  */
799 void
800 GCCH_handle_duplicate_open (struct CadetChannel *ch)
801 {
802   if (NULL == ch->dest)
803   {
804     LOG (GNUNET_ERROR_TYPE_DEBUG,
805          "Ignoring duplicate channel OPEN on %s: port is closed\n",
806          GCCH_2s (ch));
807     return;
808   }
809   if (NULL != ch->retry_control_task)
810   {
811     LOG (GNUNET_ERROR_TYPE_DEBUG,
812          "Ignoring duplicate channel OPEN on %s: control message is pending\n",
813          GCCH_2s (ch));
814     return;
815   }
816   LOG (GNUNET_ERROR_TYPE_DEBUG,
817        "Retransmitting OPEN_ACK on %s\n",
818        GCCH_2s (ch));
819   ch->retry_control_task
820     = GNUNET_SCHEDULER_add_now (&send_open_ack,
821                                 ch);
822 }
823
824
825 /**
826  * Send a #GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK to the client to solicit more messages.
827  *
828  * @param ch channel the ack is for
829  * @param to_owner #GNUNET_YES to send to owner,
830  *                 #GNUNET_NO to send to dest
831  */
832 static void
833 send_ack_to_client (struct CadetChannel *ch,
834                     int to_owner)
835 {
836   struct GNUNET_MQ_Envelope *env;
837   struct GNUNET_CADET_LocalAck *ack;
838   struct CadetChannelClient *ccc;
839
840   env = GNUNET_MQ_msg (ack,
841                        GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK);
842   ccc = (GNUNET_YES == to_owner) ? ch->owner : ch->dest;
843   ack->ccn = ccc->ccn;
844   LOG (GNUNET_ERROR_TYPE_DEBUG,
845        "Sending CADET_LOCAL_ACK to %s (%s) at ccn %X\n",
846        GSC_2s (ccc->c),
847        (GNUNET_YES == to_owner) ? "owner" : "dest",
848        ntohl (ack->ccn.channel_of_client));
849   GSC_send_to_client (ccc->c,
850                       env);
851 }
852
853
854 /**
855  * A client is bound to the port that we have a channel
856  * open to.  Send the acknowledgement for the connection
857  * request and establish the link with the client.
858  *
859  * @param ch open incoming channel
860  * @param c client listening on the respective port
861  */
862 void
863 GCCH_bind (struct CadetChannel *ch,
864            struct CadetClient *c)
865 {
866   uint32_t options;
867   struct CadetChannelClient *cccd;
868
869   LOG (GNUNET_ERROR_TYPE_DEBUG,
870        "Binding %s from %s to port %s of %s\n",
871        GCCH_2s (ch),
872        GCT_2s (ch->t),
873        GNUNET_h2s (&ch->port),
874        GSC_2s (c));
875   if (NULL != ch->retry_control_task)
876   {
877     /* there might be a timeout task here */
878     GNUNET_SCHEDULER_cancel (ch->retry_control_task);
879     ch->retry_control_task = NULL;
880   }
881   options = 0;
882   if (ch->nobuffer)
883     options |= GNUNET_CADET_OPTION_NOBUFFER;
884   if (ch->reliable)
885     options |= GNUNET_CADET_OPTION_RELIABLE;
886   if (ch->out_of_order)
887     options |= GNUNET_CADET_OPTION_OUT_OF_ORDER;
888   cccd = GNUNET_new (struct CadetChannelClient);
889   ch->dest = cccd;
890   cccd->c = c;
891   cccd->client_ready = GNUNET_YES;
892   cccd->ccn = GSC_bind (c,
893                         ch,
894                         (GNUNET_YES == ch->is_loopback)
895                         ? GCP_get (&my_full_id,
896                                    GNUNET_YES)
897                         : GCT_get_destination (ch->t),
898                         &ch->port,
899                         options);
900   GNUNET_assert (ntohl (cccd->ccn.channel_of_client) <
901                  GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
902   ch->mid_recv.mid = htonl (1); /* The OPEN counts as message 0! */
903   if (GNUNET_YES == ch->is_loopback)
904   {
905     ch->state = CADET_CHANNEL_OPEN_SENT;
906     GCCH_handle_channel_open_ack (ch);
907   }
908   else
909   {
910     /* notify other peer that we accepted the connection */
911     ch->retry_control_task
912       = GNUNET_SCHEDULER_add_now (&send_open_ack,
913                                   ch);
914   }
915   /* give client it's initial supply of ACKs */
916   GNUNET_assert (ntohl (cccd->ccn.channel_of_client) <
917                  GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
918   for (unsigned int i=0;i<ch->max_pending_messages;i++)
919     send_ack_to_client (ch,
920                         GNUNET_NO);
921 }
922
923
924 /**
925  * Destroy locally created channel.  Called by the local client, so no
926  * need to tell the client.
927  *
928  * @param ch channel to destroy
929  * @param c client that caused the destruction
930  * @param ccn client number of the client @a c
931  */
932 void
933 GCCH_channel_local_destroy (struct CadetChannel *ch,
934                             struct CadetClient *c,
935                             struct GNUNET_CADET_ClientChannelNumber ccn)
936 {
937   LOG (GNUNET_ERROR_TYPE_DEBUG,
938        "%s asks for destruction of %s\n",
939        GSC_2s (c),
940        GCCH_2s (ch));
941   GNUNET_assert (NULL != c);
942   if ( (NULL != ch->owner) &&
943        (c == ch->owner->c) &&
944        (ccn.channel_of_client == ch->owner->ccn.channel_of_client) )
945   {
946     free_channel_client (ch->owner);
947     ch->owner = NULL;
948   }
949   else if ( (NULL != ch->dest) &&
950             (c == ch->dest->c) &&
951             (ccn.channel_of_client == ch->dest->ccn.channel_of_client) )
952   {
953     free_channel_client (ch->dest);
954     ch->dest = NULL;
955   }
956   else
957   {
958     GNUNET_assert (0);
959   }
960
961   if (GNUNET_YES == ch->destroy)
962   {
963     /* other end already destroyed, with the local client gone, no need
964        to finish transmissions, just destroy immediately. */
965     channel_destroy (ch);
966     return;
967   }
968   if ( (NULL != ch->head_sent) ||
969        (NULL != ch->owner) ||
970        (NULL != ch->dest) )
971   {
972     /* Wait for other end to destroy us as well,
973        and otherwise allow send queue to be transmitted first */
974     ch->destroy = GNUNET_YES;
975     return;
976   }
977   /* If the we ever sent the CHANNEL_CREATE, we need to send a destroy message. */
978   if (CADET_CHANNEL_NEW != ch->state)
979     GCT_send_channel_destroy (ch->t,
980                               ch->ctn);
981   /* Nothing left to do, just finish destruction */
982   channel_destroy (ch);
983 }
984
985
986 /**
987  * We got an acknowledgement for the creation of the channel
988  * (the port is open on the other side). Begin transmissions.
989  *
990  * @param ch channel to destroy
991  */
992 void
993 GCCH_handle_channel_open_ack (struct CadetChannel *ch)
994 {
995   switch (ch->state)
996   {
997   case CADET_CHANNEL_NEW:
998     /* this should be impossible */
999     GNUNET_break (0);
1000     break;
1001   case CADET_CHANNEL_OPEN_SENT:
1002     if (NULL == ch->owner)
1003     {
1004       /* We're not the owner, wrong direction! */
1005       GNUNET_break_op (0);
1006       return;
1007     }
1008     LOG (GNUNET_ERROR_TYPE_DEBUG,
1009          "Received CHANNEL_OPEN_ACK for waiting %s, entering READY state\n",
1010          GCCH_2s (ch));
1011     if (NULL != ch->retry_control_task) /* can be NULL if ch->is_loopback */
1012     {
1013       GNUNET_SCHEDULER_cancel (ch->retry_control_task);
1014       ch->retry_control_task = NULL;
1015     }
1016     ch->state = CADET_CHANNEL_READY;
1017     /* On first connect, send client as many ACKs as we allow messages
1018        to be buffered! */
1019     for (unsigned int i=0;i<ch->max_pending_messages;i++)
1020       send_ack_to_client (ch,
1021                           GNUNET_YES);
1022     break;
1023   case CADET_CHANNEL_READY:
1024     /* duplicate ACK, maybe we retried the CREATE. Ignore. */
1025     LOG (GNUNET_ERROR_TYPE_DEBUG,
1026          "Received duplicate channel OPEN_ACK for %s\n",
1027          GCCH_2s (ch));
1028     GNUNET_STATISTICS_update (stats,
1029                               "# duplicate CREATE_ACKs",
1030                               1,
1031                               GNUNET_NO);
1032     break;
1033   }
1034 }
1035
1036
1037 /**
1038  * Test if element @a e1 comes before element @a e2.
1039  *
1040  * TODO: use opportunity to create generic list insertion sort
1041  * logic in container!
1042  *
1043  * @param cls closure, our `struct CadetChannel`
1044  * @param e1 an element of to sort
1045  * @param e2 another element to sort
1046  * @return #GNUNET_YES if @e1 < @e2, otherwise #GNUNET_NO
1047  */
1048 static int
1049 is_before (void *cls,
1050            void *e1,
1051            void *e2)
1052 {
1053   struct CadetOutOfOrderMessage *m1 = e1;
1054   struct CadetOutOfOrderMessage *m2 = e2;
1055   uint32_t v1 = ntohl (m1->mid.mid);
1056   uint32_t v2 = ntohl (m2->mid.mid);
1057   uint32_t delta;
1058
1059   delta = v1 - v2;
1060   if (delta > (uint32_t) INT_MAX)
1061   {
1062     /* in overflow range, we can safely assume we wrapped around */
1063     return GNUNET_NO;
1064   }
1065   else
1066   {
1067     return GNUNET_YES;
1068   }
1069 }
1070
1071
1072 /**
1073  * We got payload data for a channel.  Pass it on to the client
1074  * and send an ACK to the other end (once flow control allows it!)
1075  *
1076  * @param ch channel that got data
1077  * @param msg message that was received
1078  */
1079 void
1080 GCCH_handle_channel_plaintext_data (struct CadetChannel *ch,
1081                                     const struct GNUNET_CADET_ChannelAppDataMessage *msg)
1082 {
1083   struct GNUNET_MQ_Envelope *env;
1084   struct GNUNET_CADET_LocalData *ld;
1085   struct CadetChannelClient *ccc;
1086   struct CadetOutOfOrderMessage *com;
1087   size_t payload_size;
1088
1089   GNUNET_assert (GNUNET_NO == ch->is_loopback);
1090   if ( (GNUNET_YES == ch->destroy) &&
1091        (NULL == ch->owner) &&
1092        (NULL == ch->dest) )
1093   {
1094     /* This client is gone, but we still have messages to send to
1095        the other end (which is why @a ch is not yet dead).  However,
1096        we cannot pass messages to our client anymore. */
1097     LOG (GNUNET_ERROR_TYPE_DEBUG,
1098          "Dropping incoming payload on %s as this end is already closed\n",
1099          GCCH_2s (ch));
1100     /* FIXME: send back ACK/NACK/Closed notification
1101        to stop retransmissions! */
1102     return;
1103   }
1104   payload_size = ntohs (msg->header.size) - sizeof (*msg);
1105   env = GNUNET_MQ_msg_extra (ld,
1106                              payload_size,
1107                              GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
1108   ld->ccn = (NULL == ch->dest) ? ch->owner->ccn : ch->dest->ccn;
1109   GNUNET_memcpy (&ld[1],
1110                  &msg[1],
1111                  payload_size);
1112   ccc = (NULL != ch->owner) ? ch->owner : ch->dest;
1113   if ( (GNUNET_YES == ccc->client_ready) &&
1114        ( (GNUNET_YES == ch->out_of_order) ||
1115          (msg->mid.mid == ch->mid_recv.mid) ) )
1116   {
1117     LOG (GNUNET_ERROR_TYPE_DEBUG,
1118          "Giving %u bytes of payload from %s to client %s\n",
1119          (unsigned int) payload_size,
1120          GCCH_2s (ch),
1121          GSC_2s (ccc->c));
1122     ccc->client_ready = GNUNET_NO;
1123     GSC_send_to_client (ccc->c,
1124                         env);
1125     ch->mid_recv.mid = htonl (1 + ntohl (ch->mid_recv.mid));
1126     ch->mid_futures >>= 1;
1127   }
1128   else
1129   {
1130     /* FIXME-SECURITY: if the element is WAY too far ahead,
1131        drop it (can't buffer too much!) */
1132     LOG (GNUNET_ERROR_TYPE_DEBUG,
1133          "Queuing %s payload of %u bytes on %s (mid %u, need %u first)\n",
1134          (GNUNET_YES == ccc->client_ready)
1135          ? "out-of-order"
1136          : "client-not-ready",
1137          (unsigned int) payload_size,
1138          GCCH_2s (ch),
1139          ntohl (msg->mid.mid),
1140          ntohl (ch->mid_recv.mid));
1141
1142     com = GNUNET_new (struct CadetOutOfOrderMessage);
1143     com->mid = msg->mid;
1144     com->env = env;
1145     /* sort into list ordered by "is_before" */
1146     if ( (NULL == ccc->head_recv) ||
1147          (GNUNET_YES == is_before (ch,
1148                                    com,
1149                                    ccc->head_recv)) )
1150     {
1151       GNUNET_CONTAINER_DLL_insert (ccc->head_recv,
1152                                    ccc->tail_recv,
1153                                    com);
1154     }
1155     else
1156     {
1157       struct CadetOutOfOrderMessage *pos;
1158
1159       for (pos = ccc->head_recv;
1160            NULL != pos;
1161            pos = pos->next)
1162       {
1163         if (GNUNET_YES !=
1164             is_before (NULL,
1165                        pos,
1166                        com))
1167           break;
1168       }
1169       if (NULL == pos)
1170         GNUNET_CONTAINER_DLL_insert_tail (ccc->head_recv,
1171                                           ccc->tail_recv,
1172                                           com);
1173       else
1174         GNUNET_CONTAINER_DLL_insert_after (ccc->head_recv,
1175                                            ccc->tail_recv,
1176                                            com,
1177                                            pos->prev);
1178     }
1179   }
1180 }
1181
1182
1183 /**
1184  * We got an acknowledgement for payload data for a channel.
1185  * Possibly resume transmissions.
1186  *
1187  * @param ch channel that got the ack
1188  * @param ack details about what was received
1189  */
1190 void
1191 GCCH_handle_channel_plaintext_data_ack (struct CadetChannel *ch,
1192                                         const struct GNUNET_CADET_ChannelDataAckMessage *ack)
1193 {
1194   struct CadetReliableMessage *crm;
1195
1196   GNUNET_break (GNUNET_NO == ch->is_loopback);
1197   if (GNUNET_NO == ch->reliable)
1198   {
1199     /* not expecting ACKs on unreliable channel, odd */
1200     GNUNET_break_op (0);
1201     return;
1202   }
1203   for (crm = ch->head_sent;
1204         NULL != crm;
1205        crm = crm->next)
1206     if (ack->mid.mid == crm->data_message->mid.mid)
1207       break;
1208   if (NULL == crm)
1209   {
1210     /* ACK for message we already dropped, might have been a
1211        duplicate ACK? Ignore. */
1212     LOG (GNUNET_ERROR_TYPE_DEBUG,
1213          "Duplicate DATA_ACK on %s, ignoring\n",
1214          GCCH_2s (ch));
1215     GNUNET_STATISTICS_update (stats,
1216                               "# duplicate DATA_ACKs",
1217                               1,
1218                               GNUNET_NO);
1219     return;
1220   }
1221   GNUNET_CONTAINER_DLL_remove (ch->head_sent,
1222                                ch->tail_sent,
1223                                crm);
1224   GNUNET_free (crm->data_message);
1225   GNUNET_free (crm);
1226   ch->pending_messages--;
1227   send_ack_to_client (ch,
1228                       (NULL == ch->owner)
1229                       ? GNUNET_NO
1230                       : GNUNET_YES);
1231   GNUNET_assert (ch->pending_messages < ch->max_pending_messages);
1232   LOG (GNUNET_ERROR_TYPE_DEBUG,
1233        "Received DATA_ACK on %s for message %u (%u ACKs pending)\n",
1234        GCCH_2s (ch),
1235        (unsigned int) ntohl (ack->mid.mid),
1236        ch->pending_messages);
1237   send_ack_to_client (ch,
1238                       (NULL == ch->owner)
1239                       ? GNUNET_NO
1240                       : GNUNET_YES);
1241 }
1242
1243
1244 /**
1245  * Destroy channel, based on the other peer closing the
1246  * connection.  Also needs to remove this channel from
1247  * the tunnel.
1248  *
1249  * @param ch channel to destroy
1250  */
1251 void
1252 GCCH_handle_remote_destroy (struct CadetChannel *ch)
1253 {
1254   struct CadetChannelClient *ccc;
1255
1256   GNUNET_assert (GNUNET_NO == ch->is_loopback);
1257   LOG (GNUNET_ERROR_TYPE_DEBUG,
1258        "Received remote channel DESTROY for %s\n",
1259        GCCH_2s (ch));
1260   if (GNUNET_YES == ch->destroy)
1261   {
1262     /* Local client already gone, this is instant-death. */
1263     channel_destroy (ch);
1264     return;
1265   }
1266   ccc = (NULL != ch->owner) ? ch->owner : ch->dest;
1267   if (NULL != ccc->head_recv)
1268   {
1269     LOG (GNUNET_ERROR_TYPE_WARNING,
1270          "Lost end of transmission due to remote shutdown on %s\n",
1271          GCCH_2s (ch));
1272     /* FIXME: change API to notify client about truncated transmission! */
1273   }
1274   ch->destroy = GNUNET_YES;
1275   GSC_handle_remote_channel_destroy (ccc->c,
1276                                      ccc->ccn,
1277                                      ch);
1278   channel_destroy (ch);
1279 }
1280
1281
1282 /**
1283  * Function called once the tunnel has sent one of our messages.
1284  * If the message is unreliable, simply frees the `crm`. If the
1285  * message was reliable, calculate retransmission time and
1286  * wait for ACK (or retransmit).
1287  *
1288  * @param cls the `struct CadetReliableMessage` that was sent
1289  */
1290 static void
1291 data_sent_cb (void *cls);
1292
1293
1294 /**
1295  * We need to retry a transmission, the last one took too long to
1296  * be acknowledged.
1297  *
1298  * @param cls the `struct CadetChannel` where we need to retransmit
1299  */
1300 static void
1301 retry_transmission (void *cls)
1302 {
1303   struct CadetChannel *ch = cls;
1304   struct CadetReliableMessage *crm = ch->head_sent;
1305
1306   ch->retry_data_task = NULL;
1307   GNUNET_assert (NULL == crm->qe);
1308   crm->qe = GCT_send (ch->t,
1309                       &crm->data_message->header,
1310                       &data_sent_cb,
1311                       crm);
1312 }
1313
1314
1315 /**
1316  * Function called once the tunnel has sent one of our messages.
1317  * If the message is unreliable, simply frees the `crm`. If the
1318  * message was reliable, calculate retransmission time and
1319  * wait for ACK (or retransmit).
1320  *
1321  * @param cls the `struct CadetReliableMessage` that was sent
1322  */
1323 static void
1324 data_sent_cb (void *cls)
1325 {
1326   struct CadetReliableMessage *crm = cls;
1327   struct CadetChannel *ch = crm->ch;
1328   struct CadetReliableMessage *off;
1329
1330   GNUNET_assert (GNUNET_NO == ch->is_loopback);
1331   crm->qe = NULL;
1332   GNUNET_CONTAINER_DLL_remove (ch->head_sent,
1333                                ch->tail_sent,
1334                                crm);
1335   if (GNUNET_NO == ch->reliable)
1336   {
1337     GNUNET_free (crm);
1338     ch->pending_messages--;
1339     send_ack_to_client (ch,
1340                         (NULL == ch->owner)
1341                         ? GNUNET_NO
1342                         : GNUNET_YES);
1343     return;
1344   }
1345   if (0 == crm->retry_delay.rel_value_us)
1346     crm->retry_delay = ch->expected_delay;
1347   crm->next_retry = GNUNET_TIME_relative_to_absolute (crm->retry_delay);
1348
1349   /* find position for re-insertion into the DLL */
1350   if ( (NULL == ch->head_sent) ||
1351        (crm->next_retry.abs_value_us < ch->head_sent->next_retry.abs_value_us) )
1352   {
1353     /* insert at HEAD, also (re)schedule retry task! */
1354     GNUNET_CONTAINER_DLL_insert (ch->head_sent,
1355                                  ch->tail_sent,
1356                                  crm);
1357     if (NULL != ch->retry_data_task)
1358       GNUNET_SCHEDULER_cancel (ch->retry_data_task);
1359     ch->retry_data_task
1360       = GNUNET_SCHEDULER_add_delayed (crm->retry_delay,
1361                                       &retry_transmission,
1362                                       ch);
1363     return;
1364   }
1365   for (off = ch->head_sent; NULL != off; off = off->next)
1366     if (crm->next_retry.abs_value_us < off->next_retry.abs_value_us)
1367       break;
1368   if (NULL == off)
1369   {
1370     /* insert at tail */
1371     GNUNET_CONTAINER_DLL_insert_tail (ch->head_sent,
1372                                       ch->tail_sent,
1373                                       crm);
1374   }
1375   else
1376   {
1377     /* insert before off */
1378     GNUNET_CONTAINER_DLL_insert_after (ch->head_sent,
1379                                        ch->tail_sent,
1380                                        off->prev,
1381                                        crm);
1382   }
1383 }
1384
1385
1386 /**
1387  * Handle data given by a client.
1388  *
1389  * Check whether the client is allowed to send in this tunnel, save if
1390  * channel is reliable and send an ACK to the client if there is still
1391  * buffer space in the tunnel.
1392  *
1393  * @param ch Channel.
1394  * @param sender_ccn ccn of the sender
1395  * @param buf payload to transmit.
1396  * @param buf_len number of bytes in @a buf
1397  * @return #GNUNET_OK if everything goes well,
1398  *         #GNUNET_SYSERR in case of an error.
1399  */
1400 int
1401 GCCH_handle_local_data (struct CadetChannel *ch,
1402                         struct GNUNET_CADET_ClientChannelNumber sender_ccn,
1403                         const char *buf,
1404                         size_t buf_len)
1405 {
1406   struct CadetReliableMessage *crm;
1407
1408   if (ch->pending_messages > ch->max_pending_messages)
1409   {
1410     GNUNET_break (0);
1411     return GNUNET_SYSERR;
1412   }
1413   ch->pending_messages++;
1414
1415   if (GNUNET_YES == ch->is_loopback)
1416   {
1417     struct CadetChannelClient *receiver;
1418     struct GNUNET_MQ_Envelope *env;
1419     struct GNUNET_CADET_LocalData *ld;
1420     int to_owner;
1421
1422     env = GNUNET_MQ_msg_extra (ld,
1423                                buf_len,
1424                                GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
1425     if (sender_ccn.channel_of_client ==
1426         ch->owner->ccn.channel_of_client)
1427     {
1428       receiver = ch->dest;
1429       to_owner = GNUNET_NO;
1430     }
1431     else
1432     {
1433       GNUNET_assert (sender_ccn.channel_of_client ==
1434                      ch->dest->ccn.channel_of_client);
1435       receiver = ch->owner;
1436       to_owner = GNUNET_YES;
1437     }
1438     ld->ccn = receiver->ccn;
1439     GNUNET_memcpy (&ld[1],
1440                    buf,
1441                    buf_len);
1442     /* FIXME: this does not provide for flow control! */
1443     GSC_send_to_client (receiver->c,
1444                         env);
1445     send_ack_to_client (ch,
1446                         to_owner);
1447     return GNUNET_OK;
1448   }
1449
1450   /* Everything is correct, send the message. */
1451   crm = GNUNET_malloc (sizeof (*crm));
1452   crm->ch = ch;
1453   crm->data_message = GNUNET_malloc (sizeof (struct GNUNET_CADET_ChannelAppDataMessage)
1454                                      + buf_len);
1455   crm->data_message->header.size = htons (sizeof (struct GNUNET_CADET_ChannelAppDataMessage) + buf_len);
1456   crm->data_message->header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA);
1457   ch->mid_send.mid = htonl (ntohl (ch->mid_send.mid) + 1);
1458   crm->data_message->mid = ch->mid_send;
1459   crm->data_message->ctn = ch->ctn;
1460   GNUNET_memcpy (&crm->data_message[1],
1461                  buf,
1462                  buf_len);
1463   GNUNET_CONTAINER_DLL_insert (ch->head_sent,
1464                                ch->tail_sent,
1465                                crm);
1466   LOG (GNUNET_ERROR_TYPE_DEBUG,
1467        "Sending %u bytes from local client to %s\n",
1468        buf_len,
1469        GCCH_2s (ch));
1470   crm->qe = GCT_send (ch->t,
1471                       &crm->data_message->header,
1472                       &data_sent_cb,
1473                       crm);
1474   return GNUNET_OK;
1475 }
1476
1477
1478 /**
1479  * Handle ACK from client on local channel.  Means the client is ready
1480  * for more data, see if we have any for it.
1481  *
1482  * @param ch channel to destroy
1483  * @param client_ccn ccn of the client sending the ack
1484  */
1485 void
1486 GCCH_handle_local_ack (struct CadetChannel *ch,
1487                        struct GNUNET_CADET_ClientChannelNumber client_ccn)
1488 {
1489   struct CadetChannelClient *ccc;
1490   struct CadetOutOfOrderMessage *com;
1491
1492   if ( (NULL != ch->owner) &&
1493        (ch->owner->ccn.channel_of_client == client_ccn.channel_of_client) )
1494     ccc = ch->owner;
1495   else if ( (NULL != ch->dest) &&
1496             (ch->dest->ccn.channel_of_client == client_ccn.channel_of_client) )
1497     ccc = ch->dest;
1498   else
1499     GNUNET_assert (0);
1500   ccc->client_ready = GNUNET_YES;
1501   LOG (GNUNET_ERROR_TYPE_DEBUG,
1502        "Got LOCAL_ACK, client ready to receive more data!\n");
1503   com = ccc->head_recv;
1504   if (NULL == com)
1505     return; /* none pending */
1506   if ( (com->mid.mid != ch->mid_recv.mid) &&
1507        (GNUNET_NO == ch->out_of_order) )
1508     return; /* missing next one in-order */
1509
1510   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1511               "Passing payload message to client on %s\n",
1512               GCCH_2s (ch));
1513
1514   /* all good, pass next message to client */
1515   GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
1516                                ccc->tail_recv,
1517                                com);
1518   /* FIXME: if unreliable, this is not aggressive
1519      enough, as it would be OK to have lost some! */
1520   ch->mid_recv.mid = htonl (1 + ntohl (com->mid.mid));
1521   ch->mid_futures >>= 1; /* equivalent to division by 2 */
1522   ccc->client_ready = GNUNET_NO;
1523   GSC_send_to_client (ccc->c,
1524                       com->env);
1525   GNUNET_free (com);
1526   if ( (0xFFULL == (ch->mid_futures & 0xFFULL)) &&
1527        (GNUNET_YES == ch->reliable) )
1528   {
1529     /* The next 15 messages were also already received (0xFF), this
1530        suggests that the sender may be blocked on flow control
1531        urgently waiting for an ACK from us. (As we have an inherent
1532        maximum of 64 bits, and 15 is getting too close for comfort.)
1533        So we should send one now. */
1534     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1535                 "Sender on %s likely blocked on flow-control, sending ACK now.\n",
1536                 GCCH_2s (ch));
1537     if (GNUNET_YES == ch->reliable)
1538       send_channel_data_ack (ch);
1539   }
1540
1541   if (NULL != ccc->head_recv)
1542     return;
1543   if (GNUNET_NO == ch->destroy)
1544     return;
1545   GCT_send_channel_destroy (ch->t,
1546                             ch->ctn);
1547   channel_destroy (ch);
1548 }
1549
1550
1551 #define LOG2(level, ...) GNUNET_log_from_nocheck(level,"cadet-chn",__VA_ARGS__)
1552
1553
1554 /**
1555  * Log channel info.
1556  *
1557  * @param ch Channel.
1558  * @param level Debug level to use.
1559  */
1560 void
1561 GCCH_debug (struct CadetChannel *ch,
1562             enum GNUNET_ErrorType level)
1563 {
1564   int do_log;
1565
1566   do_log = GNUNET_get_log_call_status (level & (~GNUNET_ERROR_TYPE_BULK),
1567                                        "cadet-chn",
1568                                        __FILE__, __FUNCTION__, __LINE__);
1569   if (0 == do_log)
1570     return;
1571
1572   if (NULL == ch)
1573   {
1574     LOG2 (level, "CHN *** DEBUG NULL CHANNEL ***\n");
1575     return;
1576   }
1577   LOG2 (level,
1578         "CHN %s:%X (%p)\n",
1579         GCT_2s (ch->t),
1580         ch->ctn,
1581         ch);
1582   if (NULL != ch->owner)
1583   {
1584     LOG2 (level,
1585           "CHN origin %s ready %s local-id: %u\n",
1586           GSC_2s (ch->owner->c),
1587           ch->owner->client_ready ? "YES" : "NO",
1588           ntohl (ch->owner->ccn.channel_of_client));
1589   }
1590   if (NULL != ch->dest)
1591   {
1592     LOG2 (level,
1593           "CHN destination %s ready %s local-id: %u\n",
1594           GSC_2s (ch->dest->c),
1595           ch->dest->client_ready ? "YES" : "NO",
1596           ntohl (ch->dest->ccn.channel_of_client));
1597   }
1598   LOG2 (level,
1599         "CHN  Message IDs recv: %d (%LLX), send: %d\n",
1600         ntohl (ch->mid_recv.mid),
1601         (unsigned long long) ch->mid_futures,
1602         ntohl (ch->mid_send.mid));
1603 }
1604
1605
1606
1607 /* end of gnunet-service-cadet-new_channel.c */