60e1a77668bac35af450990e1cdae15bf9f8e93d
[oweals/gnunet.git] / src / cadet / gnunet-service-cadet-new_channel.c
1
2 /*
3      This file is part of GNUnet.
4      Copyright (C) 2001-2017 GNUnet e.V.
5
6      GNUnet is free software; you can redistribute it and/or modify
7      it under the terms of the GNU General Public License as published
8      by the Free Software Foundation; either version 3, or (at your
9      option) any later version.
10
11      GNUnet is distributed in the hope that it will be useful, but
12      WITHOUT ANY WARRANTY; without even the implied warranty of
13      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14      General Public License for more details.
15
16      You should have received a copy of the GNU General Public License
17      along with GNUnet; see the file COPYING.  If not, write to the
18      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
19      Boston, MA 02110-1301, USA.
20 */
21 /**
22  * @file cadet/gnunet-service-cadet-new_channel.c
23  * @brief logical links between CADET clients
24  * @author Bartlomiej Polot
25  * @author Christian Grothoff
26  *
27  * TODO:
28  * - introduce shutdown so we can have half-closed channels, modify
29  *   destroy to include MID to have FIN-ACK equivalents, etc.
30  * - estimate max bandwidth using bursts and use to for CONGESTION CONTROL!
31  * - check that '0xFFULL' really is sufficient for flow control!
32  * - revisit handling of 'unreliable' traffic!
33  * - revisit handling of 'out-of-order' option, especially in combination with/without 'reliable'.
34  * - figure out flow control without ACKs (unreliable traffic!)
35  */
36 #include "platform.h"
37 #include "gnunet_util_lib.h"
38 #include "cadet.h"
39 #include "gnunet_statistics_service.h"
40 #include "gnunet-service-cadet-new.h"
41 #include "gnunet-service-cadet-new_channel.h"
42 #include "gnunet-service-cadet-new_connection.h"
43 #include "gnunet-service-cadet-new_tunnels.h"
44 #include "gnunet-service-cadet-new_peer.h"
45 #include "gnunet-service-cadet-new_paths.h"
46
47 #define LOG(level,...) GNUNET_log_from (level,"cadet-chn",__VA_ARGS__)
48
49 /**
50  * How long do we initially wait before retransmitting?
51  */
52 #define CADET_INITIAL_RETRANSMIT_TIME GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 250)
53
54 /**
55  * How long do we wait before dropping state about incoming
56  * connection to closed port?
57  */
58 #define TIMEOUT_CLOSED_PORT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30)
59
60
61 /**
62  * All the states a connection can be in.
63  */
64 enum CadetChannelState
65 {
66   /**
67    * Uninitialized status, should never appear in operation.
68    */
69   CADET_CHANNEL_NEW,
70
71   /**
72    * Connection create message sent, waiting for ACK.
73    */
74   CADET_CHANNEL_CREATE_SENT,
75
76   /**
77    * Connection confirmed, ready to carry traffic.
78    */
79   CADET_CHANNEL_READY
80 };
81
82
83 /**
84  * Info needed to retry a message in case it gets lost.
85  * Note that we DO use this structure also for unreliable
86  * messages.
87  */
88 struct CadetReliableMessage
89 {
90   /**
91    * Double linked list, FIFO style
92    */
93   struct CadetReliableMessage *next;
94
95   /**
96    * Double linked list, FIFO style
97    */
98   struct CadetReliableMessage *prev;
99
100   /**
101    * Which channel is this message in?
102    */
103   struct CadetChannel *ch;
104
105   /**
106    * Entry in the tunnels queue for this message, NULL if it has left
107    * the tunnel.  Used to cancel transmission in case we receive an
108    * ACK in time.
109    */
110   struct CadetTunnelQueueEntry *qe;
111
112   /**
113    * How soon should we retry if we fail to get an ACK?
114    * Messages in the queue are sorted by this value.
115    */
116   struct GNUNET_TIME_Absolute next_retry;
117
118   /**
119    * How long do we wait for an ACK after transmission?
120    * Use for the back-off calculation.
121    */
122   struct GNUNET_TIME_Relative retry_delay;
123
124   /**
125    * Data message we are trying to send.
126    */
127   struct GNUNET_CADET_ChannelAppDataMessage data_message;
128
129   /* followed by variable-size payload */
130 };
131
132
133 /**
134  * List of received out-of-order data messages.
135  */
136 struct CadetOutOfOrderMessage
137 {
138   /**
139    * Double linked list, FIFO style
140    */
141   struct CadetOutOfOrderMessage *next;
142
143   /**
144    * Double linked list, FIFO style
145    */
146   struct CadetOutOfOrderMessage *prev;
147
148   /**
149    * ID of the message (messages up to this point needed
150    * before we give this one to the client).
151    */
152   struct ChannelMessageIdentifier mid;
153
154   /**
155    * The envelope with the payload of the out-of-order message
156    */
157   struct GNUNET_MQ_Envelope *env;
158
159 };
160
161
162 /**
163  * Struct containing all information regarding a channel to a remote client.
164  */
165 struct CadetChannel
166 {
167   /**
168    * Tunnel this channel is in.
169    */
170   struct CadetTunnel *t;
171
172   /**
173    * Last entry in the tunnel's queue relating to control messages
174    * (#GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN or
175    * #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK).  Used to cancel
176    * transmission in case we receive updated information.
177    */
178   struct CadetTunnelQueueEntry *last_control_qe;
179
180   /**
181    * Client owner of the tunnel, if any.
182    * (Used if this channel represends the initiating end of the tunnel.)
183    */
184   struct CadetClient *owner;
185
186   /**
187    * Client destination of the tunnel, if any.
188    * (Used if this channel represents the listening end of the tunnel.)
189    */
190   struct CadetClient *dest;
191
192   /**
193    * Head of DLL of messages sent and not yet ACK'd.
194    */
195   struct CadetReliableMessage *head_sent;
196
197   /**
198    * Tail of DLL of messages sent and not yet ACK'd.
199    */
200   struct CadetReliableMessage *tail_sent;
201
202   /**
203    * Head of DLL of messages received out of order or while client was unready.
204    */
205   struct CadetOutOfOrderMessage *head_recv;
206
207   /**
208    * Tail DLL of messages received out of order or while client was unready.
209    */
210   struct CadetOutOfOrderMessage *tail_recv;
211
212   /**
213    * Task to resend/poll in case no ACK is received.
214    */
215   struct GNUNET_SCHEDULER_Task *retry_task;
216
217   /**
218    * Last time the channel was used
219    */
220   struct GNUNET_TIME_Absolute timestamp;
221
222   /**
223    * Destination port of the channel.
224    */
225   struct GNUNET_HashCode port;
226
227   /**
228    * Counter for exponential backoff.
229    */
230   struct GNUNET_TIME_Relative retry_time;
231
232   /**
233    * How long does it usually take to get an ACK.
234    */
235   struct GNUNET_TIME_Relative expected_delay;
236
237   /**
238    * Bitfield of already-received messages past @e mid_recv.
239    */
240   uint64_t mid_futures;
241
242   /**
243    * Next MID expected for incoming traffic.
244    */
245   struct ChannelMessageIdentifier mid_recv;
246
247   /**
248    * Next MID to use for outgoing traffic.
249    */
250   struct ChannelMessageIdentifier mid_send;
251
252   /**
253    * Total (reliable) messages pending ACK for this channel.
254    */
255   unsigned int pending_messages;
256
257   /**
258    * Maximum (reliable) messages pending ACK for this channel
259    * before we throttle the client.
260    */
261   unsigned int max_pending_messages;
262
263   /**
264    * Number identifying this channel in its tunnel.
265    */
266   struct GNUNET_CADET_ChannelTunnelNumber ctn;
267
268   /**
269    * Local tunnel number for local client owning the channel.
270    * ( >= #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI or 0 )
271    */
272   struct GNUNET_CADET_ClientChannelNumber ccn;
273
274   /**
275    * Channel state.
276    */
277   enum CadetChannelState state;
278
279   /**
280    * Can we send data to the client?
281    */
282   int client_ready;
283
284   /**
285    * Can the client send data to us?
286    */
287   int client_allowed;
288
289   /**
290    * Is the tunnel bufferless (minimum latency)?
291    */
292   int nobuffer;
293
294   /**
295    * Is the tunnel reliable?
296    */
297   int reliable;
298
299   /**
300    * Is the tunnel out-of-order?
301    */
302   int out_of_order;
303
304   /**
305    * Flag to signal the destruction of the channel.  If this is set to
306    * #GNUNET_YES the channel will be destroyed once the queue is
307    * empty.
308    */
309   int destroy;
310
311 };
312
313
314 /**
315  * Get the static string for identification of the channel.
316  *
317  * @param ch Channel.
318  *
319  * @return Static string with the channel IDs.
320  */
321 const char *
322 GCCH_2s (const struct CadetChannel *ch)
323 {
324   static char buf[128];
325
326   if (NULL == ch)
327     return "(NULL Channel)";
328   GNUNET_snprintf (buf,
329                    sizeof (buf),
330                    "%s:%s ctn:%X (%X)",
331                    GCT_2s (ch->t),
332                    GNUNET_h2s (&ch->port),
333                    ch->ctn,
334                    ntohl (ch->ccn.channel_of_client));
335   return buf;
336 }
337
338
339 /**
340  * Get the channel's public ID.
341  *
342  * @param ch Channel.
343  *
344  * @return ID used to identify the channel with the remote peer.
345  */
346 struct GNUNET_CADET_ChannelTunnelNumber
347 GCCH_get_id (const struct CadetChannel *ch)
348 {
349   return ch->ctn;
350 }
351
352
353 /**
354  * Destroy the given channel.
355  *
356  * @param ch channel to destroy
357  */
358 static void
359 channel_destroy (struct CadetChannel *ch)
360 {
361   struct CadetReliableMessage *crm;
362   struct CadetOutOfOrderMessage *com;
363
364   while (NULL != (crm = ch->head_sent))
365   {
366     GNUNET_assert (ch == crm->ch);
367     if (NULL != crm->qe)
368     {
369       GCT_send_cancel (crm->qe);
370       crm->qe = NULL;
371     }
372     GNUNET_CONTAINER_DLL_remove (ch->head_sent,
373                                  ch->tail_sent,
374                                  crm);
375     GNUNET_free (crm);
376   }
377   while (NULL != (com = ch->head_recv))
378   {
379     GNUNET_CONTAINER_DLL_remove (ch->head_recv,
380                                  ch->tail_recv,
381                                  com);
382     GNUNET_MQ_discard (com->env);
383     GNUNET_free (com);
384   }
385   if (NULL != ch->last_control_qe)
386   {
387     GCT_send_cancel (ch->last_control_qe);
388     ch->last_control_qe = NULL;
389   }
390   if (NULL != ch->retry_task)
391   {
392     GNUNET_SCHEDULER_cancel (ch->retry_task);
393     ch->retry_task = NULL;
394   }
395   GCT_remove_channel (ch->t,
396                       ch,
397                       ch->ctn);
398   GNUNET_free (ch);
399 }
400
401
402 /**
403  * Send a channel create message.
404  *
405  * @param cls Channel for which to send.
406  */
407 static void
408 send_channel_open (void *cls);
409
410
411 /**
412  * Function called once the tunnel confirms that we sent the
413  * create message.  Delays for a bit until we retry.
414  *
415  * @param cls our `struct CadetChannel`.
416  */
417 static void
418 channel_open_sent_cb (void *cls)
419 {
420   struct CadetChannel *ch = cls;
421
422   ch->last_control_qe = NULL;
423   ch->retry_time = GNUNET_TIME_STD_BACKOFF (ch->retry_time);
424   ch->retry_task = GNUNET_SCHEDULER_add_delayed (ch->retry_time,
425                                                  &send_channel_open,
426                                                  ch);
427 }
428
429
430 /**
431  * Send a channel open message.
432  *
433  * @param cls Channel for which to send.
434  */
435 static void
436 send_channel_open (void *cls)
437 {
438   struct CadetChannel *ch = cls;
439   struct GNUNET_CADET_ChannelOpenMessage msgcc;
440   uint32_t options;
441
442   ch->retry_task = NULL;
443   LOG (GNUNET_ERROR_TYPE_DEBUG,
444        "Sending CHANNEL_OPEN message for channel %s\n",
445        GCCH_2s (ch));
446   options = 0;
447   if (ch->nobuffer)
448     options |= GNUNET_CADET_OPTION_NOBUFFER;
449   if (ch->reliable)
450     options |= GNUNET_CADET_OPTION_RELIABLE;
451   if (ch->out_of_order)
452     options |= GNUNET_CADET_OPTION_OUT_OF_ORDER;
453   msgcc.header.size = htons (sizeof (msgcc));
454   msgcc.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN);
455   msgcc.opt = htonl (options);
456   msgcc.port = ch->port;
457   msgcc.ctn = ch->ctn;
458   ch->state = CADET_CHANNEL_CREATE_SENT;
459   ch->last_control_qe = GCT_send (ch->t,
460                                   &msgcc.header,
461                                   &channel_open_sent_cb,
462                                   ch);
463 }
464
465
466 /**
467  * Function called once and only once after a channel was bound
468  * to its tunnel via #GCT_add_channel() is ready for transmission.
469  * Note that this is only the case for channels that this peer
470  * initiates, as for incoming channels we assume that they are
471  * ready for transmission immediately upon receiving the open
472  * message.  Used to bootstrap the #GCT_send() process.
473  *
474  * @param ch the channel for which the tunnel is now ready
475  */
476 void
477 GCCH_tunnel_up (struct CadetChannel *ch)
478 {
479   GNUNET_assert (NULL == ch->retry_task);
480   ch->retry_task = GNUNET_SCHEDULER_add_now (&send_channel_open,
481                                              ch);
482 }
483
484
485 /**
486  * Create a new channel.
487  *
488  * @param owner local client owning the channel
489  * @param ccn local number of this channel at the @a owner
490  * @param destination peer to which we should build the channel
491  * @param port desired port at @a destination
492  * @param options options for the channel
493  * @return handle to the new channel
494  */
495 struct CadetChannel *
496 GCCH_channel_local_new (struct CadetClient *owner,
497                         struct GNUNET_CADET_ClientChannelNumber ccn,
498                         struct CadetPeer *destination,
499                         const struct GNUNET_HashCode *port,
500                         uint32_t options)
501 {
502   struct CadetChannel *ch;
503
504   ch = GNUNET_new (struct CadetChannel);
505   ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
506   ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
507   ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
508   ch->max_pending_messages = (ch->nobuffer) ? 1 : 32; /* FIXME: 32!? Do not hardcode! */
509   ch->owner = owner;
510   ch->ccn = ccn;
511   ch->port = *port;
512   ch->t = GCP_get_tunnel (destination,
513                           GNUNET_YES);
514   ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME;
515   ch->ctn = GCT_add_channel (ch->t,
516                              ch);
517   GNUNET_STATISTICS_update (stats,
518                             "# channels",
519                             1,
520                             GNUNET_NO);
521   LOG (GNUNET_ERROR_TYPE_DEBUG,
522        "Created channel to port %s at peer %s for client %s using tunnel %s\n",
523        GNUNET_h2s (port),
524        GCP_2s (destination),
525        GSC_2s (owner),
526        GCT_2s (ch->t));
527   return ch;
528 }
529
530
531 /**
532  * We had an incoming channel to a port that is closed.
533  * It has not been opened for a while, drop it.
534  *
535  * @param cls the channel to drop
536  */
537 static void
538 timeout_closed_cb (void *cls)
539 {
540   struct CadetChannel *ch = cls;
541
542   ch->retry_task = NULL;
543   LOG (GNUNET_ERROR_TYPE_DEBUG,
544        "Closing incoming channel to port %s from peer %s due to timeout\n",
545        GNUNET_h2s (&ch->port),
546        GCP_2s (GCT_get_destination (ch->t)));
547   channel_destroy (ch);
548 }
549
550
551 /**
552  * Create a new channel based on a request coming in over the network.
553  *
554  * @param t tunnel to the remote peer
555  * @param ctn identifier of this channel in the tunnel
556  * @param port desired local port
557  * @param options options for the channel
558  * @return handle to the new channel
559  */
560 struct CadetChannel *
561 GCCH_channel_incoming_new (struct CadetTunnel *t,
562                            struct GNUNET_CADET_ChannelTunnelNumber ctn,
563                            const struct GNUNET_HashCode *port,
564                            uint32_t options)
565 {
566   struct CadetChannel *ch;
567   struct CadetClient *c;
568
569   ch = GNUNET_new (struct CadetChannel);
570   ch->port = *port;
571   ch->t = t;
572   ch->ctn = ctn;
573   ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME;
574   ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
575   ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
576   ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
577   ch->max_pending_messages = (ch->nobuffer) ? 1 : 32; /* FIXME: 32!? Do not hardcode! */
578   GNUNET_STATISTICS_update (stats,
579                             "# channels",
580                             1,
581                             GNUNET_NO);
582
583   c = GNUNET_CONTAINER_multihashmap_get (open_ports,
584                                          port);
585   if (NULL == c)
586   {
587     /* port closed, wait for it to possibly open */
588     (void) GNUNET_CONTAINER_multihashmap_put (loose_channels,
589                                               port,
590                                               ch,
591                                               GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
592     ch->retry_task = GNUNET_SCHEDULER_add_delayed (TIMEOUT_CLOSED_PORT,
593                                                    &timeout_closed_cb,
594                                                    ch);
595     LOG (GNUNET_ERROR_TYPE_DEBUG,
596          "Created loose incoming channel to port %s from peer %s\n",
597          GNUNET_h2s (&ch->port),
598          GCP_2s (GCT_get_destination (ch->t)));
599   }
600   else
601   {
602     GCCH_bind (ch,
603                c);
604   }
605   GNUNET_STATISTICS_update (stats,
606                             "# channels",
607                             1,
608                             GNUNET_NO);
609   return ch;
610 }
611
612
613 /**
614  * Function called once the tunnel confirms that we sent the
615  * ACK message.  Just remembers it was sent, we do not expect
616  * ACKs for ACKs ;-).
617  *
618  * @param cls our `struct CadetChannel`.
619  */
620 static void
621 send_ack_cb (void *cls)
622 {
623   struct CadetChannel *ch = cls;
624
625   ch->last_control_qe = NULL;
626 }
627
628
629 /**
630  * Compute and send the current ACK to the other peer.
631  *
632  * @param ch channel to send the ACK for
633  */
634 static void
635 send_channel_data_ack (struct CadetChannel *ch)
636 {
637   struct GNUNET_CADET_ChannelDataAckMessage msg;
638
639   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK);
640   msg.header.size = htons (sizeof (msg));
641   msg.ctn = ch->ctn;
642   msg.mid.mid = htonl (ntohl (ch->mid_recv.mid) - 1);
643   msg.futures = GNUNET_htonll (ch->mid_futures);
644   if (NULL != ch->last_control_qe)
645     GCT_send_cancel (ch->last_control_qe);
646   ch->last_control_qe = GCT_send (ch->t,
647                                   &msg.header,
648                                   &send_ack_cb,
649                                   ch);
650 }
651
652
653 /**
654  * Send our initial ACK to the client confirming that the
655  * connection is up.
656  *
657  * @param cls the `struct CadetChannel`
658  */
659 static void
660 send_connect_ack (void *cls)
661 {
662   struct CadetChannel *ch = cls;
663
664   ch->retry_task = NULL;
665   send_channel_data_ack (ch);
666 }
667
668
669 /**
670  * Send a LOCAL ACK to the client to solicit more messages.
671  *
672  * @param ch channel the ack is for
673  * @param c client to send the ACK to
674  */
675 static void
676 send_ack_to_client (struct CadetChannel *ch,
677                     struct CadetClient *c)
678 {
679   struct GNUNET_MQ_Envelope *env;
680   struct GNUNET_CADET_LocalAck *ack;
681
682   env = GNUNET_MQ_msg (ack,
683                        GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK);
684   ack->ccn = ch->ccn;
685   GSC_send_to_client (c,
686                       env);
687 }
688
689
690 /**
691  * A client is bound to the port that we have a channel
692  * open to.  Send the acknowledgement for the connection
693  * request and establish the link with the client.
694  *
695  * @param ch open incoming channel
696  * @param c client listening on the respective port
697  */
698 void
699 GCCH_bind (struct CadetChannel *ch,
700            struct CadetClient *c)
701 {
702   struct GNUNET_MQ_Envelope *env;
703   struct GNUNET_CADET_LocalChannelCreateMessage *tcm;
704   uint32_t options;
705
706   LOG (GNUNET_ERROR_TYPE_DEBUG,
707        "Binding channel %s from tunnel %s to port %s of client %s\n",
708        GCCH_2s (ch),
709        GCT_2s (ch->t),
710        GNUNET_h2s (&ch->port),
711        GSC_2s (c));
712   if (NULL != ch->retry_task)
713   {
714     /* there might be a timeout task here */
715     GNUNET_SCHEDULER_cancel (ch->retry_task);
716     ch->retry_task = NULL;
717   }
718   options = 0;
719   if (ch->nobuffer)
720     options |= GNUNET_CADET_OPTION_NOBUFFER;
721   if (ch->reliable)
722     options |= GNUNET_CADET_OPTION_RELIABLE;
723   if (ch->out_of_order)
724     options |= GNUNET_CADET_OPTION_OUT_OF_ORDER;
725   ch->dest = c;
726   ch->ccn = GSC_bind (c,
727                       ch,
728                       GCT_get_destination (ch->t),
729                       &ch->port,
730                       options);
731   ch->mid_recv.mid = htonl (1); /* The CONNECT counts as message 0! */
732
733   /* notify other peer that we accepted the connection */
734   ch->retry_task = GNUNET_SCHEDULER_add_now (&send_connect_ack,
735                                              ch);
736   /* give client it's initial supply of ACKs */
737   env = GNUNET_MQ_msg (tcm,
738                        GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_CREATE);
739   tcm->ccn = ch->ccn;
740   tcm->peer = *GCP_get_id (GCT_get_destination (ch->t));
741   tcm->port = ch->port;
742   tcm->opt = htonl (options);
743   GSC_send_to_client (ch->dest,
744                       env);
745   for (unsigned int i=0;i<ch->max_pending_messages;i++)
746     send_ack_to_client (ch,
747                         ch->owner);
748 }
749
750
751 /**
752  * Destroy locally created channel.  Called by the
753  * local client, so no need to tell the client.
754  *
755  * @param ch channel to destroy
756  */
757 void
758 GCCH_channel_local_destroy (struct CadetChannel *ch)
759 {
760   if (GNUNET_YES == ch->destroy)
761   {
762     /* other end already destroyed, with the local client gone, no need
763        to finish transmissions, just destroy immediately. */
764     channel_destroy (ch);
765     return;
766   }
767   if (NULL != ch->head_sent)
768   {
769     /* allow send queue to train first */
770     ch->destroy = GNUNET_YES;
771     return;
772   }
773   /* Nothing left to do, just finish destruction */
774   GCT_send_channel_destroy (ch->t,
775                             ch->ctn);
776   channel_destroy (ch);
777 }
778
779
780 /**
781  * Destroy channel that was incoming.  Called by the
782  * local client, so no need to tell the client.
783  *
784  * @param ch channel to destroy
785  */
786 void
787 GCCH_channel_incoming_destroy (struct CadetChannel *ch)
788 {
789   if (GNUNET_YES == ch->destroy)
790   {
791     /* other end already destroyed, with the remote client gone, no need
792        to finish transmissions, just destroy immediately. */
793     channel_destroy (ch);
794     return;
795   }
796   if (NULL != ch->head_recv)
797   {
798     /* allow local client to see all data first */
799     ch->destroy = GNUNET_YES;
800     return;
801   }
802   /* Nothing left to do, just finish destruction */
803   GCT_send_channel_destroy (ch->t,
804                             ch->ctn);
805   channel_destroy (ch);
806 }
807
808
809 /**
810  * We got an acknowledgement for the creation of the channel
811  * (the port is open on the other side). Begin transmissions.
812  *
813  * @param ch channel to destroy
814  */
815 void
816 GCCH_handle_channel_open_ack (struct CadetChannel *ch)
817 {
818   switch (ch->state)
819   {
820   case CADET_CHANNEL_NEW:
821     /* this should be impossible */
822     GNUNET_break (0);
823     break;
824   case CADET_CHANNEL_CREATE_SENT:
825     if (NULL == ch->owner)
826     {
827       /* We're not the owner, wrong direction! */
828       GNUNET_break_op (0);
829       return;
830     }
831     LOG (GNUNET_ERROR_TYPE_DEBUG,
832          "Received channel OPEN_ACK for waiting channel %s, entering READY state\n",
833          GCCH_2s (ch));
834     GNUNET_SCHEDULER_cancel (ch->retry_task);
835     ch->retry_task = NULL;
836     ch->state = CADET_CHANNEL_READY;
837     /* On first connect, send client as many ACKs as we allow messages
838        to be buffered! */
839     for (unsigned int i=0;i<ch->max_pending_messages;i++)
840       send_ack_to_client (ch,
841                           ch->owner);
842     break;
843   case CADET_CHANNEL_READY:
844     /* duplicate ACK, maybe we retried the CREATE. Ignore. */
845     LOG (GNUNET_ERROR_TYPE_DEBUG,
846          "Received duplicate channel OPEN_ACK for channel %s\n",
847          GCCH_2s (ch));
848     GNUNET_STATISTICS_update (stats,
849                               "# duplicate CREATE_ACKs",
850                               1,
851                               GNUNET_NO);
852     break;
853   }
854 }
855
856
857 /**
858  * Test if element @a e1 comes before element @a e2.
859  *
860  * TODO: use opportunity to create generic list insertion sort
861  * logic in container!
862  *
863  * @param cls closure, our `struct CadetChannel`
864  * @param e1 an element of to sort
865  * @param e2 another element to sort
866  * @return #GNUNET_YES if @e1 < @e2, otherwise #GNUNET_NO
867  */
868 static int
869 is_before (void *cls,
870            void *e1,
871            void *e2)
872 {
873   struct CadetOutOfOrderMessage *m1 = e1;
874   struct CadetOutOfOrderMessage *m2 = e2;
875   uint32_t v1 = ntohl (m1->mid.mid);
876   uint32_t v2 = ntohl (m2->mid.mid);
877   uint32_t delta;
878
879   delta = v1 - v2;
880   if (delta > (uint32_t) INT_MAX)
881   {
882     /* in overflow range, we can safely assume we wrapped around */
883     return GNUNET_NO;
884   }
885   else
886   {
887     return GNUNET_YES;
888   }
889 }
890
891
892 /**
893  * We got payload data for a channel.  Pass it on to the client
894  * and send an ACK to the other end (once flow control allows it!)
895  *
896  * @param ch channel that got data
897  */
898 void
899 GCCH_handle_channel_plaintext_data (struct CadetChannel *ch,
900                                     const struct GNUNET_CADET_ChannelAppDataMessage *msg)
901 {
902   struct GNUNET_MQ_Envelope *env;
903   struct GNUNET_CADET_LocalData *ld;
904   struct CadetOutOfOrderMessage *com;
905   size_t payload_size;
906
907   payload_size = ntohs (msg->header.size) - sizeof (*msg);
908   LOG (GNUNET_ERROR_TYPE_DEBUG,
909        "Receicved %u bytes of application data on channel %s\n",
910        (unsigned int) payload_size,
911        GCCH_2s (ch));
912   env = GNUNET_MQ_msg_extra (ld,
913                              payload_size,
914                              GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
915   ld->ccn = ch->ccn;
916   GNUNET_memcpy (&ld[1],
917                  &msg[1],
918                  payload_size);
919   if ( (GNUNET_YES == ch->client_ready) &&
920        ( (GNUNET_YES == ch->out_of_order) ||
921          (msg->mid.mid == ch->mid_recv.mid) ) )
922   {
923     GSC_send_to_client (ch->owner ? ch->owner : ch->dest,
924                         env);
925     ch->mid_recv.mid = htonl (1 + ntohl (ch->mid_recv.mid));
926     ch->mid_futures >>= 1;
927   }
928   else
929   {
930     /* FIXME-SECURITY: if the element is WAY too far ahead,
931        drop it (can't buffer too much!) */
932     com = GNUNET_new (struct CadetOutOfOrderMessage);
933     com->mid = msg->mid;
934     com->env = env;
935     /* sort into list ordered by "is_before" */
936     if ( (NULL == ch->head_recv) ||
937          (GNUNET_YES == is_before (ch,
938                                    com,
939                                    ch->head_recv)) )
940     {
941       GNUNET_CONTAINER_DLL_insert (ch->head_recv,
942                                    ch->tail_recv,
943                                    com);
944     }
945     else
946     {
947       struct CadetOutOfOrderMessage *pos;
948
949       for (pos = ch->head_recv;
950            NULL != pos;
951            pos = pos->next)
952       {
953         if (GNUNET_YES !=
954             is_before (ch,
955                        pos,
956                        com))
957           break;
958       }
959       if (NULL == pos)
960         GNUNET_CONTAINER_DLL_insert_tail (ch->head_recv,
961                                           ch->tail_recv,
962                                           com);
963       else
964         GNUNET_CONTAINER_DLL_insert_after (ch->head_recv,
965                                            ch->tail_recv,
966                                            com,
967                                            pos->prev);
968     }
969   }
970 }
971
972
973 /**
974  * We got an acknowledgement for payload data for a channel.
975  * Possibly resume transmissions.
976  *
977  * @param ch channel that got the ack
978  * @param ack details about what was received
979  */
980 void
981 GCCH_handle_channel_plaintext_data_ack (struct CadetChannel *ch,
982                                         const struct GNUNET_CADET_ChannelDataAckMessage *ack)
983 {
984   struct CadetReliableMessage *crm;
985
986   if (GNUNET_NO == ch->reliable)
987   {
988     /* not expecting ACKs on unreliable channel, odd */
989     GNUNET_break_op (0);
990     return;
991   }
992   for (crm = ch->head_sent;
993         NULL != crm;
994        crm = crm->next)
995     if (ack->mid.mid == crm->data_message.mid.mid)
996       break;
997   if (NULL == crm)
998   {
999     /* ACK for message we already dropped, might have been a
1000        duplicate ACK? Ignore. */
1001     GNUNET_STATISTICS_update (stats,
1002                               "# duplicate DATA_ACKs",
1003                               1,
1004                               GNUNET_NO);
1005     return;
1006   }
1007   GNUNET_CONTAINER_DLL_remove (ch->head_sent,
1008                                ch->tail_sent,
1009                                crm);
1010   ch->pending_messages--;
1011   GNUNET_free (crm);
1012   GNUNET_assert (ch->pending_messages < ch->max_pending_messages);
1013   send_ack_to_client (ch,
1014                       (NULL == ch->owner) ? ch->dest : ch->owner);
1015 }
1016
1017
1018 /**
1019  * Destroy channel, based on the other peer closing the
1020  * connection.  Also needs to remove this channel from
1021  * the tunnel.
1022  *
1023  * @param ch channel to destroy
1024  */
1025 void
1026 GCCH_handle_remote_destroy (struct CadetChannel *ch)
1027 {
1028   struct GNUNET_MQ_Envelope *env;
1029   struct GNUNET_CADET_LocalChannelDestroyMessage *tdm;
1030
1031   LOG (GNUNET_ERROR_TYPE_DEBUG,
1032        "Received remote channel DESTROY for channel %s\n",
1033        GCCH_2s (ch));
1034   ch->destroy = GNUNET_YES;
1035   env = GNUNET_MQ_msg (tdm,
1036                        GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_DESTROY);
1037   tdm->ccn = ch->ccn;
1038   GSC_send_to_client ((NULL != ch->owner) ? ch->owner : ch->dest,
1039                       env);
1040   channel_destroy (ch);
1041 }
1042
1043
1044 /**
1045  * Function called once the tunnel has sent one of our messages.
1046  * If the message is unreliable, simply frees the `crm`. If the
1047  * message was reliable, calculate retransmission time and
1048  * wait for ACK (or retransmit).
1049  *
1050  * @param cls the `struct CadetReliableMessage` that was sent
1051  */
1052 static void
1053 data_sent_cb (void *cls);
1054
1055
1056 /**
1057  * We need to retry a transmission, the last one took too long to
1058  * be acknowledged.
1059  *
1060  * @param cls the `struct CadetChannel` where we need to retransmit
1061  */
1062 static void
1063 retry_transmission (void *cls)
1064 {
1065   struct CadetChannel *ch = cls;
1066   struct CadetReliableMessage *crm = ch->head_sent;
1067
1068   ch->retry_task = NULL;
1069   GNUNET_assert (NULL == crm->qe);
1070   crm->qe = GCT_send (ch->t,
1071                       &crm->data_message.header,
1072                       &data_sent_cb,
1073                       crm);
1074 }
1075
1076
1077 /**
1078  * Check if we can now allow the client to transmit, and if so,
1079  * let the client know about it.
1080  *
1081  * @param ch channel to check
1082  */
1083 static void
1084 GCCH_check_allow_client (struct CadetChannel *ch)
1085 {
1086   struct GNUNET_MQ_Envelope *env;
1087   struct GNUNET_CADET_LocalAck *msg;
1088
1089   if (GNUNET_YES == ch->client_allowed)
1090     return; /* client already allowed! */
1091   if (CADET_CHANNEL_READY != ch->state)
1092   {
1093     /* destination did not yet ACK our CREATE! */
1094     LOG (GNUNET_ERROR_TYPE_DEBUG,
1095          "Channel %s not yet ready, throttling client until ACK.\n",
1096          GCCH_2s (ch));
1097     return;
1098   }
1099   if (ch->pending_messages > ch->max_pending_messages)
1100   {
1101     /* Too many messages in queue. */
1102     LOG (GNUNET_ERROR_TYPE_DEBUG,
1103          "Message queue still too long on channel %s, throttling client until ACK.\n",
1104          GCCH_2s (ch));
1105     return;
1106   }
1107   if ( (NULL != ch->head_sent) &&
1108        (64 <= ntohl (ch->mid_send.mid) - ntohl (ch->head_sent->data_message.mid.mid)) )
1109   {
1110     LOG (GNUNET_ERROR_TYPE_DEBUG,
1111          "Gap in ACKs too big on channel %s, throttling client until ACK.\n",
1112          GCCH_2s (ch));
1113     return;
1114   }
1115   ch->client_allowed = GNUNET_YES;
1116
1117
1118   LOG (GNUNET_ERROR_TYPE_DEBUG,
1119        "Sending local ack to channel %s client\n",
1120        GCCH_2s (ch));
1121   env = GNUNET_MQ_msg (msg,
1122                        GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK);
1123   msg->ccn = ch->ccn;
1124   GSC_send_to_client (ch->owner ? ch->owner : ch->dest,
1125                       env);
1126 }
1127
1128
1129 /**
1130  * Function called once the tunnel has sent one of our messages.
1131  * If the message is unreliable, simply frees the `crm`. If the
1132  * message was reliable, calculate retransmission time and
1133  * wait for ACK (or retransmit).
1134  *
1135  * @param cls the `struct CadetReliableMessage` that was sent
1136  */
1137 static void
1138 data_sent_cb (void *cls)
1139 {
1140   struct CadetReliableMessage *crm = cls;
1141   struct CadetChannel *ch = crm->ch;
1142   struct CadetReliableMessage *off;
1143
1144   crm->qe = NULL;
1145   GNUNET_CONTAINER_DLL_remove (ch->head_sent,
1146                                ch->tail_sent,
1147                                crm);
1148   if (GNUNET_NO == ch->reliable)
1149   {
1150     GNUNET_free (crm);
1151     ch->pending_messages--;
1152     GCCH_check_allow_client (ch);
1153     return;
1154   }
1155   if (0 == crm->retry_delay.rel_value_us)
1156     crm->retry_delay = ch->expected_delay;
1157   crm->next_retry = GNUNET_TIME_relative_to_absolute (crm->retry_delay);
1158
1159   /* find position for re-insertion into the DLL */
1160   if ( (NULL == ch->head_sent) ||
1161        (crm->next_retry.abs_value_us < ch->head_sent->next_retry.abs_value_us) )
1162   {
1163     /* insert at HEAD, also (re)schedule retry task! */
1164     GNUNET_CONTAINER_DLL_insert (ch->head_sent,
1165                                  ch->tail_sent,
1166                                  crm);
1167     if (NULL != ch->retry_task)
1168       GNUNET_SCHEDULER_cancel (ch->retry_task);
1169     ch->retry_task = GNUNET_SCHEDULER_add_delayed (crm->retry_delay,
1170                                                    &retry_transmission,
1171                                                    ch);
1172     return;
1173   }
1174   for (off = ch->head_sent; NULL != off; off = off->next)
1175     if (crm->next_retry.abs_value_us < off->next_retry.abs_value_us)
1176       break;
1177   if (NULL == off)
1178   {
1179     /* insert at tail */
1180     GNUNET_CONTAINER_DLL_insert_tail (ch->head_sent,
1181                                       ch->tail_sent,
1182                                       crm);
1183   }
1184   else
1185   {
1186     /* insert before off */
1187     GNUNET_CONTAINER_DLL_insert_after (ch->head_sent,
1188                                        ch->tail_sent,
1189                                        off->prev,
1190                                        crm);
1191   }
1192 }
1193
1194
1195 /**
1196  * Handle data given by a client.
1197  *
1198  * Check whether the client is allowed to send in this tunnel, save if
1199  * channel is reliable and send an ACK to the client if there is still
1200  * buffer space in the tunnel.
1201  *
1202  * @param ch Channel.
1203  * @param message payload to transmit.
1204  * @return #GNUNET_OK if everything goes well,
1205  *         #GNUNET_SYSERR in case of an error.
1206  */
1207 int
1208 GCCH_handle_local_data (struct CadetChannel *ch,
1209                         const struct GNUNET_MessageHeader *message)
1210 {
1211   uint16_t payload_size = ntohs (message->size);
1212   struct CadetReliableMessage *crm;
1213
1214   if (GNUNET_NO == ch->client_allowed)
1215   {
1216     GNUNET_break_op (0);
1217     return GNUNET_SYSERR;
1218   }
1219   ch->client_allowed = GNUNET_NO;
1220   ch->pending_messages++;
1221
1222   /* Everything is correct, send the message. */
1223   crm = GNUNET_malloc (sizeof (*crm) + payload_size);
1224   crm->ch = ch;
1225   crm->data_message.header.size = htons (sizeof (struct GNUNET_CADET_ChannelAppDataMessage) + payload_size);
1226   crm->data_message.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA);
1227   ch->mid_send.mid = htonl (ntohl (ch->mid_send.mid) + 1);
1228   crm->data_message.mid = ch->mid_send;
1229   crm->data_message.ctn = ch->ctn;
1230   GNUNET_memcpy (&crm[1],
1231                  message,
1232                  payload_size);
1233   GNUNET_CONTAINER_DLL_insert (ch->head_sent,
1234                                ch->tail_sent,
1235                                crm);
1236   LOG (GNUNET_ERROR_TYPE_DEBUG,
1237        "Sending %u bytes from local client to channel %s\n",
1238        payload_size,
1239        GCCH_2s (ch));
1240   crm->qe = GCT_send (ch->t,
1241                       &crm->data_message.header,
1242                       &data_sent_cb,
1243                       crm);
1244   GCCH_check_allow_client (ch);
1245   return GNUNET_OK;
1246 }
1247
1248
1249 /**
1250  * Try to deliver messages to the local client, if it is ready for more.
1251  *
1252  * @param ch channel to process
1253  */
1254 static void
1255 send_client_buffered_data (struct CadetChannel *ch)
1256 {
1257   struct CadetOutOfOrderMessage *com;
1258
1259   if (GNUNET_NO == ch->client_ready)
1260     return; /* client not ready */
1261   com = ch->head_recv;
1262   if (NULL == com)
1263     return; /* none pending */
1264   if ( (com->mid.mid != ch->mid_recv.mid) &&
1265        (GNUNET_NO == ch->out_of_order) )
1266     return; /* missing next one in-order */
1267
1268   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1269               "Passing payload message to client on channel %s\n",
1270               GCCH_2s (ch));
1271
1272   /* all good, pass next message to client */
1273   GNUNET_CONTAINER_DLL_remove (ch->head_recv,
1274                                ch->tail_recv,
1275                                com);
1276   /* FIXME: if unreliable, this is not aggressive
1277      enough, as it would be OK to have lost some! */
1278   ch->mid_recv.mid = htonl (1 + ntohl (com->mid.mid));
1279   ch->mid_futures >>= 1; /* equivalent to division by 2 */
1280   GSC_send_to_client (ch->owner ? ch->owner : ch->dest,
1281                       com->env);
1282   GNUNET_free (com);
1283   if ( (0xFFULL == (ch->mid_futures & 0xFFULL)) &&
1284        (GNUNET_YES == ch->reliable) )
1285   {
1286     /* The next 15 messages were also already received (0xFF), this
1287        suggests that the sender may be blocked on flow control
1288        urgently waiting for an ACK from us. (As we have an inherent
1289        maximum of 64 bits, and 15 is getting too close for comfort.)
1290        So we should send one now. */
1291     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1292                 "Sender on channel %s likely blocked on flow-control, sending ACK now.\n",
1293                 GCCH_2s (ch));
1294     if (GNUNET_YES == ch->reliable)
1295       send_channel_data_ack (ch);
1296   }
1297
1298   if (NULL != ch->head_recv)
1299     return;
1300   if (GNUNET_NO == ch->destroy)
1301     return;
1302   GCT_send_channel_destroy (ch->t,
1303                             ch->ctn);
1304   channel_destroy (ch);
1305 }
1306
1307
1308 /**
1309  * Handle ACK from client on local channel.
1310  *
1311  * @param ch channel to destroy
1312  */
1313 void
1314 GCCH_handle_local_ack (struct CadetChannel *ch)
1315 {
1316   ch->client_ready = GNUNET_YES;
1317   send_client_buffered_data (ch);
1318 }
1319
1320
1321 #define LOG2(level, ...) GNUNET_log_from_nocheck(level,"cadet-chn",__VA_ARGS__)
1322
1323
1324 /**
1325  * Log channel info.
1326  *
1327  * @param ch Channel.
1328  * @param level Debug level to use.
1329  */
1330 void
1331 GCCH_debug (struct CadetChannel *ch,
1332             enum GNUNET_ErrorType level)
1333 {
1334   int do_log;
1335
1336   do_log = GNUNET_get_log_call_status (level & (~GNUNET_ERROR_TYPE_BULK),
1337                                        "cadet-chn",
1338                                        __FILE__, __FUNCTION__, __LINE__);
1339   if (0 == do_log)
1340     return;
1341
1342   if (NULL == ch)
1343   {
1344     LOG2 (level, "CHN *** DEBUG NULL CHANNEL ***\n");
1345     return;
1346   }
1347   LOG2 (level,
1348         "CHN Channel %s:%X (%p)\n",
1349         GCT_2s (ch->t),
1350         ch->ctn,
1351         ch);
1352   if (NULL != ch->owner)
1353   {
1354     LOG2 (level,
1355           "CHN origin %s ready %s local-id: %u\n",
1356           GSC_2s (ch->owner),
1357           ch->client_ready ? "YES" : "NO",
1358           ntohl (ch->ccn.channel_of_client));
1359   }
1360   if (NULL != ch->dest)
1361   {
1362     LOG2 (level,
1363           "CHN destination %s ready %s local-id: %u\n",
1364           GSC_2s (ch->dest),
1365           ch->client_ready ? "YES" : "NO",
1366           ntohl (ch->ccn.channel_of_client));
1367   }
1368   LOG2 (level,
1369         "CHN  Message IDs recv: %d (%LLX), send: %d\n",
1370         ntohl (ch->mid_recv.mid),
1371         (unsigned long long) ch->mid_futures,
1372         ntohl (ch->mid_send.mid));
1373 }
1374
1375
1376
1377 /* end of gnunet-service-cadet-new_channel.c */