note on things left to do
[oweals/gnunet.git] / src / cadet / gnunet-service-cadet-new_channel.c
1
2 /*
3      This file is part of GNUnet.
4      Copyright (C) 2001-2017 GNUnet e.V.
5
6      GNUnet is free software; you can redistribute it and/or modify
7      it under the terms of the GNU General Public License as published
8      by the Free Software Foundation; either version 3, or (at your
9      option) any later version.
10
11      GNUnet is distributed in the hope that it will be useful, but
12      WITHOUT ANY WARRANTY; without even the implied warranty of
13      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14      General Public License for more details.
15
16      You should have received a copy of the GNU General Public License
17      along with GNUnet; see the file COPYING.  If not, write to the
18      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
19      Boston, MA 02110-1301, USA.
20 */
21
22 /**
23  * @file cadet/gnunet-service-cadet-new_channel.c
24  * @brief logical links between CADET clients
25  * @author Bartlomiej Polot
26  * @author Christian Grothoff
27  *
28  * TODO:
29  * - estimate max bandwidth using bursts and use to optimize
30  *   transmission rate(s)
31  */
32
33 #include "platform.h"
34 #include "gnunet_util_lib.h"
35 #include "cadet.h"
36 #include "gnunet_statistics_service.h"
37 #include "gnunet-service-cadet-new.h"
38 #include "gnunet-service-cadet-new_channel.h"
39 #include "gnunet-service-cadet-new_connection.h"
40 #include "gnunet-service-cadet-new_tunnels.h"
41 #include "gnunet-service-cadet-new_peer.h"
42 #include "gnunet-service-cadet-new_paths.h"
43
44 #define LOG(level, ...) GNUNET_log (level,__VA_ARGS__)
45
46 /**
47  * How long do we initially wait before retransmitting?
48  */
49 #define CADET_INITIAL_RETRANSMIT_TIME GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 250)
50
51 /**
52  * How long do we wait before dropping state about incoming
53  * connection to closed port?
54  */
55 #define TIMEOUT_CLOSED_PORT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30)
56
57
58 /**
59  * All the states a connection can be in.
60  */
61 enum CadetChannelState
62 {
63   /**
64    * Uninitialized status, should never appear in operation.
65    */
66   CADET_CHANNEL_NEW,
67
68   /**
69    * Connection create message sent, waiting for ACK.
70    */
71   CADET_CHANNEL_CREATE_SENT,
72
73   /**
74    * Connection confirmed, ready to carry traffic.
75    */
76   CADET_CHANNEL_READY
77 };
78
79
80 /**
81  * Info needed to retry a message in case it gets lost.
82  * Note that we DO use this structure also for unreliable
83  * messages.
84  */
85 struct CadetReliableMessage
86 {
87   /**
88    * Double linked list, FIFO style
89    */
90   struct CadetReliableMessage *next;
91
92   /**
93    * Double linked list, FIFO style
94    */
95   struct CadetReliableMessage *prev;
96
97   /**
98    * Which channel is this message in?
99    */
100   struct CadetChannel *ch;
101
102   /**
103    * Entry in the tunnels queue for this message, NULL if it has left
104    * the tunnel.  Used to cancel transmission in case we receive an
105    * ACK in time.
106    */
107   struct CadetTunnelQueueEntry *qe;
108
109   /**
110    * How soon should we retry if we fail to get an ACK?
111    * Messages in the queue are sorted by this value.
112    */
113   struct GNUNET_TIME_Absolute next_retry;
114
115   /**
116    * How long do we wait for an ACK after transmission?
117    * Use for the back-off calculation.
118    */
119   struct GNUNET_TIME_Relative retry_delay;
120
121   /**
122    * Data message we are trying to send.
123    */
124   struct GNUNET_CADET_ChannelAppDataMessage data_message;
125
126   /* followed by variable-size payload */
127 };
128
129
130 /**
131  * List of received out-of-order data messages.
132  */
133 struct CadetOutOfOrderMessage
134 {
135   /**
136    * Double linked list, FIFO style
137    */
138   struct CadetOutOfOrderMessage *next;
139
140   /**
141    * Double linked list, FIFO style
142    */
143   struct CadetOutOfOrderMessage *prev;
144
145   /**
146    * ID of the message (ACK needed to free)
147    */
148   struct ChannelMessageIdentifier mid;
149
150   /**
151    * The envelope with the payload of the out-of-order message
152    */
153   struct GNUNET_MQ_Envelope *env;
154
155 };
156
157
158 /**
159  * Struct containing all information regarding a channel to a remote client.
160  */
161 struct CadetChannel
162 {
163   /**
164    * Tunnel this channel is in.
165    */
166   struct CadetTunnel *t;
167
168   /**
169    * Last entry in the tunnel's queue relating to control messages
170    * (#GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN or
171    * #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK).  Used to cancel
172    * transmission in case we receive updated information.
173    */
174   struct CadetTunnelQueueEntry *last_control_qe;
175
176   /**
177    * Client owner of the tunnel, if any.
178    * (Used if this channel represends the initiating end of the tunnel.)
179    */
180   struct CadetClient *owner;
181
182   /**
183    * Client destination of the tunnel, if any.
184    * (Used if this channel represents the listening end of the tunnel.)
185    */
186   struct CadetClient *dest;
187
188   /**
189    * Head of DLL of messages sent and not yet ACK'd.
190    */
191   struct CadetReliableMessage *head_sent;
192
193   /**
194    * Tail of DLL of messages sent and not yet ACK'd.
195    */
196   struct CadetReliableMessage *tail_sent;
197
198   /**
199    * Head of DLL of messages received out of order or while client was unready.
200    */
201   struct CadetOutOfOrderMessage *head_recv;
202
203   /**
204    * Tail DLL of messages received out of order or while client was unready.
205    */
206   struct CadetOutOfOrderMessage *tail_recv;
207
208   /**
209    * Task to resend/poll in case no ACK is received.
210    */
211   struct GNUNET_SCHEDULER_Task *retry_task;
212
213   /**
214    * Last time the channel was used
215    */
216   struct GNUNET_TIME_Absolute timestamp;
217
218   /**
219    * Destination port of the channel.
220    */
221   struct GNUNET_HashCode port;
222
223   /**
224    * Counter for exponential backoff.
225    */
226   struct GNUNET_TIME_Relative retry_time;
227
228   /**
229    * How long does it usually take to get an ACK.
230    */
231   struct GNUNET_TIME_Relative expected_delay;
232
233   /**
234    * Bitfield of already-received messages past @e mid_recv.
235    */
236   uint64_t mid_futures;
237
238   /**
239    * Next MID expected for incoming traffic.
240    */
241   struct ChannelMessageIdentifier mid_recv;
242
243   /**
244    * Next MID to use for outgoing traffic.
245    */
246   struct ChannelMessageIdentifier mid_send;
247
248   /**
249    * Total (reliable) messages pending ACK for this channel.
250    */
251   unsigned int pending_messages;
252
253   /**
254    * Maximum (reliable) messages pending ACK for this channel
255    * before we throttle the client.
256    */
257   unsigned int max_pending_messages;
258
259   /**
260    * Number identifying this channel in its tunnel.
261    */
262   struct GNUNET_CADET_ChannelTunnelNumber gid;
263
264   /**
265    * Local tunnel number for local client owning the channel.
266    * ( >= #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI or 0 )
267    */
268   struct GNUNET_CADET_ClientChannelNumber lid;
269
270   /**
271    * Channel state.
272    */
273   enum CadetChannelState state;
274
275   /**
276    * Can we send data to the client?
277    */
278   int client_ready;
279
280   /**
281    * Can the client send data to us?
282    */
283   int client_allowed;
284
285   /**
286    * Is the tunnel bufferless (minimum latency)?
287    */
288   int nobuffer;
289
290   /**
291    * Is the tunnel reliable?
292    */
293   int reliable;
294
295   /**
296    * Is the tunnel out-of-order?
297    */
298   int out_of_order;
299
300   /**
301    * Flag to signal the destruction of the channel.  If this is set to
302    * #GNUNET_YES the channel will be destroyed once the queue is
303    * empty.
304    */
305   int destroy;
306
307 };
308
309
310
311 /**
312  * Get the static string for identification of the channel.
313  *
314  * @param ch Channel.
315  *
316  * @return Static string with the channel IDs.
317  */
318 const char *
319 GCCH_2s (const struct CadetChannel *ch)
320 {
321   static char buf[128];
322
323   if (NULL == ch)
324     return "(NULL Channel)";
325   GNUNET_snprintf (buf,
326                    sizeof (buf),
327                    "%s:%s gid:%X (%X)",
328                    GCT_2s (ch->t),
329                    GNUNET_h2s (&ch->port),
330                    ch->gid,
331                    ntohl (ch->lid.channel_of_client));
332   return buf;
333 }
334
335
336 /**
337  * Get the channel's public ID.
338  *
339  * @param ch Channel.
340  *
341  * @return ID used to identify the channel with the remote peer.
342  */
343 struct GNUNET_CADET_ChannelTunnelNumber
344 GCCH_get_id (const struct CadetChannel *ch)
345 {
346   return ch->gid;
347 }
348
349
350 /**
351  * Destroy the given channel.
352  *
353  * @param ch channel to destroy
354  */
355 static void
356 channel_destroy (struct CadetChannel *ch)
357 {
358   struct CadetReliableMessage *crm;
359   struct CadetOutOfOrderMessage *com;
360
361   while (NULL != (crm = ch->head_sent))
362   {
363     GNUNET_assert (ch == crm->ch);
364     if (NULL != crm->qe)
365     {
366       GCT_send_cancel (crm->qe);
367       crm->qe = NULL;
368     }
369     GNUNET_CONTAINER_DLL_remove (ch->head_sent,
370                                  ch->tail_sent,
371                                  crm);
372     GNUNET_free (crm);
373   }
374   while (NULL != (com = ch->head_recv))
375   {
376     GNUNET_CONTAINER_DLL_remove (ch->head_recv,
377                                  ch->tail_recv,
378                                  com);
379     GNUNET_MQ_discard (com->env);
380     GNUNET_free (com);
381   }
382   if (NULL != ch->last_control_qe)
383   {
384     GCT_send_cancel (ch->last_control_qe);
385     ch->last_control_qe = NULL;
386   }
387   if (NULL != ch->retry_task)
388   {
389     GNUNET_SCHEDULER_cancel (ch->retry_task);
390     ch->retry_task = NULL;
391   }
392   GCT_remove_channel (ch->t,
393                       ch,
394                       ch->gid);
395   GNUNET_free (ch);
396 }
397
398
399 /**
400  * Send a channel create message.
401  *
402  * @param cls Channel for which to send.
403  */
404 static void
405 send_create (void *cls);
406
407
408 /**
409  * Function called once the tunnel confirms that we sent the
410  * create message.  Delays for a bit until we retry.
411  *
412  * @param cls our `struct CadetChannel`.
413  */
414 static void
415 create_sent_cb (void *cls)
416 {
417   struct CadetChannel *ch = cls;
418
419   ch->last_control_qe = NULL;
420   ch->retry_time = GNUNET_TIME_STD_BACKOFF (ch->retry_time);
421   ch->retry_task = GNUNET_SCHEDULER_add_delayed (ch->retry_time,
422                                                  &send_create,
423                                                  ch);
424 }
425
426
427 /**
428  * Send a channel create message.
429  *
430  * @param cls Channel for which to send.
431  */
432 static void
433 send_create (void *cls)
434 {
435   struct CadetChannel *ch = cls;
436   struct GNUNET_CADET_ChannelOpenMessage msgcc;
437   uint32_t options;
438
439   options = 0;
440   if (ch->nobuffer)
441     options |= GNUNET_CADET_OPTION_NOBUFFER;
442   if (ch->reliable)
443     options |= GNUNET_CADET_OPTION_RELIABLE;
444   if (ch->out_of_order)
445     options |= GNUNET_CADET_OPTION_OUT_OF_ORDER;
446   msgcc.header.size = htons (sizeof (msgcc));
447   msgcc.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN);
448   msgcc.opt = htonl (options);
449   msgcc.port = ch->port;
450   msgcc.chid = ch->gid;
451   ch->state = CADET_CHANNEL_CREATE_SENT;
452   ch->last_control_qe = GCT_send (ch->t,
453                                   &msgcc.header,
454                                   &create_sent_cb,
455                                   ch);
456 }
457
458
459 /**
460  * Create a new channel.
461  *
462  * @param owner local client owning the channel
463  * @param owner_id local chid of this channel at the @a owner
464  * @param destination peer to which we should build the channel
465  * @param port desired port at @a destination
466  * @param options options for the channel
467  * @return handle to the new channel
468  */
469 struct CadetChannel *
470 GCCH_channel_local_new (struct CadetClient *owner,
471                         struct GNUNET_CADET_ClientChannelNumber owner_id,
472                         struct CadetPeer *destination,
473                         const struct GNUNET_HashCode *port,
474                         uint32_t options)
475 {
476   struct CadetChannel *ch;
477
478   ch = GNUNET_new (struct CadetChannel);
479   ch->max_pending_messages = 32; /* FIXME: allow control via options
480                                     or adjust dynamically... */
481   ch->owner = owner;
482   ch->lid = owner_id;
483   ch->port = *port;
484   ch->t = GCP_get_tunnel (destination,
485                           GNUNET_YES);
486   ch->gid = GCT_add_channel (ch->t,
487                              ch);
488   ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME;
489   ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
490   ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
491   ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
492   ch->retry_task = GNUNET_SCHEDULER_add_now (&send_create,
493                                              ch);
494   GNUNET_STATISTICS_update (stats,
495                             "# channels",
496                             1,
497                             GNUNET_NO);
498   return ch;
499 }
500
501
502 /**
503  * We had an incoming channel to a port that is closed.
504  * It has not been opened for a while, drop it.
505  *
506  * @param cls the channel to drop
507  */
508 static void
509 timeout_closed_cb (void *cls)
510 {
511   struct CadetChannel *ch = cls;
512
513   ch->retry_task = NULL;
514   channel_destroy (ch);
515 }
516
517
518 /**
519  * Create a new channel.
520  *
521  * @param t tunnel to the remote peer
522  * @param gid identifier of this channel in the tunnel
523  * @param port desired local port
524  * @param options options for the channel
525  * @return handle to the new channel
526  */
527 struct CadetChannel *
528 GCCH_channel_incoming_new (struct CadetTunnel *t,
529                            struct GNUNET_CADET_ChannelTunnelNumber gid,
530                            const struct GNUNET_HashCode *port,
531                            uint32_t options)
532 {
533   struct CadetChannel *ch;
534   struct CadetClient *c;
535
536   ch = GNUNET_new (struct CadetChannel);
537   ch->max_pending_messages = 32; /* FIXME: allow control via options
538                                     or adjust dynamically... */
539   ch->port = *port;
540   ch->t = t;
541   ch->gid = gid;
542   ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME;
543   ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
544   ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
545   ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
546   GNUNET_STATISTICS_update (stats,
547                             "# channels",
548                             1,
549                             GNUNET_NO);
550
551   c = GNUNET_CONTAINER_multihashmap_get (open_ports,
552                                          port);
553   if (NULL == c)
554   {
555     /* port closed, wait for it to possibly open */
556     (void) GNUNET_CONTAINER_multihashmap_put (loose_channels,
557                                               port,
558                                               ch,
559                                               GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
560     ch->retry_task = GNUNET_SCHEDULER_add_delayed (TIMEOUT_CLOSED_PORT,
561                                                    &timeout_closed_cb,
562                                                    ch);
563   }
564   else
565   {
566     GCCH_bind (ch,
567                c);
568   }
569   GNUNET_STATISTICS_update (stats,
570                             "# channels",
571                             1,
572                             GNUNET_NO);
573   return ch;
574 }
575
576
577 /**
578  * Function called once the tunnel confirms that we sent the
579  * ACK message.  Just remembers it was sent, we do not expect
580  * ACKs for ACKs ;-).
581  *
582  * @param cls our `struct CadetChannel`.
583  */
584 static void
585 send_ack_cb (void *cls)
586 {
587   struct CadetChannel *ch = cls;
588
589   ch->last_control_qe = NULL;
590 }
591
592
593 /**
594  * Compute and send the current ACK to the other peer.
595  *
596  * @param ch channel to send the ACK for
597  */
598 static void
599 send_channel_ack (struct CadetChannel *ch)
600 {
601   struct GNUNET_CADET_ChannelDataAckMessage msg;
602
603   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK);
604   msg.header.size = htons (sizeof (msg));
605   msg.gid = ch->gid;
606   msg.mid.mid = htonl (ntohl (ch->mid_recv.mid) - 1);
607   msg.futures = GNUNET_htonll (ch->mid_futures);
608   if (NULL != ch->last_control_qe)
609     GCT_send_cancel (ch->last_control_qe);
610   ch->last_control_qe = GCT_send (ch->t,
611                                   &msg.header,
612                                   &send_ack_cb,
613                                   ch);
614 }
615
616
617 /**
618  * Send our initial ACK to the client confirming that the
619  * connection is up.
620  *
621  * @param cls the `struct CadetChannel`
622  */
623 static void
624 send_connect_ack (void *cls)
625 {
626   struct CadetChannel *ch = cls;
627
628   ch->retry_task = NULL;
629   send_channel_ack (ch);
630 }
631
632
633 /**
634  * A client is bound to the port that we have a channel
635  * open to.  Send the acknowledgement for the connection
636  * request and establish the link with the client.
637  *
638  * @param ch open incoming channel
639  * @param c client listening on the respective port
640  */
641 void
642 GCCH_bind (struct CadetChannel *ch,
643            struct CadetClient *c)
644 {
645   uint32_t options;
646
647   if (NULL != ch->retry_task)
648   {
649     /* there might be a timeout task here */
650     GNUNET_SCHEDULER_cancel (ch->retry_task);
651     ch->retry_task = NULL;
652   }
653   options = 0;
654   if (ch->nobuffer)
655     options |= GNUNET_CADET_OPTION_NOBUFFER;
656   if (ch->reliable)
657     options |= GNUNET_CADET_OPTION_RELIABLE;
658   if (ch->out_of_order)
659     options |= GNUNET_CADET_OPTION_OUT_OF_ORDER;
660   ch->dest = c;
661   ch->lid = GSC_bind (c,
662                       ch,
663                       GCT_get_destination (ch->t),
664                       &ch->port,
665                       options);
666   ch->mid_recv.mid = htonl (1); /* The CONNECT counts as message 0! */
667
668   /* notify other peer that we accepted the connection */
669   ch->retry_task = GNUNET_SCHEDULER_add_now (&send_connect_ack,
670                                              ch);
671 }
672
673
674 /**
675  * Destroy locally created channel.  Called by the
676  * local client, so no need to tell the client.
677  *
678  * @param ch channel to destroy
679  */
680 void
681 GCCH_channel_local_destroy (struct CadetChannel *ch)
682 {
683   if (GNUNET_YES == ch->destroy)
684   {
685     /* other end already destroyed, with the local client gone, no need
686        to finish transmissions, just destroy immediately. */
687     channel_destroy (ch);
688     return;
689   }
690   if (NULL != ch->head_sent)
691   {
692     /* allow send queue to train first */
693     ch->destroy = GNUNET_YES;
694     return;
695   }
696   /* Nothing left to do, just finish destruction */
697   channel_destroy (ch);
698 }
699
700
701 /**
702  * Destroy channel that was incoming.  Called by the
703  * local client, so no need to tell the client.
704  *
705  * @param ch channel to destroy
706  */
707 void
708 GCCH_channel_incoming_destroy (struct CadetChannel *ch)
709 {
710   if (GNUNET_YES == ch->destroy)
711   {
712     /* other end already destroyed, with the remote client gone, no need
713        to finish transmissions, just destroy immediately. */
714     channel_destroy (ch);
715     return;
716   }
717   if (NULL != ch->head_recv)
718   {
719     /* allow local client to see all data first */
720     ch->destroy = GNUNET_YES;
721     return;
722   }
723   /* Nothing left to do, just finish destruction */
724   channel_destroy (ch);
725 }
726
727
728 /**
729  * Destroy channel, based on the other peer closing the
730  * connection.  Also needs to remove this channel from
731  * the tunnel.
732  *
733  * FIXME: need to make it possible to defer destruction until we have
734  * received all messages up to the destroy, and right now the destroy
735  * message (and this API) fails to give is the information we need!
736  *
737  * FIXME: also need to know if the other peer got a destroy from
738  * us before!
739  *
740  * @param ch channel to destroy
741  */
742 void
743 GCCH_channel_remote_destroy (struct CadetChannel *ch)
744 {
745   GNUNET_break (0); // FIXME!
746 }
747
748
749 /**
750  * Function called once the tunnel has sent one of our messages.
751  * If the message is unreliable, simply frees the `crm`. If the
752  * message was reliable, calculate retransmission time and
753  * wait for ACK (or retransmit).
754  *
755  * @param cls the `struct CadetReliableMessage` that was sent
756  */
757 static void
758 data_sent_cb (void *cls);
759
760
761 /**
762  * We need to retry a transmission, the last one took too long to
763  * be acknowledged.
764  *
765  * @param cls the `struct CadetChannel` where we need to retransmit
766  */
767 static void
768 retry_transmission (void *cls)
769 {
770   struct CadetChannel *ch = cls;
771   struct CadetReliableMessage *crm = ch->head_sent;
772
773   GNUNET_assert (NULL == crm->qe);
774   crm->qe = GCT_send (ch->t,
775                       &crm->data_message.header,
776                       &data_sent_cb,
777                       crm);
778 }
779
780
781 /**
782  * Check if we can now allow the client to transmit, and if so,
783  * let the client know about it.
784  *
785  * @param ch channel to check
786  */
787 static void
788 GCCH_check_allow_client (struct CadetChannel *ch)
789 {
790   struct GNUNET_MQ_Envelope *env;
791   struct GNUNET_CADET_LocalAck *msg;
792
793   if (GNUNET_YES == ch->client_allowed)
794     return; /* client already allowed! */
795   if (CADET_CHANNEL_READY != ch->state)
796   {
797     /* destination did not yet ACK our CREATE! */
798     LOG (GNUNET_ERROR_TYPE_DEBUG,
799          "Channel %s not yet ready, throttling client until ACK.\n",
800          GCCH_2s (ch));
801     return;
802   }
803   if (ch->pending_messages > ch->max_pending_messages)
804   {
805     /* Too many messages in queue. */
806     LOG (GNUNET_ERROR_TYPE_DEBUG,
807          "Message queue still too long on channel %s, throttling client until ACK.\n",
808          GCCH_2s (ch));
809     return;
810   }
811   if ( (NULL != ch->head_sent) &&
812        (64 <= ntohl (ch->mid_send.mid) - ntohl (ch->head_sent->data_message.mid.mid)) )
813   {
814     LOG (GNUNET_ERROR_TYPE_DEBUG,
815          "Gap in ACKs too big on channel %s, throttling client until ACK.\n",
816          GCCH_2s (ch));
817     return;
818   }
819   ch->client_allowed = GNUNET_YES;
820
821
822   LOG (GNUNET_ERROR_TYPE_DEBUG,
823        "Sending local ack to channel %s client\n",
824        GCCH_2s (ch));
825   env = GNUNET_MQ_msg (msg,
826                        GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK);
827   msg->channel_id = ch->lid;
828   GSC_send_to_client (ch->owner ? ch->owner : ch->dest,
829                       env);
830 }
831
832
833 /**
834  * Function called once the tunnel has sent one of our messages.
835  * If the message is unreliable, simply frees the `crm`. If the
836  * message was reliable, calculate retransmission time and
837  * wait for ACK (or retransmit).
838  *
839  * @param cls the `struct CadetReliableMessage` that was sent
840  */
841 static void
842 data_sent_cb (void *cls)
843 {
844   struct CadetReliableMessage *crm = cls;
845   struct CadetChannel *ch = crm->ch;
846   struct CadetReliableMessage *off;
847
848   crm->qe = NULL;
849   GNUNET_CONTAINER_DLL_remove (ch->head_sent,
850                                ch->tail_sent,
851                                crm);
852   if (GNUNET_NO == ch->reliable)
853   {
854     GNUNET_free (crm);
855     ch->pending_messages--;
856     GCCH_check_allow_client (ch);
857     return;
858   }
859   if (0 == crm->retry_delay.rel_value_us)
860     crm->retry_delay = ch->expected_delay;
861   crm->next_retry = GNUNET_TIME_relative_to_absolute (crm->retry_delay);
862
863   /* find position for re-insertion into the DLL */
864   if ( (NULL == ch->head_sent) ||
865        (crm->next_retry.abs_value_us < ch->head_sent->next_retry.abs_value_us) )
866   {
867     /* insert at HEAD, also (re)schedule retry task! */
868     GNUNET_CONTAINER_DLL_insert (ch->head_sent,
869                                  ch->tail_sent,
870                                  crm);
871     if (NULL != ch->retry_task)
872       GNUNET_SCHEDULER_cancel (ch->retry_task);
873     ch->retry_task = GNUNET_SCHEDULER_add_delayed (crm->retry_delay,
874                                                    &retry_transmission,
875                                                    ch);
876     return;
877   }
878   for (off = ch->head_sent; NULL != off; off = off->next)
879     if (crm->next_retry.abs_value_us < off->next_retry.abs_value_us)
880       break;
881   if (NULL == off)
882   {
883     /* insert at tail */
884     GNUNET_CONTAINER_DLL_insert_tail (ch->head_sent,
885                                       ch->tail_sent,
886                                       crm);
887   }
888   else
889   {
890     /* insert before off */
891     GNUNET_CONTAINER_DLL_insert_after (ch->head_sent,
892                                        ch->tail_sent,
893                                        off->prev,
894                                        crm);
895   }
896 }
897
898
899 /**
900  * Handle data given by a client.
901  *
902  * Check whether the client is allowed to send in this tunnel, save if
903  * channel is reliable and send an ACK to the client if there is still
904  * buffer space in the tunnel.
905  *
906  * @param ch Channel.
907  * @param message payload to transmit.
908  * @return #GNUNET_OK if everything goes well,
909  *         #GNUNET_SYSERR in case of an error.
910  */
911 int
912 GCCH_handle_local_data (struct CadetChannel *ch,
913                         const struct GNUNET_MessageHeader *message)
914 {
915   uint16_t payload_size = ntohs (message->size);
916   struct CadetReliableMessage *crm;
917
918   if (GNUNET_NO == ch->client_allowed)
919   {
920     GNUNET_break_op (0);
921     return GNUNET_SYSERR;
922   }
923   ch->client_allowed = GNUNET_NO;
924   ch->pending_messages++;
925
926   /* Everything is correct, send the message. */
927   crm = GNUNET_malloc (sizeof (*crm) + payload_size);
928   crm->ch = ch;
929   crm->data_message.header.size = htons (sizeof (struct GNUNET_CADET_ChannelAppDataMessage) + payload_size);
930   crm->data_message.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA);
931   ch->mid_send.mid = htonl (ntohl (ch->mid_send.mid) + 1);
932   crm->data_message.mid = ch->mid_send;
933   crm->data_message.gid = ch->gid;
934   GNUNET_memcpy (&crm[1],
935                  message,
936                  payload_size);
937   GNUNET_CONTAINER_DLL_insert (ch->head_sent,
938                                ch->tail_sent,
939                                crm);
940   LOG (GNUNET_ERROR_TYPE_DEBUG,
941        "Sending %u bytes from local client to channel %s\n",
942        payload_size,
943        GCCH_2s (ch));
944   crm->qe = GCT_send (ch->t,
945                       &crm->data_message.header,
946                       &data_sent_cb,
947                       crm);
948   GCCH_check_allow_client (ch);
949   return GNUNET_OK;
950 }
951
952
953 /**
954  * Try to deliver messages to the local client, if it is ready for more.
955  *
956  * @param ch channel to process
957  */
958 static void
959 send_client_buffered_data (struct CadetChannel *ch)
960 {
961   struct CadetOutOfOrderMessage *com;
962
963   if (GNUNET_NO == ch->client_ready)
964     return; /* client not ready */
965   com = ch->head_recv;
966   if (NULL == com)
967     return; /* none pending */
968   if ( (com->mid.mid != ch->mid_recv.mid) &&
969        (GNUNET_NO == ch->out_of_order) )
970     return; /* missing next one in-order */
971
972   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
973               "Passing payload message to client on channel %s\n",
974               GCCH_2s (ch));
975
976   /* all good, pass next message to client */
977   GNUNET_CONTAINER_DLL_remove (ch->head_recv,
978                                ch->tail_recv,
979                                com);
980   ch->mid_recv.mid = htonl (1 + ntohl (com->mid.mid));
981   ch->mid_futures >>= 1; /* equivalent to division by 2 */
982   GSC_send_to_client (ch->owner ? ch->owner : ch->dest,
983                       com->env);
984   GNUNET_free (com);
985   if ( (0xFFULL == (ch->mid_futures & 0xFFULL)) &&
986        (GNUNET_YES == ch->reliable) )
987   {
988     /* The next 15 messages were also already received (0xFF), this
989        suggests that the sender may be blocked on flow control
990        urgently waiting for an ACK from us. (As we have an inherent
991        maximum of 64 bits, and 15 is getting too close for comfort.)
992        So we should send one now. */
993     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
994                 "Sender on channel %s likely blocked on flow-control, sending ACK now.\n",
995                 GCCH_2s (ch));
996     if (GNUNET_YES == ch->reliable)
997       send_channel_ack (ch);
998   }
999
1000   if (NULL != ch->head_recv)
1001     return;
1002   if (GNUNET_NO == ch->destroy)
1003     return;
1004   channel_destroy (ch);
1005 }
1006
1007
1008 /**
1009  * Handle ACK from client on local channel.
1010  *
1011  * @param ch channel to destroy
1012  */
1013 void
1014 GCCH_handle_local_ack (struct CadetChannel *ch)
1015 {
1016   ch->client_ready = GNUNET_YES;
1017   send_client_buffered_data (ch);
1018 }
1019
1020
1021 #define LOG2(level, ...) GNUNET_log_from_nocheck(level,"cadet-chn",__VA_ARGS__)
1022
1023
1024 /**
1025  * Log channel info.
1026  *
1027  * @param ch Channel.
1028  * @param level Debug level to use.
1029  */
1030 void
1031 GCCH_debug (struct CadetChannel *ch,
1032             enum GNUNET_ErrorType level)
1033 {
1034   int do_log;
1035
1036   do_log = GNUNET_get_log_call_status (level & (~GNUNET_ERROR_TYPE_BULK),
1037                                        "cadet-chn",
1038                                        __FILE__, __FUNCTION__, __LINE__);
1039   if (0 == do_log)
1040     return;
1041
1042   if (NULL == ch)
1043   {
1044     LOG2 (level, "CHN *** DEBUG NULL CHANNEL ***\n");
1045     return;
1046   }
1047   LOG2 (level,
1048         "CHN Channel %s:%X (%p)\n",
1049         GCT_2s (ch->t),
1050         ch->gid,
1051         ch);
1052   if (NULL != ch->owner)
1053   {
1054     LOG2 (level,
1055           "CHN origin %s ready %s local-id: %u\n",
1056           GSC_2s (ch->owner),
1057           ch->client_ready ? "YES" : "NO",
1058           ntohl (ch->lid.channel_of_client));
1059   }
1060   if (NULL != ch->dest)
1061   {
1062     LOG2 (level,
1063           "CHN destination %s ready %s local-id: %u\n",
1064           GSC_2s (ch->dest),
1065           ch->client_ready ? "YES" : "NO",
1066           ntohl (ch->lid.channel_of_client));
1067   }
1068   LOG2 (level,
1069         "CHN  Message IDs recv: %d (%LLX), send: %d\n",
1070         ntohl (ch->mid_recv.mid),
1071         (unsigned long long) ch->mid_futures,
1072         ntohl (ch->mid_send.mid));
1073 }
1074
1075
1076
1077 /* end of gnunet-service-cadet-new_channel.c */