more work on channel/tunnel logic
[oweals/gnunet.git] / src / cadet / gnunet-service-cadet-new_channel.c
1
2 /*
3      This file is part of GNUnet.
4      Copyright (C) 2001-2017 GNUnet e.V.
5
6      GNUnet is free software; you can redistribute it and/or modify
7      it under the terms of the GNU General Public License as published
8      by the Free Software Foundation; either version 3, or (at your
9      option) any later version.
10
11      GNUnet is distributed in the hope that it will be useful, but
12      WITHOUT ANY WARRANTY; without even the implied warranty of
13      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14      General Public License for more details.
15
16      You should have received a copy of the GNU General Public License
17      along with GNUnet; see the file COPYING.  If not, write to the
18      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
19      Boston, MA 02110-1301, USA.
20 */
21 /**
22  * @file cadet/gnunet-service-cadet-new_channel.c
23  * @brief logical links between CADET clients
24  * @author Bartlomiej Polot
25  * @author Christian Grothoff
26  *
27  * TODO:
28  * - estimate max bandwidth using bursts and use to for CONGESTION CONTROL!
29  * - check that '0xFFULL' really is sufficient for flow control!
30  * - what about the 'no buffer' option?
31  * - what about the 'out-of-order' option?
32  */
33 #include "platform.h"
34 #include "gnunet_util_lib.h"
35 #include "cadet.h"
36 #include "gnunet_statistics_service.h"
37 #include "gnunet-service-cadet-new.h"
38 #include "gnunet-service-cadet-new_channel.h"
39 #include "gnunet-service-cadet-new_connection.h"
40 #include "gnunet-service-cadet-new_tunnels.h"
41 #include "gnunet-service-cadet-new_peer.h"
42 #include "gnunet-service-cadet-new_paths.h"
43
44 #define LOG(level, ...) GNUNET_log (level,__VA_ARGS__)
45
46 /**
47  * How long do we initially wait before retransmitting?
48  */
49 #define CADET_INITIAL_RETRANSMIT_TIME GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 250)
50
51 /**
52  * How long do we wait before dropping state about incoming
53  * connection to closed port?
54  */
55 #define TIMEOUT_CLOSED_PORT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30)
56
57
58 /**
59  * All the states a connection can be in.
60  */
61 enum CadetChannelState
62 {
63   /**
64    * Uninitialized status, should never appear in operation.
65    */
66   CADET_CHANNEL_NEW,
67
68   /**
69    * Connection create message sent, waiting for ACK.
70    */
71   CADET_CHANNEL_CREATE_SENT,
72
73   /**
74    * Connection confirmed, ready to carry traffic.
75    */
76   CADET_CHANNEL_READY
77 };
78
79
80 /**
81  * Info needed to retry a message in case it gets lost.
82  * Note that we DO use this structure also for unreliable
83  * messages.
84  */
85 struct CadetReliableMessage
86 {
87   /**
88    * Double linked list, FIFO style
89    */
90   struct CadetReliableMessage *next;
91
92   /**
93    * Double linked list, FIFO style
94    */
95   struct CadetReliableMessage *prev;
96
97   /**
98    * Which channel is this message in?
99    */
100   struct CadetChannel *ch;
101
102   /**
103    * Entry in the tunnels queue for this message, NULL if it has left
104    * the tunnel.  Used to cancel transmission in case we receive an
105    * ACK in time.
106    */
107   struct CadetTunnelQueueEntry *qe;
108
109   /**
110    * How soon should we retry if we fail to get an ACK?
111    * Messages in the queue are sorted by this value.
112    */
113   struct GNUNET_TIME_Absolute next_retry;
114
115   /**
116    * How long do we wait for an ACK after transmission?
117    * Use for the back-off calculation.
118    */
119   struct GNUNET_TIME_Relative retry_delay;
120
121   /**
122    * Data message we are trying to send.
123    */
124   struct GNUNET_CADET_ChannelAppDataMessage data_message;
125
126   /* followed by variable-size payload */
127 };
128
129
130 /**
131  * List of received out-of-order data messages.
132  */
133 struct CadetOutOfOrderMessage
134 {
135   /**
136    * Double linked list, FIFO style
137    */
138   struct CadetOutOfOrderMessage *next;
139
140   /**
141    * Double linked list, FIFO style
142    */
143   struct CadetOutOfOrderMessage *prev;
144
145   /**
146    * ID of the message (ACK needed to free)
147    */
148   struct ChannelMessageIdentifier mid;
149
150   /**
151    * The envelope with the payload of the out-of-order message
152    */
153   struct GNUNET_MQ_Envelope *env;
154
155 };
156
157
158 /**
159  * Struct containing all information regarding a channel to a remote client.
160  */
161 struct CadetChannel
162 {
163   /**
164    * Tunnel this channel is in.
165    */
166   struct CadetTunnel *t;
167
168   /**
169    * Last entry in the tunnel's queue relating to control messages
170    * (#GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN or
171    * #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK).  Used to cancel
172    * transmission in case we receive updated information.
173    */
174   struct CadetTunnelQueueEntry *last_control_qe;
175
176   /**
177    * Client owner of the tunnel, if any.
178    * (Used if this channel represends the initiating end of the tunnel.)
179    */
180   struct CadetClient *owner;
181
182   /**
183    * Client destination of the tunnel, if any.
184    * (Used if this channel represents the listening end of the tunnel.)
185    */
186   struct CadetClient *dest;
187
188   /**
189    * Head of DLL of messages sent and not yet ACK'd.
190    */
191   struct CadetReliableMessage *head_sent;
192
193   /**
194    * Tail of DLL of messages sent and not yet ACK'd.
195    */
196   struct CadetReliableMessage *tail_sent;
197
198   /**
199    * Head of DLL of messages received out of order or while client was unready.
200    */
201   struct CadetOutOfOrderMessage *head_recv;
202
203   /**
204    * Tail DLL of messages received out of order or while client was unready.
205    */
206   struct CadetOutOfOrderMessage *tail_recv;
207
208   /**
209    * Task to resend/poll in case no ACK is received.
210    */
211   struct GNUNET_SCHEDULER_Task *retry_task;
212
213   /**
214    * Last time the channel was used
215    */
216   struct GNUNET_TIME_Absolute timestamp;
217
218   /**
219    * Destination port of the channel.
220    */
221   struct GNUNET_HashCode port;
222
223   /**
224    * Counter for exponential backoff.
225    */
226   struct GNUNET_TIME_Relative retry_time;
227
228   /**
229    * How long does it usually take to get an ACK.
230    */
231   struct GNUNET_TIME_Relative expected_delay;
232
233   /**
234    * Bitfield of already-received messages past @e mid_recv.
235    */
236   uint64_t mid_futures;
237
238   /**
239    * Next MID expected for incoming traffic.
240    */
241   struct ChannelMessageIdentifier mid_recv;
242
243   /**
244    * Next MID to use for outgoing traffic.
245    */
246   struct ChannelMessageIdentifier mid_send;
247
248   /**
249    * Total (reliable) messages pending ACK for this channel.
250    */
251   unsigned int pending_messages;
252
253   /**
254    * Maximum (reliable) messages pending ACK for this channel
255    * before we throttle the client.
256    */
257   unsigned int max_pending_messages;
258
259   /**
260    * Number identifying this channel in its tunnel.
261    */
262   struct GNUNET_CADET_ChannelTunnelNumber chid;
263
264   /**
265    * Local tunnel number for local client owning the channel.
266    * ( >= #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI or 0 )
267    */
268   struct GNUNET_CADET_ClientChannelNumber lid;
269
270   /**
271    * Channel state.
272    */
273   enum CadetChannelState state;
274
275   /**
276    * Can we send data to the client?
277    */
278   int client_ready;
279
280   /**
281    * Can the client send data to us?
282    */
283   int client_allowed;
284
285   /**
286    * Is the tunnel bufferless (minimum latency)?
287    */
288   int nobuffer;
289
290   /**
291    * Is the tunnel reliable?
292    */
293   int reliable;
294
295   /**
296    * Is the tunnel out-of-order?
297    */
298   int out_of_order;
299
300   /**
301    * Flag to signal the destruction of the channel.  If this is set to
302    * #GNUNET_YES the channel will be destroyed once the queue is
303    * empty.
304    */
305   int destroy;
306
307 };
308
309
310
311 /**
312  * Get the static string for identification of the channel.
313  *
314  * @param ch Channel.
315  *
316  * @return Static string with the channel IDs.
317  */
318 const char *
319 GCCH_2s (const struct CadetChannel *ch)
320 {
321   static char buf[128];
322
323   if (NULL == ch)
324     return "(NULL Channel)";
325   GNUNET_snprintf (buf,
326                    sizeof (buf),
327                    "%s:%s chid:%X (%X)",
328                    GCT_2s (ch->t),
329                    GNUNET_h2s (&ch->port),
330                    ch->chid,
331                    ntohl (ch->lid.channel_of_client));
332   return buf;
333 }
334
335
336 /**
337  * Get the channel's public ID.
338  *
339  * @param ch Channel.
340  *
341  * @return ID used to identify the channel with the remote peer.
342  */
343 struct GNUNET_CADET_ChannelTunnelNumber
344 GCCH_get_id (const struct CadetChannel *ch)
345 {
346   return ch->chid;
347 }
348
349
350 /**
351  * Destroy the given channel.
352  *
353  * @param ch channel to destroy
354  */
355 static void
356 channel_destroy (struct CadetChannel *ch)
357 {
358   struct CadetReliableMessage *crm;
359   struct CadetOutOfOrderMessage *com;
360
361   while (NULL != (crm = ch->head_sent))
362   {
363     GNUNET_assert (ch == crm->ch);
364     if (NULL != crm->qe)
365     {
366       GCT_send_cancel (crm->qe);
367       crm->qe = NULL;
368     }
369     GNUNET_CONTAINER_DLL_remove (ch->head_sent,
370                                  ch->tail_sent,
371                                  crm);
372     GNUNET_free (crm);
373   }
374   while (NULL != (com = ch->head_recv))
375   {
376     GNUNET_CONTAINER_DLL_remove (ch->head_recv,
377                                  ch->tail_recv,
378                                  com);
379     GNUNET_MQ_discard (com->env);
380     GNUNET_free (com);
381   }
382   if (NULL != ch->last_control_qe)
383   {
384     GCT_send_cancel (ch->last_control_qe);
385     ch->last_control_qe = NULL;
386   }
387   if (NULL != ch->retry_task)
388   {
389     GNUNET_SCHEDULER_cancel (ch->retry_task);
390     ch->retry_task = NULL;
391   }
392   GCT_remove_channel (ch->t,
393                       ch,
394                       ch->chid);
395   GNUNET_free (ch);
396 }
397
398
399 /**
400  * Send a channel create message.
401  *
402  * @param cls Channel for which to send.
403  */
404 static void
405 send_create (void *cls);
406
407
408 /**
409  * Function called once the tunnel confirms that we sent the
410  * create message.  Delays for a bit until we retry.
411  *
412  * @param cls our `struct CadetChannel`.
413  */
414 static void
415 create_sent_cb (void *cls)
416 {
417   struct CadetChannel *ch = cls;
418
419   ch->last_control_qe = NULL;
420   ch->retry_time = GNUNET_TIME_STD_BACKOFF (ch->retry_time);
421   ch->retry_task = GNUNET_SCHEDULER_add_delayed (ch->retry_time,
422                                                  &send_create,
423                                                  ch);
424 }
425
426
427 /**
428  * Send a channel create message.
429  *
430  * @param cls Channel for which to send.
431  */
432 static void
433 send_create (void *cls)
434 {
435   struct CadetChannel *ch = cls;
436   struct GNUNET_CADET_ChannelOpenMessage msgcc;
437   uint32_t options;
438
439   options = 0;
440   if (ch->nobuffer)
441     options |= GNUNET_CADET_OPTION_NOBUFFER;
442   if (ch->reliable)
443     options |= GNUNET_CADET_OPTION_RELIABLE;
444   if (ch->out_of_order)
445     options |= GNUNET_CADET_OPTION_OUT_OF_ORDER;
446   msgcc.header.size = htons (sizeof (msgcc));
447   msgcc.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN);
448   msgcc.opt = htonl (options);
449   msgcc.port = ch->port;
450   msgcc.chid = ch->chid;
451   ch->state = CADET_CHANNEL_CREATE_SENT;
452   ch->last_control_qe = GCT_send (ch->t,
453                                   &msgcc.header,
454                                   &create_sent_cb,
455                                   ch);
456 }
457
458
459 /**
460  * Create a new channel.
461  *
462  * @param owner local client owning the channel
463  * @param owner_id local chid of this channel at the @a owner
464  * @param destination peer to which we should build the channel
465  * @param port desired port at @a destination
466  * @param options options for the channel
467  * @return handle to the new channel
468  */
469 struct CadetChannel *
470 GCCH_channel_local_new (struct CadetClient *owner,
471                         struct GNUNET_CADET_ClientChannelNumber owner_id,
472                         struct CadetPeer *destination,
473                         const struct GNUNET_HashCode *port,
474                         uint32_t options)
475 {
476   struct CadetChannel *ch;
477
478   ch = GNUNET_new (struct CadetChannel);
479   ch->max_pending_messages = 32; /* FIXME: allow control via options
480                                     or adjust dynamically... */
481   ch->owner = owner;
482   ch->lid = owner_id;
483   ch->port = *port;
484   ch->t = GCP_get_tunnel (destination,
485                           GNUNET_YES);
486   ch->chid = GCT_add_channel (ch->t,
487                               ch);
488   ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME;
489   ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
490   ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
491   ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
492   ch->retry_task = GNUNET_SCHEDULER_add_now (&send_create,
493                                              ch);
494   GNUNET_STATISTICS_update (stats,
495                             "# channels",
496                             1,
497                             GNUNET_NO);
498   return ch;
499 }
500
501
502 /**
503  * We had an incoming channel to a port that is closed.
504  * It has not been opened for a while, drop it.
505  *
506  * @param cls the channel to drop
507  */
508 static void
509 timeout_closed_cb (void *cls)
510 {
511   struct CadetChannel *ch = cls;
512
513   ch->retry_task = NULL;
514   channel_destroy (ch);
515 }
516
517
518 /**
519  * Create a new channel based on a request coming in over the network.
520  *
521  * @param t tunnel to the remote peer
522  * @param chid identifier of this channel in the tunnel
523  * @param port desired local port
524  * @param options options for the channel
525  * @return handle to the new channel
526  */
527 struct CadetChannel *
528 GCCH_channel_incoming_new (struct CadetTunnel *t,
529                            struct GNUNET_CADET_ChannelTunnelNumber chid,
530                            const struct GNUNET_HashCode *port,
531                            uint32_t options)
532 {
533   struct CadetChannel *ch;
534   struct CadetClient *c;
535
536   ch = GNUNET_new (struct CadetChannel);
537   ch->max_pending_messages = 32; /* FIXME: allow control via options
538                                     or adjust dynamically... */
539   ch->port = *port;
540   ch->t = t;
541   ch->chid = chid;
542   ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME;
543   ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
544   ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
545   ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
546   GNUNET_STATISTICS_update (stats,
547                             "# channels",
548                             1,
549                             GNUNET_NO);
550
551   c = GNUNET_CONTAINER_multihashmap_get (open_ports,
552                                          port);
553   if (NULL == c)
554   {
555     /* port closed, wait for it to possibly open */
556     (void) GNUNET_CONTAINER_multihashmap_put (loose_channels,
557                                               port,
558                                               ch,
559                                               GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
560     ch->retry_task = GNUNET_SCHEDULER_add_delayed (TIMEOUT_CLOSED_PORT,
561                                                    &timeout_closed_cb,
562                                                    ch);
563   }
564   else
565   {
566     GCCH_bind (ch,
567                c);
568   }
569   GNUNET_STATISTICS_update (stats,
570                             "# channels",
571                             1,
572                             GNUNET_NO);
573   return ch;
574 }
575
576
577 /**
578  * Function called once the tunnel confirms that we sent the
579  * ACK message.  Just remembers it was sent, we do not expect
580  * ACKs for ACKs ;-).
581  *
582  * @param cls our `struct CadetChannel`.
583  */
584 static void
585 send_ack_cb (void *cls)
586 {
587   struct CadetChannel *ch = cls;
588
589   ch->last_control_qe = NULL;
590 }
591
592
593 /**
594  * Compute and send the current ACK to the other peer.
595  *
596  * @param ch channel to send the ACK for
597  */
598 static void
599 send_channel_ack (struct CadetChannel *ch)
600 {
601   struct GNUNET_CADET_ChannelDataAckMessage msg;
602
603   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK);
604   msg.header.size = htons (sizeof (msg));
605   msg.gid = ch->chid;
606   msg.mid.mid = htonl (ntohl (ch->mid_recv.mid) - 1);
607   msg.futures = GNUNET_htonll (ch->mid_futures);
608   if (NULL != ch->last_control_qe)
609     GCT_send_cancel (ch->last_control_qe);
610   ch->last_control_qe = GCT_send (ch->t,
611                                   &msg.header,
612                                   &send_ack_cb,
613                                   ch);
614 }
615
616
617 /**
618  * Send our initial ACK to the client confirming that the
619  * connection is up.
620  *
621  * @param cls the `struct CadetChannel`
622  */
623 static void
624 send_connect_ack (void *cls)
625 {
626   struct CadetChannel *ch = cls;
627
628   ch->retry_task = NULL;
629   send_channel_ack (ch);
630 }
631
632
633 /**
634  * A client is bound to the port that we have a channel
635  * open to.  Send the acknowledgement for the connection
636  * request and establish the link with the client.
637  *
638  * @param ch open incoming channel
639  * @param c client listening on the respective port
640  */
641 void
642 GCCH_bind (struct CadetChannel *ch,
643            struct CadetClient *c)
644 {
645   uint32_t options;
646
647   if (NULL != ch->retry_task)
648   {
649     /* there might be a timeout task here */
650     GNUNET_SCHEDULER_cancel (ch->retry_task);
651     ch->retry_task = NULL;
652   }
653   options = 0;
654   if (ch->nobuffer)
655     options |= GNUNET_CADET_OPTION_NOBUFFER;
656   if (ch->reliable)
657     options |= GNUNET_CADET_OPTION_RELIABLE;
658   if (ch->out_of_order)
659     options |= GNUNET_CADET_OPTION_OUT_OF_ORDER;
660   ch->dest = c;
661   ch->lid = GSC_bind (c,
662                       ch,
663                       GCT_get_destination (ch->t),
664                       &ch->port,
665                       options);
666   ch->mid_recv.mid = htonl (1); /* The CONNECT counts as message 0! */
667
668   /* notify other peer that we accepted the connection */
669   ch->retry_task = GNUNET_SCHEDULER_add_now (&send_connect_ack,
670                                              ch);
671 }
672
673
674 /**
675  * Destroy locally created channel.  Called by the
676  * local client, so no need to tell the client.
677  *
678  * @param ch channel to destroy
679  */
680 void
681 GCCH_channel_local_destroy (struct CadetChannel *ch)
682 {
683   if (GNUNET_YES == ch->destroy)
684   {
685     /* other end already destroyed, with the local client gone, no need
686        to finish transmissions, just destroy immediately. */
687     channel_destroy (ch);
688     return;
689   }
690   if (NULL != ch->head_sent)
691   {
692     /* allow send queue to train first */
693     ch->destroy = GNUNET_YES;
694     return;
695   }
696   /* Nothing left to do, just finish destruction */
697   GCT_send_channel_destroy (ch->t,
698                             ch->chid);
699   channel_destroy (ch);
700 }
701
702
703 /**
704  * Destroy channel that was incoming.  Called by the
705  * local client, so no need to tell the client.
706  *
707  * @param ch channel to destroy
708  */
709 void
710 GCCH_channel_incoming_destroy (struct CadetChannel *ch)
711 {
712   if (GNUNET_YES == ch->destroy)
713   {
714     /* other end already destroyed, with the remote client gone, no need
715        to finish transmissions, just destroy immediately. */
716     channel_destroy (ch);
717     return;
718   }
719   if (NULL != ch->head_recv)
720   {
721     /* allow local client to see all data first */
722     ch->destroy = GNUNET_YES;
723     return;
724   }
725   /* Nothing left to do, just finish destruction */
726   GCT_send_channel_destroy (ch->t,
727                             ch->chid);
728   channel_destroy (ch);
729 }
730
731
732 /**
733  * Destroy channel, based on the other peer closing the
734  * connection.  Also needs to remove this channel from
735  * the tunnel.
736  *
737  * FIXME: need to make it possible to defer destruction until we have
738  * received all messages up to the destroy, and right now the destroy
739  * message (and this API) fails to give is the information we need!
740  *
741  * FIXME: also need to know if the other peer got a destroy from
742  * us before!
743  *
744  * @param ch channel to destroy
745  */
746 void
747 GCCH_channel_remote_destroy (struct CadetChannel *ch)
748 {
749   GNUNET_break (0); // FIXME!
750 }
751
752
753 /**
754  * Function called once the tunnel has sent one of our messages.
755  * If the message is unreliable, simply frees the `crm`. If the
756  * message was reliable, calculate retransmission time and
757  * wait for ACK (or retransmit).
758  *
759  * @param cls the `struct CadetReliableMessage` that was sent
760  */
761 static void
762 data_sent_cb (void *cls);
763
764
765 /**
766  * We need to retry a transmission, the last one took too long to
767  * be acknowledged.
768  *
769  * @param cls the `struct CadetChannel` where we need to retransmit
770  */
771 static void
772 retry_transmission (void *cls)
773 {
774   struct CadetChannel *ch = cls;
775   struct CadetReliableMessage *crm = ch->head_sent;
776
777   GNUNET_assert (NULL == crm->qe);
778   crm->qe = GCT_send (ch->t,
779                       &crm->data_message.header,
780                       &data_sent_cb,
781                       crm);
782 }
783
784
785 /**
786  * Check if we can now allow the client to transmit, and if so,
787  * let the client know about it.
788  *
789  * @param ch channel to check
790  */
791 static void
792 GCCH_check_allow_client (struct CadetChannel *ch)
793 {
794   struct GNUNET_MQ_Envelope *env;
795   struct GNUNET_CADET_LocalAck *msg;
796
797   if (GNUNET_YES == ch->client_allowed)
798     return; /* client already allowed! */
799   if (CADET_CHANNEL_READY != ch->state)
800   {
801     /* destination did not yet ACK our CREATE! */
802     LOG (GNUNET_ERROR_TYPE_DEBUG,
803          "Channel %s not yet ready, throttling client until ACK.\n",
804          GCCH_2s (ch));
805     return;
806   }
807   if (ch->pending_messages > ch->max_pending_messages)
808   {
809     /* Too many messages in queue. */
810     LOG (GNUNET_ERROR_TYPE_DEBUG,
811          "Message queue still too long on channel %s, throttling client until ACK.\n",
812          GCCH_2s (ch));
813     return;
814   }
815   if ( (NULL != ch->head_sent) &&
816        (64 <= ntohl (ch->mid_send.mid) - ntohl (ch->head_sent->data_message.mid.mid)) )
817   {
818     LOG (GNUNET_ERROR_TYPE_DEBUG,
819          "Gap in ACKs too big on channel %s, throttling client until ACK.\n",
820          GCCH_2s (ch));
821     return;
822   }
823   ch->client_allowed = GNUNET_YES;
824
825
826   LOG (GNUNET_ERROR_TYPE_DEBUG,
827        "Sending local ack to channel %s client\n",
828        GCCH_2s (ch));
829   env = GNUNET_MQ_msg (msg,
830                        GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK);
831   msg->channel_id = ch->lid;
832   GSC_send_to_client (ch->owner ? ch->owner : ch->dest,
833                       env);
834 }
835
836
837 /**
838  * Function called once the tunnel has sent one of our messages.
839  * If the message is unreliable, simply frees the `crm`. If the
840  * message was reliable, calculate retransmission time and
841  * wait for ACK (or retransmit).
842  *
843  * @param cls the `struct CadetReliableMessage` that was sent
844  */
845 static void
846 data_sent_cb (void *cls)
847 {
848   struct CadetReliableMessage *crm = cls;
849   struct CadetChannel *ch = crm->ch;
850   struct CadetReliableMessage *off;
851
852   crm->qe = NULL;
853   GNUNET_CONTAINER_DLL_remove (ch->head_sent,
854                                ch->tail_sent,
855                                crm);
856   if (GNUNET_NO == ch->reliable)
857   {
858     GNUNET_free (crm);
859     ch->pending_messages--;
860     GCCH_check_allow_client (ch);
861     return;
862   }
863   if (0 == crm->retry_delay.rel_value_us)
864     crm->retry_delay = ch->expected_delay;
865   crm->next_retry = GNUNET_TIME_relative_to_absolute (crm->retry_delay);
866
867   /* find position for re-insertion into the DLL */
868   if ( (NULL == ch->head_sent) ||
869        (crm->next_retry.abs_value_us < ch->head_sent->next_retry.abs_value_us) )
870   {
871     /* insert at HEAD, also (re)schedule retry task! */
872     GNUNET_CONTAINER_DLL_insert (ch->head_sent,
873                                  ch->tail_sent,
874                                  crm);
875     if (NULL != ch->retry_task)
876       GNUNET_SCHEDULER_cancel (ch->retry_task);
877     ch->retry_task = GNUNET_SCHEDULER_add_delayed (crm->retry_delay,
878                                                    &retry_transmission,
879                                                    ch);
880     return;
881   }
882   for (off = ch->head_sent; NULL != off; off = off->next)
883     if (crm->next_retry.abs_value_us < off->next_retry.abs_value_us)
884       break;
885   if (NULL == off)
886   {
887     /* insert at tail */
888     GNUNET_CONTAINER_DLL_insert_tail (ch->head_sent,
889                                       ch->tail_sent,
890                                       crm);
891   }
892   else
893   {
894     /* insert before off */
895     GNUNET_CONTAINER_DLL_insert_after (ch->head_sent,
896                                        ch->tail_sent,
897                                        off->prev,
898                                        crm);
899   }
900 }
901
902
903 /**
904  * Handle data given by a client.
905  *
906  * Check whether the client is allowed to send in this tunnel, save if
907  * channel is reliable and send an ACK to the client if there is still
908  * buffer space in the tunnel.
909  *
910  * @param ch Channel.
911  * @param message payload to transmit.
912  * @return #GNUNET_OK if everything goes well,
913  *         #GNUNET_SYSERR in case of an error.
914  */
915 int
916 GCCH_handle_local_data (struct CadetChannel *ch,
917                         const struct GNUNET_MessageHeader *message)
918 {
919   uint16_t payload_size = ntohs (message->size);
920   struct CadetReliableMessage *crm;
921
922   if (GNUNET_NO == ch->client_allowed)
923   {
924     GNUNET_break_op (0);
925     return GNUNET_SYSERR;
926   }
927   ch->client_allowed = GNUNET_NO;
928   ch->pending_messages++;
929
930   /* Everything is correct, send the message. */
931   crm = GNUNET_malloc (sizeof (*crm) + payload_size);
932   crm->ch = ch;
933   crm->data_message.header.size = htons (sizeof (struct GNUNET_CADET_ChannelAppDataMessage) + payload_size);
934   crm->data_message.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA);
935   ch->mid_send.mid = htonl (ntohl (ch->mid_send.mid) + 1);
936   crm->data_message.mid = ch->mid_send;
937   crm->data_message.gid = ch->chid;
938   GNUNET_memcpy (&crm[1],
939                  message,
940                  payload_size);
941   GNUNET_CONTAINER_DLL_insert (ch->head_sent,
942                                ch->tail_sent,
943                                crm);
944   LOG (GNUNET_ERROR_TYPE_DEBUG,
945        "Sending %u bytes from local client to channel %s\n",
946        payload_size,
947        GCCH_2s (ch));
948   crm->qe = GCT_send (ch->t,
949                       &crm->data_message.header,
950                       &data_sent_cb,
951                       crm);
952   GCCH_check_allow_client (ch);
953   return GNUNET_OK;
954 }
955
956
957 /**
958  * Try to deliver messages to the local client, if it is ready for more.
959  *
960  * @param ch channel to process
961  */
962 static void
963 send_client_buffered_data (struct CadetChannel *ch)
964 {
965   struct CadetOutOfOrderMessage *com;
966
967   if (GNUNET_NO == ch->client_ready)
968     return; /* client not ready */
969   com = ch->head_recv;
970   if (NULL == com)
971     return; /* none pending */
972   if ( (com->mid.mid != ch->mid_recv.mid) &&
973        (GNUNET_NO == ch->out_of_order) )
974     return; /* missing next one in-order */
975
976   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
977               "Passing payload message to client on channel %s\n",
978               GCCH_2s (ch));
979
980   /* all good, pass next message to client */
981   GNUNET_CONTAINER_DLL_remove (ch->head_recv,
982                                ch->tail_recv,
983                                com);
984   ch->mid_recv.mid = htonl (1 + ntohl (com->mid.mid));
985   ch->mid_futures >>= 1; /* equivalent to division by 2 */
986   GSC_send_to_client (ch->owner ? ch->owner : ch->dest,
987                       com->env);
988   GNUNET_free (com);
989   if ( (0xFFULL == (ch->mid_futures & 0xFFULL)) &&
990        (GNUNET_YES == ch->reliable) )
991   {
992     /* The next 15 messages were also already received (0xFF), this
993        suggests that the sender may be blocked on flow control
994        urgently waiting for an ACK from us. (As we have an inherent
995        maximum of 64 bits, and 15 is getting too close for comfort.)
996        So we should send one now. */
997     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
998                 "Sender on channel %s likely blocked on flow-control, sending ACK now.\n",
999                 GCCH_2s (ch));
1000     if (GNUNET_YES == ch->reliable)
1001       send_channel_ack (ch);
1002   }
1003
1004   if (NULL != ch->head_recv)
1005     return;
1006   if (GNUNET_NO == ch->destroy)
1007     return;
1008   GCT_send_channel_destroy (ch->t,
1009                             ch->chid);
1010   channel_destroy (ch);
1011 }
1012
1013
1014 /**
1015  * Handle ACK from client on local channel.
1016  *
1017  * @param ch channel to destroy
1018  */
1019 void
1020 GCCH_handle_local_ack (struct CadetChannel *ch)
1021 {
1022   ch->client_ready = GNUNET_YES;
1023   send_client_buffered_data (ch);
1024 }
1025
1026
1027 #define LOG2(level, ...) GNUNET_log_from_nocheck(level,"cadet-chn",__VA_ARGS__)
1028
1029
1030 /**
1031  * Log channel info.
1032  *
1033  * @param ch Channel.
1034  * @param level Debug level to use.
1035  */
1036 void
1037 GCCH_debug (struct CadetChannel *ch,
1038             enum GNUNET_ErrorType level)
1039 {
1040   int do_log;
1041
1042   do_log = GNUNET_get_log_call_status (level & (~GNUNET_ERROR_TYPE_BULK),
1043                                        "cadet-chn",
1044                                        __FILE__, __FUNCTION__, __LINE__);
1045   if (0 == do_log)
1046     return;
1047
1048   if (NULL == ch)
1049   {
1050     LOG2 (level, "CHN *** DEBUG NULL CHANNEL ***\n");
1051     return;
1052   }
1053   LOG2 (level,
1054         "CHN Channel %s:%X (%p)\n",
1055         GCT_2s (ch->t),
1056         ch->chid,
1057         ch);
1058   if (NULL != ch->owner)
1059   {
1060     LOG2 (level,
1061           "CHN origin %s ready %s local-id: %u\n",
1062           GSC_2s (ch->owner),
1063           ch->client_ready ? "YES" : "NO",
1064           ntohl (ch->lid.channel_of_client));
1065   }
1066   if (NULL != ch->dest)
1067   {
1068     LOG2 (level,
1069           "CHN destination %s ready %s local-id: %u\n",
1070           GSC_2s (ch->dest),
1071           ch->client_ready ? "YES" : "NO",
1072           ntohl (ch->lid.channel_of_client));
1073   }
1074   LOG2 (level,
1075         "CHN  Message IDs recv: %d (%LLX), send: %d\n",
1076         ntohl (ch->mid_recv.mid),
1077         (unsigned long long) ch->mid_futures,
1078         ntohl (ch->mid_send.mid));
1079 }
1080
1081
1082
1083 /* end of gnunet-service-cadet-new_channel.c */