fixing misc cleanup issues
[oweals/gnunet.git] / src / cadet / gnunet-service-cadet-new_channel.c
1
2 /*
3      This file is part of GNUnet.
4      Copyright (C) 2001-2017 GNUnet e.V.
5
6      GNUnet is free software; you can redistribute it and/or modify
7      it under the terms of the GNU General Public License as published
8      by the Free Software Foundation; either version 3, or (at your
9      option) any later version.
10
11      GNUnet is distributed in the hope that it will be useful, but
12      WITHOUT ANY WARRANTY; without even the implied warranty of
13      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14      General Public License for more details.
15
16      You should have received a copy of the GNU General Public License
17      along with GNUnet; see the file COPYING.  If not, write to the
18      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
19      Boston, MA 02110-1301, USA.
20 */
21 /**
22  * @file cadet/gnunet-service-cadet-new_channel.c
23  * @brief logical links between CADET clients
24  * @author Bartlomiej Polot
25  * @author Christian Grothoff
26  *
27  * TODO:
28  * - introduce shutdown so we can have half-closed channels, modify
29  *   destroy to include MID to have FIN-ACK equivalents, etc.
30  * - estimate max bandwidth using bursts and use to for CONGESTION CONTROL!
31  * - check that '0xFFULL' really is sufficient for flow control!
32  * - revisit handling of 'unreliable' traffic!
33  * - revisit handling of 'out-of-order' option, especially in combination with/without 'reliable'.
34  * - figure out flow control without ACKs (unreliable traffic!)
35  */
36 #include "platform.h"
37 #include "gnunet_util_lib.h"
38 #include "cadet.h"
39 #include "gnunet_statistics_service.h"
40 #include "gnunet-service-cadet-new.h"
41 #include "gnunet-service-cadet-new_channel.h"
42 #include "gnunet-service-cadet-new_connection.h"
43 #include "gnunet-service-cadet-new_tunnels.h"
44 #include "gnunet-service-cadet-new_peer.h"
45 #include "gnunet-service-cadet-new_paths.h"
46
47 #define LOG(level,...) GNUNET_log_from (level,"cadet-chn",__VA_ARGS__)
48
49 /**
50  * How long do we initially wait before retransmitting?
51  */
52 #define CADET_INITIAL_RETRANSMIT_TIME GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 250)
53
54 /**
55  * How long do we wait before dropping state about incoming
56  * connection to closed port?
57  */
58 #define TIMEOUT_CLOSED_PORT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30)
59
60
61 /**
62  * All the states a connection can be in.
63  */
64 enum CadetChannelState
65 {
66   /**
67    * Uninitialized status, should never appear in operation.
68    */
69   CADET_CHANNEL_NEW,
70
71   /**
72    * Connection create message sent, waiting for ACK.
73    */
74   CADET_CHANNEL_OPEN_SENT,
75
76   /**
77    * Connection confirmed, ready to carry traffic.
78    */
79   CADET_CHANNEL_READY
80 };
81
82
83 /**
84  * Info needed to retry a message in case it gets lost.
85  * Note that we DO use this structure also for unreliable
86  * messages.
87  */
88 struct CadetReliableMessage
89 {
90   /**
91    * Double linked list, FIFO style
92    */
93   struct CadetReliableMessage *next;
94
95   /**
96    * Double linked list, FIFO style
97    */
98   struct CadetReliableMessage *prev;
99
100   /**
101    * Which channel is this message in?
102    */
103   struct CadetChannel *ch;
104
105   /**
106    * Entry in the tunnels queue for this message, NULL if it has left
107    * the tunnel.  Used to cancel transmission in case we receive an
108    * ACK in time.
109    */
110   struct CadetTunnelQueueEntry *qe;
111
112   /**
113    * How soon should we retry if we fail to get an ACK?
114    * Messages in the queue are sorted by this value.
115    */
116   struct GNUNET_TIME_Absolute next_retry;
117
118   /**
119    * How long do we wait for an ACK after transmission?
120    * Use for the back-off calculation.
121    */
122   struct GNUNET_TIME_Relative retry_delay;
123
124   /**
125    * Data message we are trying to send.
126    */
127   struct GNUNET_CADET_ChannelAppDataMessage data_message;
128
129   /* followed by variable-size payload */
130 };
131
132
133 /**
134  * List of received out-of-order data messages.
135  */
136 struct CadetOutOfOrderMessage
137 {
138   /**
139    * Double linked list, FIFO style
140    */
141   struct CadetOutOfOrderMessage *next;
142
143   /**
144    * Double linked list, FIFO style
145    */
146   struct CadetOutOfOrderMessage *prev;
147
148   /**
149    * ID of the message (messages up to this point needed
150    * before we give this one to the client).
151    */
152   struct ChannelMessageIdentifier mid;
153
154   /**
155    * The envelope with the payload of the out-of-order message
156    */
157   struct GNUNET_MQ_Envelope *env;
158
159 };
160
161
162 /**
163  * Struct containing all information regarding a channel to a remote client.
164  */
165 struct CadetChannel
166 {
167   /**
168    * Tunnel this channel is in.
169    */
170   struct CadetTunnel *t;
171
172   /**
173    * Client owner of the tunnel, if any.
174    * (Used if this channel represends the initiating end of the tunnel.)
175    */
176   struct CadetClient *owner;
177
178   /**
179    * Client destination of the tunnel, if any.
180    * (Used if this channel represents the listening end of the tunnel.)
181    */
182   struct CadetClient *dest;
183
184   /**
185    * Last entry in the tunnel's queue relating to control messages
186    * (#GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN or
187    * #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK).  Used to cancel
188    * transmission in case we receive updated information.
189    */
190   struct CadetTunnelQueueEntry *last_control_qe;
191
192   /**
193    * Head of DLL of messages sent and not yet ACK'd.
194    */
195   struct CadetReliableMessage *head_sent;
196
197   /**
198    * Tail of DLL of messages sent and not yet ACK'd.
199    */
200   struct CadetReliableMessage *tail_sent;
201
202   /**
203    * Head of DLL of messages received out of order or while client was unready.
204    */
205   struct CadetOutOfOrderMessage *head_recv;
206
207   /**
208    * Tail DLL of messages received out of order or while client was unready.
209    */
210   struct CadetOutOfOrderMessage *tail_recv;
211
212   /**
213    * Task to resend/poll in case no ACK is received.
214    */
215   struct GNUNET_SCHEDULER_Task *retry_task;
216
217   /**
218    * Last time the channel was used
219    */
220   struct GNUNET_TIME_Absolute timestamp;
221
222   /**
223    * Destination port of the channel.
224    */
225   struct GNUNET_HashCode port;
226
227   /**
228    * Counter for exponential backoff.
229    */
230   struct GNUNET_TIME_Relative retry_time;
231
232   /**
233    * How long does it usually take to get an ACK.
234    */
235   struct GNUNET_TIME_Relative expected_delay;
236
237   /**
238    * Bitfield of already-received messages past @e mid_recv.
239    */
240   uint64_t mid_futures;
241
242   /**
243    * Next MID expected for incoming traffic.
244    */
245   struct ChannelMessageIdentifier mid_recv;
246
247   /**
248    * Next MID to use for outgoing traffic.
249    */
250   struct ChannelMessageIdentifier mid_send;
251
252   /**
253    * Total (reliable) messages pending ACK for this channel.
254    */
255   unsigned int pending_messages;
256
257   /**
258    * Maximum (reliable) messages pending ACK for this channel
259    * before we throttle the client.
260    */
261   unsigned int max_pending_messages;
262
263   /**
264    * Number identifying this channel in its tunnel.
265    */
266   struct GNUNET_CADET_ChannelTunnelNumber ctn;
267
268   /**
269    * Local tunnel number for local client owning the channel.
270    * ( >= #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI or 0 )
271    */
272   struct GNUNET_CADET_ClientChannelNumber ccn;
273
274   /**
275    * Channel state.
276    */
277   enum CadetChannelState state;
278
279   /**
280    * Can we send data to the client?
281    */
282   int client_ready;
283
284   /**
285    * Can the client send data to us?
286    */
287   int client_allowed;
288
289   /**
290    * Is the tunnel bufferless (minimum latency)?
291    */
292   int nobuffer;
293
294   /**
295    * Is the tunnel reliable?
296    */
297   int reliable;
298
299   /**
300    * Is the tunnel out-of-order?
301    */
302   int out_of_order;
303
304   /**
305    * Flag to signal the destruction of the channel.  If this is set to
306    * #GNUNET_YES the channel will be destroyed once the queue is
307    * empty.
308    */
309   int destroy;
310
311 };
312
313
314 /**
315  * Get the static string for identification of the channel.
316  *
317  * @param ch Channel.
318  *
319  * @return Static string with the channel IDs.
320  */
321 const char *
322 GCCH_2s (const struct CadetChannel *ch)
323 {
324   static char buf[128];
325
326   GNUNET_snprintf (buf,
327                    sizeof (buf),
328                    "Channel %s:%s ctn:%X(%X)",
329                    GNUNET_i2s (GCP_get_id (GCT_get_destination (ch->t))),
330                    GNUNET_h2s (&ch->port),
331                    ch->ctn,
332                    ntohl (ch->ccn.channel_of_client));
333   return buf;
334 }
335
336
337 /**
338  * Get the channel's public ID.
339  *
340  * @param ch Channel.
341  *
342  * @return ID used to identify the channel with the remote peer.
343  */
344 struct GNUNET_CADET_ChannelTunnelNumber
345 GCCH_get_id (const struct CadetChannel *ch)
346 {
347   return ch->ctn;
348 }
349
350
351 /**
352  * Destroy the given channel.
353  *
354  * @param ch channel to destroy
355  */
356 static void
357 channel_destroy (struct CadetChannel *ch)
358 {
359   struct CadetReliableMessage *crm;
360   struct CadetOutOfOrderMessage *com;
361
362   while (NULL != (crm = ch->head_sent))
363   {
364     GNUNET_assert (ch == crm->ch);
365     if (NULL != crm->qe)
366     {
367       GCT_send_cancel (crm->qe);
368       crm->qe = NULL;
369     }
370     GNUNET_CONTAINER_DLL_remove (ch->head_sent,
371                                  ch->tail_sent,
372                                  crm);
373     GNUNET_free (crm);
374   }
375   while (NULL != (com = ch->head_recv))
376   {
377     GNUNET_CONTAINER_DLL_remove (ch->head_recv,
378                                  ch->tail_recv,
379                                  com);
380     GNUNET_MQ_discard (com->env);
381     GNUNET_free (com);
382   }
383   if (NULL != ch->last_control_qe)
384   {
385     GCT_send_cancel (ch->last_control_qe);
386     ch->last_control_qe = NULL;
387   }
388   if (NULL != ch->retry_task)
389   {
390     GNUNET_SCHEDULER_cancel (ch->retry_task);
391     ch->retry_task = NULL;
392   }
393   GCT_remove_channel (ch->t,
394                       ch,
395                       ch->ctn);
396   GNUNET_free (ch);
397 }
398
399
400 /**
401  * Send a channel create message.
402  *
403  * @param cls Channel for which to send.
404  */
405 static void
406 send_channel_open (void *cls);
407
408
409 /**
410  * Function called once the tunnel confirms that we sent the
411  * create message.  Delays for a bit until we retry.
412  *
413  * @param cls our `struct CadetChannel`.
414  */
415 static void
416 channel_open_sent_cb (void *cls)
417 {
418   struct CadetChannel *ch = cls;
419
420   GNUNET_assert (NULL != ch->last_control_qe);
421   ch->last_control_qe = NULL;
422   ch->retry_time = GNUNET_TIME_STD_BACKOFF (ch->retry_time);
423   ch->retry_task = GNUNET_SCHEDULER_add_delayed (ch->retry_time,
424                                                  &send_channel_open,
425                                                  ch);
426 }
427
428
429 /**
430  * Send a channel open message.
431  *
432  * @param cls Channel for which to send.
433  */
434 static void
435 send_channel_open (void *cls)
436 {
437   struct CadetChannel *ch = cls;
438   struct GNUNET_CADET_ChannelOpenMessage msgcc;
439   uint32_t options;
440
441   ch->retry_task = NULL;
442   LOG (GNUNET_ERROR_TYPE_DEBUG,
443        "Sending CHANNEL_OPEN message for %s\n",
444        GCCH_2s (ch));
445   options = 0;
446   if (ch->nobuffer)
447     options |= GNUNET_CADET_OPTION_NOBUFFER;
448   if (ch->reliable)
449     options |= GNUNET_CADET_OPTION_RELIABLE;
450   if (ch->out_of_order)
451     options |= GNUNET_CADET_OPTION_OUT_OF_ORDER;
452   msgcc.header.size = htons (sizeof (msgcc));
453   msgcc.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN);
454   msgcc.opt = htonl (options);
455   msgcc.port = ch->port;
456   msgcc.ctn = ch->ctn;
457   ch->state = CADET_CHANNEL_OPEN_SENT;
458   ch->last_control_qe = GCT_send (ch->t,
459                                   &msgcc.header,
460                                   &channel_open_sent_cb,
461                                   ch);
462 }
463
464
465 /**
466  * Function called once and only once after a channel was bound
467  * to its tunnel via #GCT_add_channel() is ready for transmission.
468  * Note that this is only the case for channels that this peer
469  * initiates, as for incoming channels we assume that they are
470  * ready for transmission immediately upon receiving the open
471  * message.  Used to bootstrap the #GCT_send() process.
472  *
473  * @param ch the channel for which the tunnel is now ready
474  */
475 void
476 GCCH_tunnel_up (struct CadetChannel *ch)
477 {
478   GNUNET_assert (NULL == ch->retry_task);
479   ch->retry_task = GNUNET_SCHEDULER_add_now (&send_channel_open,
480                                              ch);
481 }
482
483
484 /**
485  * Create a new channel.
486  *
487  * @param owner local client owning the channel
488  * @param ccn local number of this channel at the @a owner
489  * @param destination peer to which we should build the channel
490  * @param port desired port at @a destination
491  * @param options options for the channel
492  * @return handle to the new channel
493  */
494 struct CadetChannel *
495 GCCH_channel_local_new (struct CadetClient *owner,
496                         struct GNUNET_CADET_ClientChannelNumber ccn,
497                         struct CadetPeer *destination,
498                         const struct GNUNET_HashCode *port,
499                         uint32_t options)
500 {
501   struct CadetChannel *ch;
502
503   ch = GNUNET_new (struct CadetChannel);
504   ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
505   ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
506   ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
507   ch->max_pending_messages = (ch->nobuffer) ? 1 : 32; /* FIXME: 32!? Do not hardcode! */
508   ch->owner = owner;
509   ch->ccn = ccn;
510   ch->port = *port;
511   ch->t = GCP_get_tunnel (destination,
512                           GNUNET_YES);
513   ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME;
514   ch->ctn = GCT_add_channel (ch->t,
515                              ch);
516   GNUNET_STATISTICS_update (stats,
517                             "# channels",
518                             1,
519                             GNUNET_NO);
520   LOG (GNUNET_ERROR_TYPE_DEBUG,
521        "Created channel to port %s at peer %s for client %s using tunnel %s\n",
522        GNUNET_h2s (port),
523        GCP_2s (destination),
524        GSC_2s (owner),
525        GCT_2s (ch->t));
526   return ch;
527 }
528
529
530 /**
531  * We had an incoming channel to a port that is closed.
532  * It has not been opened for a while, drop it.
533  *
534  * @param cls the channel to drop
535  */
536 static void
537 timeout_closed_cb (void *cls)
538 {
539   struct CadetChannel *ch = cls;
540
541   ch->retry_task = NULL;
542   LOG (GNUNET_ERROR_TYPE_DEBUG,
543        "Closing incoming channel to port %s from peer %s due to timeout\n",
544        GNUNET_h2s (&ch->port),
545        GCP_2s (GCT_get_destination (ch->t)));
546   channel_destroy (ch);
547 }
548
549
550 /**
551  * Create a new channel based on a request coming in over the network.
552  *
553  * @param t tunnel to the remote peer
554  * @param ctn identifier of this channel in the tunnel
555  * @param port desired local port
556  * @param options options for the channel
557  * @return handle to the new channel
558  */
559 struct CadetChannel *
560 GCCH_channel_incoming_new (struct CadetTunnel *t,
561                            struct GNUNET_CADET_ChannelTunnelNumber ctn,
562                            const struct GNUNET_HashCode *port,
563                            uint32_t options)
564 {
565   struct CadetChannel *ch;
566   struct CadetClient *c;
567
568   ch = GNUNET_new (struct CadetChannel);
569   ch->port = *port;
570   ch->t = t;
571   ch->ctn = ctn;
572   ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME;
573   ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
574   ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
575   ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
576   ch->max_pending_messages = (ch->nobuffer) ? 1 : 32; /* FIXME: 32!? Do not hardcode! */
577   GNUNET_STATISTICS_update (stats,
578                             "# channels",
579                             1,
580                             GNUNET_NO);
581
582   c = GNUNET_CONTAINER_multihashmap_get (open_ports,
583                                          port);
584   if (NULL == c)
585   {
586     /* port closed, wait for it to possibly open */
587     (void) GNUNET_CONTAINER_multihashmap_put (loose_channels,
588                                               port,
589                                               ch,
590                                               GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
591     ch->retry_task = GNUNET_SCHEDULER_add_delayed (TIMEOUT_CLOSED_PORT,
592                                                    &timeout_closed_cb,
593                                                    ch);
594     LOG (GNUNET_ERROR_TYPE_DEBUG,
595          "Created loose incoming channel to port %s from peer %s\n",
596          GNUNET_h2s (&ch->port),
597          GCP_2s (GCT_get_destination (ch->t)));
598   }
599   else
600   {
601     GCCH_bind (ch,
602                c);
603   }
604   GNUNET_STATISTICS_update (stats,
605                             "# channels",
606                             1,
607                             GNUNET_NO);
608   return ch;
609 }
610
611
612 /**
613  * Function called once the tunnel confirms that we sent the
614  * ACK message.  Just remembers it was sent, we do not expect
615  * ACKs for ACKs ;-).
616  *
617  * @param cls our `struct CadetChannel`.
618  */
619 static void
620 send_ack_cb (void *cls)
621 {
622   struct CadetChannel *ch = cls;
623
624   GNUNET_assert (NULL != ch->last_control_qe);
625   ch->last_control_qe = NULL;
626 }
627
628
629 /**
630  * Compute and send the current ACK to the other peer.
631  *
632  * @param ch channel to send the ACK for
633  */
634 static void
635 send_channel_data_ack (struct CadetChannel *ch)
636 {
637   struct GNUNET_CADET_ChannelDataAckMessage msg;
638
639   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK);
640   msg.header.size = htons (sizeof (msg));
641   msg.ctn = ch->ctn;
642   msg.mid.mid = htonl (ntohl (ch->mid_recv.mid) - 1);
643   msg.futures = GNUNET_htonll (ch->mid_futures);
644   if (NULL != ch->last_control_qe)
645     GCT_send_cancel (ch->last_control_qe);
646   ch->last_control_qe = GCT_send (ch->t,
647                                   &msg.header,
648                                   &send_ack_cb,
649                                   ch);
650 }
651
652
653 /**
654  * Send our initial ACK to the client confirming that the
655  * connection is up.
656  *
657  * @param cls the `struct CadetChannel`
658  */
659 static void
660 send_connect_ack (void *cls)
661 {
662   struct CadetChannel *ch = cls;
663
664   ch->retry_task = NULL;
665   send_channel_data_ack (ch);
666 }
667
668
669 /**
670  * Send a LOCAL ACK to the client to solicit more messages.
671  *
672  * @param ch channel the ack is for
673  * @param c client to send the ACK to
674  */
675 static void
676 send_ack_to_client (struct CadetChannel *ch,
677                     struct CadetClient *c)
678 {
679   struct GNUNET_MQ_Envelope *env;
680   struct GNUNET_CADET_LocalAck *ack;
681
682   env = GNUNET_MQ_msg (ack,
683                        GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK);
684   ack->ccn = ch->ccn;
685   GSC_send_to_client (c,
686                       env);
687 }
688
689
690 /**
691  * A client is bound to the port that we have a channel
692  * open to.  Send the acknowledgement for the connection
693  * request and establish the link with the client.
694  *
695  * @param ch open incoming channel
696  * @param c client listening on the respective port
697  */
698 void
699 GCCH_bind (struct CadetChannel *ch,
700            struct CadetClient *c)
701 {
702   struct GNUNET_MQ_Envelope *env;
703   struct GNUNET_CADET_LocalChannelCreateMessage *tcm;
704   uint32_t options;
705
706   LOG (GNUNET_ERROR_TYPE_DEBUG,
707        "Binding %s from tunnel %s to port %s of client %s\n",
708        GCCH_2s (ch),
709        GCT_2s (ch->t),
710        GNUNET_h2s (&ch->port),
711        GSC_2s (c));
712   if (NULL != ch->retry_task)
713   {
714     /* there might be a timeout task here */
715     GNUNET_SCHEDULER_cancel (ch->retry_task);
716     ch->retry_task = NULL;
717   }
718   options = 0;
719   if (ch->nobuffer)
720     options |= GNUNET_CADET_OPTION_NOBUFFER;
721   if (ch->reliable)
722     options |= GNUNET_CADET_OPTION_RELIABLE;
723   if (ch->out_of_order)
724     options |= GNUNET_CADET_OPTION_OUT_OF_ORDER;
725   ch->dest = c;
726   ch->ccn = GSC_bind (c,
727                       ch,
728                       GCT_get_destination (ch->t),
729                       &ch->port,
730                       options);
731   ch->mid_recv.mid = htonl (1); /* The CONNECT counts as message 0! */
732
733   /* notify other peer that we accepted the connection */
734   ch->retry_task = GNUNET_SCHEDULER_add_now (&send_connect_ack,
735                                              ch);
736   /* give client it's initial supply of ACKs */
737   env = GNUNET_MQ_msg (tcm,
738                        GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_CREATE);
739   tcm->ccn = ch->ccn;
740   tcm->peer = *GCP_get_id (GCT_get_destination (ch->t));
741   tcm->port = ch->port;
742   tcm->opt = htonl (options);
743   GSC_send_to_client (ch->dest,
744                       env);
745   for (unsigned int i=0;i<ch->max_pending_messages;i++)
746     send_ack_to_client (ch,
747                         ch->dest);
748 }
749
750
751 /**
752  * Destroy locally created channel.  Called by the
753  * local client, so no need to tell the client.
754  *
755  * @param ch channel to destroy
756  */
757 void
758 GCCH_channel_local_destroy (struct CadetChannel *ch)
759 {
760   if (GNUNET_YES == ch->destroy)
761   {
762     /* other end already destroyed, with the local client gone, no need
763        to finish transmissions, just destroy immediately. */
764     channel_destroy (ch);
765     return;
766   }
767   if (NULL != ch->head_sent)
768   {
769     /* allow send queue to train first */
770     ch->destroy = GNUNET_YES;
771     return;
772   }
773   /* If the we ever sent the CHANNEL_CREATE, we need to send a destroy message. */
774   if (CADET_CHANNEL_NEW != ch->state)
775     GCT_send_channel_destroy (ch->t,
776                               ch->ctn);
777   /* Now finish our clean up */
778   channel_destroy (ch);
779 }
780
781
782 /**
783  * Destroy channel that was incoming.  Called by the
784  * local client, so no need to tell the client.
785  *
786  * @param ch channel to destroy
787  */
788 void
789 GCCH_channel_incoming_destroy (struct CadetChannel *ch)
790 {
791   if (GNUNET_YES == ch->destroy)
792   {
793     /* other end already destroyed, with the remote client gone, no need
794        to finish transmissions, just destroy immediately. */
795     channel_destroy (ch);
796     return;
797   }
798   if (NULL != ch->head_recv)
799   {
800     /* allow local client to see all data first */
801     ch->destroy = GNUNET_YES;
802     return;
803   }
804   /* Nothing left to do, just finish destruction */
805   GCT_send_channel_destroy (ch->t,
806                             ch->ctn);
807   channel_destroy (ch);
808 }
809
810
811 /**
812  * We got an acknowledgement for the creation of the channel
813  * (the port is open on the other side). Begin transmissions.
814  *
815  * @param ch channel to destroy
816  */
817 void
818 GCCH_handle_channel_open_ack (struct CadetChannel *ch)
819 {
820   switch (ch->state)
821   {
822   case CADET_CHANNEL_NEW:
823     /* this should be impossible */
824     GNUNET_break (0);
825     break;
826   case CADET_CHANNEL_OPEN_SENT:
827     if (NULL == ch->owner)
828     {
829       /* We're not the owner, wrong direction! */
830       GNUNET_break_op (0);
831       return;
832     }
833     LOG (GNUNET_ERROR_TYPE_DEBUG,
834          "Received channel OPEN_ACK for waiting %s, entering READY state\n",
835          GCCH_2s (ch));
836     GNUNET_SCHEDULER_cancel (ch->retry_task);
837     ch->retry_task = NULL;
838     ch->state = CADET_CHANNEL_READY;
839     /* On first connect, send client as many ACKs as we allow messages
840        to be buffered! */
841     for (unsigned int i=0;i<ch->max_pending_messages;i++)
842       send_ack_to_client (ch,
843                           ch->owner);
844     break;
845   case CADET_CHANNEL_READY:
846     /* duplicate ACK, maybe we retried the CREATE. Ignore. */
847     LOG (GNUNET_ERROR_TYPE_DEBUG,
848          "Received duplicate channel OPEN_ACK for %s\n",
849          GCCH_2s (ch));
850     GNUNET_STATISTICS_update (stats,
851                               "# duplicate CREATE_ACKs",
852                               1,
853                               GNUNET_NO);
854     break;
855   }
856 }
857
858
859 /**
860  * Test if element @a e1 comes before element @a e2.
861  *
862  * TODO: use opportunity to create generic list insertion sort
863  * logic in container!
864  *
865  * @param cls closure, our `struct CadetChannel`
866  * @param e1 an element of to sort
867  * @param e2 another element to sort
868  * @return #GNUNET_YES if @e1 < @e2, otherwise #GNUNET_NO
869  */
870 static int
871 is_before (void *cls,
872            void *e1,
873            void *e2)
874 {
875   struct CadetOutOfOrderMessage *m1 = e1;
876   struct CadetOutOfOrderMessage *m2 = e2;
877   uint32_t v1 = ntohl (m1->mid.mid);
878   uint32_t v2 = ntohl (m2->mid.mid);
879   uint32_t delta;
880
881   delta = v1 - v2;
882   if (delta > (uint32_t) INT_MAX)
883   {
884     /* in overflow range, we can safely assume we wrapped around */
885     return GNUNET_NO;
886   }
887   else
888   {
889     return GNUNET_YES;
890   }
891 }
892
893
894 /**
895  * We got payload data for a channel.  Pass it on to the client
896  * and send an ACK to the other end (once flow control allows it!)
897  *
898  * @param ch channel that got data
899  */
900 void
901 GCCH_handle_channel_plaintext_data (struct CadetChannel *ch,
902                                     const struct GNUNET_CADET_ChannelAppDataMessage *msg)
903 {
904   struct GNUNET_MQ_Envelope *env;
905   struct GNUNET_CADET_LocalData *ld;
906   struct CadetOutOfOrderMessage *com;
907   size_t payload_size;
908
909   payload_size = ntohs (msg->header.size) - sizeof (*msg);
910   LOG (GNUNET_ERROR_TYPE_DEBUG,
911        "Receicved %u bytes of application data on %s\n",
912        (unsigned int) payload_size,
913        GCCH_2s (ch));
914   env = GNUNET_MQ_msg_extra (ld,
915                              payload_size,
916                              GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
917   ld->ccn = ch->ccn;
918   GNUNET_memcpy (&ld[1],
919                  &msg[1],
920                  payload_size);
921   if ( (GNUNET_YES == ch->client_ready) &&
922        ( (GNUNET_YES == ch->out_of_order) ||
923          (msg->mid.mid == ch->mid_recv.mid) ) )
924   {
925     GSC_send_to_client (ch->owner ? ch->owner : ch->dest,
926                         env);
927     ch->mid_recv.mid = htonl (1 + ntohl (ch->mid_recv.mid));
928     ch->mid_futures >>= 1;
929   }
930   else
931   {
932     /* FIXME-SECURITY: if the element is WAY too far ahead,
933        drop it (can't buffer too much!) */
934     com = GNUNET_new (struct CadetOutOfOrderMessage);
935     com->mid = msg->mid;
936     com->env = env;
937     /* sort into list ordered by "is_before" */
938     if ( (NULL == ch->head_recv) ||
939          (GNUNET_YES == is_before (ch,
940                                    com,
941                                    ch->head_recv)) )
942     {
943       GNUNET_CONTAINER_DLL_insert (ch->head_recv,
944                                    ch->tail_recv,
945                                    com);
946     }
947     else
948     {
949       struct CadetOutOfOrderMessage *pos;
950
951       for (pos = ch->head_recv;
952            NULL != pos;
953            pos = pos->next)
954       {
955         if (GNUNET_YES !=
956             is_before (ch,
957                        pos,
958                        com))
959           break;
960       }
961       if (NULL == pos)
962         GNUNET_CONTAINER_DLL_insert_tail (ch->head_recv,
963                                           ch->tail_recv,
964                                           com);
965       else
966         GNUNET_CONTAINER_DLL_insert_after (ch->head_recv,
967                                            ch->tail_recv,
968                                            com,
969                                            pos->prev);
970     }
971   }
972 }
973
974
975 /**
976  * We got an acknowledgement for payload data for a channel.
977  * Possibly resume transmissions.
978  *
979  * @param ch channel that got the ack
980  * @param ack details about what was received
981  */
982 void
983 GCCH_handle_channel_plaintext_data_ack (struct CadetChannel *ch,
984                                         const struct GNUNET_CADET_ChannelDataAckMessage *ack)
985 {
986   struct CadetReliableMessage *crm;
987
988   if (GNUNET_NO == ch->reliable)
989   {
990     /* not expecting ACKs on unreliable channel, odd */
991     GNUNET_break_op (0);
992     return;
993   }
994   for (crm = ch->head_sent;
995         NULL != crm;
996        crm = crm->next)
997     if (ack->mid.mid == crm->data_message.mid.mid)
998       break;
999   if (NULL == crm)
1000   {
1001     /* ACK for message we already dropped, might have been a
1002        duplicate ACK? Ignore. */
1003     GNUNET_STATISTICS_update (stats,
1004                               "# duplicate DATA_ACKs",
1005                               1,
1006                               GNUNET_NO);
1007     return;
1008   }
1009   GNUNET_CONTAINER_DLL_remove (ch->head_sent,
1010                                ch->tail_sent,
1011                                crm);
1012   ch->pending_messages--;
1013   GNUNET_free (crm);
1014   GNUNET_assert (ch->pending_messages < ch->max_pending_messages);
1015   send_ack_to_client (ch,
1016                       (NULL == ch->owner) ? ch->dest : ch->owner);
1017 }
1018
1019
1020 /**
1021  * Destroy channel, based on the other peer closing the
1022  * connection.  Also needs to remove this channel from
1023  * the tunnel.
1024  *
1025  * @param ch channel to destroy
1026  */
1027 void
1028 GCCH_handle_remote_destroy (struct CadetChannel *ch)
1029 {
1030   LOG (GNUNET_ERROR_TYPE_DEBUG,
1031        "Received remote channel DESTROY for %s\n",
1032        GCCH_2s (ch));
1033   ch->destroy = GNUNET_YES;
1034   GSC_handle_remote_channel_destroy ((NULL != ch->owner) ? ch->owner : ch->dest,
1035                                      ch->ccn,
1036                                      ch);
1037   channel_destroy (ch);
1038 }
1039
1040
1041 /**
1042  * Function called once the tunnel has sent one of our messages.
1043  * If the message is unreliable, simply frees the `crm`. If the
1044  * message was reliable, calculate retransmission time and
1045  * wait for ACK (or retransmit).
1046  *
1047  * @param cls the `struct CadetReliableMessage` that was sent
1048  */
1049 static void
1050 data_sent_cb (void *cls);
1051
1052
1053 /**
1054  * We need to retry a transmission, the last one took too long to
1055  * be acknowledged.
1056  *
1057  * @param cls the `struct CadetChannel` where we need to retransmit
1058  */
1059 static void
1060 retry_transmission (void *cls)
1061 {
1062   struct CadetChannel *ch = cls;
1063   struct CadetReliableMessage *crm = ch->head_sent;
1064
1065   ch->retry_task = NULL;
1066   GNUNET_assert (NULL == crm->qe);
1067   crm->qe = GCT_send (ch->t,
1068                       &crm->data_message.header,
1069                       &data_sent_cb,
1070                       crm);
1071 }
1072
1073
1074 /**
1075  * Check if we can now allow the client to transmit, and if so,
1076  * let the client know about it.
1077  *
1078  * @param ch channel to check
1079  */
1080 static void
1081 GCCH_check_allow_client (struct CadetChannel *ch)
1082 {
1083   struct GNUNET_MQ_Envelope *env;
1084   struct GNUNET_CADET_LocalAck *msg;
1085
1086   if (GNUNET_YES == ch->client_allowed)
1087     return; /* client already allowed! */
1088   if (CADET_CHANNEL_READY != ch->state)
1089   {
1090     /* destination did not yet ACK our CREATE! */
1091     LOG (GNUNET_ERROR_TYPE_DEBUG,
1092          "%s not yet ready, throttling client until ACK.\n",
1093          GCCH_2s (ch));
1094     return;
1095   }
1096   if (ch->pending_messages > ch->max_pending_messages)
1097   {
1098     /* Too many messages in queue. */
1099     LOG (GNUNET_ERROR_TYPE_DEBUG,
1100          "Message queue still too long on %s, throttling client until ACK.\n",
1101          GCCH_2s (ch));
1102     return;
1103   }
1104   if ( (NULL != ch->head_sent) &&
1105        (64 <= ntohl (ch->mid_send.mid) - ntohl (ch->head_sent->data_message.mid.mid)) )
1106   {
1107     LOG (GNUNET_ERROR_TYPE_DEBUG,
1108          "Gap in ACKs too big on %s, throttling client until ACK.\n",
1109          GCCH_2s (ch));
1110     return;
1111   }
1112   ch->client_allowed = GNUNET_YES;
1113
1114
1115   LOG (GNUNET_ERROR_TYPE_DEBUG,
1116        "Sending local ack to %s client\n",
1117        GCCH_2s (ch));
1118   env = GNUNET_MQ_msg (msg,
1119                        GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK);
1120   msg->ccn = ch->ccn;
1121   GSC_send_to_client (ch->owner ? ch->owner : ch->dest,
1122                       env);
1123 }
1124
1125
1126 /**
1127  * Function called once the tunnel has sent one of our messages.
1128  * If the message is unreliable, simply frees the `crm`. If the
1129  * message was reliable, calculate retransmission time and
1130  * wait for ACK (or retransmit).
1131  *
1132  * @param cls the `struct CadetReliableMessage` that was sent
1133  */
1134 static void
1135 data_sent_cb (void *cls)
1136 {
1137   struct CadetReliableMessage *crm = cls;
1138   struct CadetChannel *ch = crm->ch;
1139   struct CadetReliableMessage *off;
1140
1141   crm->qe = NULL;
1142   GNUNET_CONTAINER_DLL_remove (ch->head_sent,
1143                                ch->tail_sent,
1144                                crm);
1145   if (GNUNET_NO == ch->reliable)
1146   {
1147     GNUNET_free (crm);
1148     ch->pending_messages--;
1149     GCCH_check_allow_client (ch);
1150     return;
1151   }
1152   if (0 == crm->retry_delay.rel_value_us)
1153     crm->retry_delay = ch->expected_delay;
1154   crm->next_retry = GNUNET_TIME_relative_to_absolute (crm->retry_delay);
1155
1156   /* find position for re-insertion into the DLL */
1157   if ( (NULL == ch->head_sent) ||
1158        (crm->next_retry.abs_value_us < ch->head_sent->next_retry.abs_value_us) )
1159   {
1160     /* insert at HEAD, also (re)schedule retry task! */
1161     GNUNET_CONTAINER_DLL_insert (ch->head_sent,
1162                                  ch->tail_sent,
1163                                  crm);
1164     if (NULL != ch->retry_task)
1165       GNUNET_SCHEDULER_cancel (ch->retry_task);
1166     ch->retry_task = GNUNET_SCHEDULER_add_delayed (crm->retry_delay,
1167                                                    &retry_transmission,
1168                                                    ch);
1169     return;
1170   }
1171   for (off = ch->head_sent; NULL != off; off = off->next)
1172     if (crm->next_retry.abs_value_us < off->next_retry.abs_value_us)
1173       break;
1174   if (NULL == off)
1175   {
1176     /* insert at tail */
1177     GNUNET_CONTAINER_DLL_insert_tail (ch->head_sent,
1178                                       ch->tail_sent,
1179                                       crm);
1180   }
1181   else
1182   {
1183     /* insert before off */
1184     GNUNET_CONTAINER_DLL_insert_after (ch->head_sent,
1185                                        ch->tail_sent,
1186                                        off->prev,
1187                                        crm);
1188   }
1189 }
1190
1191
1192 /**
1193  * Handle data given by a client.
1194  *
1195  * Check whether the client is allowed to send in this tunnel, save if
1196  * channel is reliable and send an ACK to the client if there is still
1197  * buffer space in the tunnel.
1198  *
1199  * @param ch Channel.
1200  * @param message payload to transmit.
1201  * @return #GNUNET_OK if everything goes well,
1202  *         #GNUNET_SYSERR in case of an error.
1203  */
1204 int
1205 GCCH_handle_local_data (struct CadetChannel *ch,
1206                         const struct GNUNET_MessageHeader *message)
1207 {
1208   uint16_t payload_size = ntohs (message->size);
1209   struct CadetReliableMessage *crm;
1210
1211   if (GNUNET_NO == ch->client_allowed)
1212   {
1213     GNUNET_break_op (0);
1214     return GNUNET_SYSERR;
1215   }
1216   ch->client_allowed = GNUNET_NO;
1217   ch->pending_messages++;
1218
1219   /* Everything is correct, send the message. */
1220   crm = GNUNET_malloc (sizeof (*crm) + payload_size);
1221   crm->ch = ch;
1222   crm->data_message.header.size = htons (sizeof (struct GNUNET_CADET_ChannelAppDataMessage) + payload_size);
1223   crm->data_message.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA);
1224   ch->mid_send.mid = htonl (ntohl (ch->mid_send.mid) + 1);
1225   crm->data_message.mid = ch->mid_send;
1226   crm->data_message.ctn = ch->ctn;
1227   GNUNET_memcpy (&crm[1],
1228                  message,
1229                  payload_size);
1230   GNUNET_CONTAINER_DLL_insert (ch->head_sent,
1231                                ch->tail_sent,
1232                                crm);
1233   LOG (GNUNET_ERROR_TYPE_DEBUG,
1234        "Sending %u bytes from local client to %s\n",
1235        payload_size,
1236        GCCH_2s (ch));
1237   crm->qe = GCT_send (ch->t,
1238                       &crm->data_message.header,
1239                       &data_sent_cb,
1240                       crm);
1241   GCCH_check_allow_client (ch);
1242   return GNUNET_OK;
1243 }
1244
1245
1246 /**
1247  * Try to deliver messages to the local client, if it is ready for more.
1248  *
1249  * @param ch channel to process
1250  */
1251 static void
1252 send_client_buffered_data (struct CadetChannel *ch)
1253 {
1254   struct CadetOutOfOrderMessage *com;
1255
1256   if (GNUNET_NO == ch->client_ready)
1257     return; /* client not ready */
1258   com = ch->head_recv;
1259   if (NULL == com)
1260     return; /* none pending */
1261   if ( (com->mid.mid != ch->mid_recv.mid) &&
1262        (GNUNET_NO == ch->out_of_order) )
1263     return; /* missing next one in-order */
1264
1265   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1266               "Passing payload message to client on %s\n",
1267               GCCH_2s (ch));
1268
1269   /* all good, pass next message to client */
1270   GNUNET_CONTAINER_DLL_remove (ch->head_recv,
1271                                ch->tail_recv,
1272                                com);
1273   /* FIXME: if unreliable, this is not aggressive
1274      enough, as it would be OK to have lost some! */
1275   ch->mid_recv.mid = htonl (1 + ntohl (com->mid.mid));
1276   ch->mid_futures >>= 1; /* equivalent to division by 2 */
1277   GSC_send_to_client (ch->owner ? ch->owner : ch->dest,
1278                       com->env);
1279   GNUNET_free (com);
1280   if ( (0xFFULL == (ch->mid_futures & 0xFFULL)) &&
1281        (GNUNET_YES == ch->reliable) )
1282   {
1283     /* The next 15 messages were also already received (0xFF), this
1284        suggests that the sender may be blocked on flow control
1285        urgently waiting for an ACK from us. (As we have an inherent
1286        maximum of 64 bits, and 15 is getting too close for comfort.)
1287        So we should send one now. */
1288     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1289                 "Sender on %s likely blocked on flow-control, sending ACK now.\n",
1290                 GCCH_2s (ch));
1291     if (GNUNET_YES == ch->reliable)
1292       send_channel_data_ack (ch);
1293   }
1294
1295   if (NULL != ch->head_recv)
1296     return;
1297   if (GNUNET_NO == ch->destroy)
1298     return;
1299   GCT_send_channel_destroy (ch->t,
1300                             ch->ctn);
1301   channel_destroy (ch);
1302 }
1303
1304
1305 /**
1306  * Handle ACK from client on local channel.
1307  *
1308  * @param ch channel to destroy
1309  */
1310 void
1311 GCCH_handle_local_ack (struct CadetChannel *ch)
1312 {
1313   ch->client_ready = GNUNET_YES;
1314   send_client_buffered_data (ch);
1315 }
1316
1317
1318 #define LOG2(level, ...) GNUNET_log_from_nocheck(level,"cadet-chn",__VA_ARGS__)
1319
1320
1321 /**
1322  * Log channel info.
1323  *
1324  * @param ch Channel.
1325  * @param level Debug level to use.
1326  */
1327 void
1328 GCCH_debug (struct CadetChannel *ch,
1329             enum GNUNET_ErrorType level)
1330 {
1331   int do_log;
1332
1333   do_log = GNUNET_get_log_call_status (level & (~GNUNET_ERROR_TYPE_BULK),
1334                                        "cadet-chn",
1335                                        __FILE__, __FUNCTION__, __LINE__);
1336   if (0 == do_log)
1337     return;
1338
1339   if (NULL == ch)
1340   {
1341     LOG2 (level, "CHN *** DEBUG NULL CHANNEL ***\n");
1342     return;
1343   }
1344   LOG2 (level,
1345         "CHN %s:%X (%p)\n",
1346         GCT_2s (ch->t),
1347         ch->ctn,
1348         ch);
1349   if (NULL != ch->owner)
1350   {
1351     LOG2 (level,
1352           "CHN origin %s ready %s local-id: %u\n",
1353           GSC_2s (ch->owner),
1354           ch->client_ready ? "YES" : "NO",
1355           ntohl (ch->ccn.channel_of_client));
1356   }
1357   if (NULL != ch->dest)
1358   {
1359     LOG2 (level,
1360           "CHN destination %s ready %s local-id: %u\n",
1361           GSC_2s (ch->dest),
1362           ch->client_ready ? "YES" : "NO",
1363           ntohl (ch->ccn.channel_of_client));
1364   }
1365   LOG2 (level,
1366         "CHN  Message IDs recv: %d (%LLX), send: %d\n",
1367         ntohl (ch->mid_recv.mid),
1368         (unsigned long long) ch->mid_futures,
1369         ntohl (ch->mid_send.mid));
1370 }
1371
1372
1373
1374 /* end of gnunet-service-cadet-new_channel.c */