6f57538acdcc1f18bd1521788188c3ba517fcacc
[oweals/gnunet.git] / src / cadet / gnunet-service-cadet-new_channel.c
1
2 /*
3      This file is part of GNUnet.
4      Copyright (C) 2001-2017 GNUnet e.V.
5
6      GNUnet is free software; you can redistribute it and/or modify
7      it under the terms of the GNU General Public License as published
8      by the Free Software Foundation; either version 3, or (at your
9      option) any later version.
10
11      GNUnet is distributed in the hope that it will be useful, but
12      WITHOUT ANY WARRANTY; without even the implied warranty of
13      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14      General Public License for more details.
15
16      You should have received a copy of the GNU General Public License
17      along with GNUnet; see the file COPYING.  If not, write to the
18      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
19      Boston, MA 02110-1301, USA.
20 */
21 /**
22  * @file cadet/gnunet-service-cadet-new_channel.c
23  * @brief logical links between CADET clients
24  * @author Bartlomiej Polot
25  * @author Christian Grothoff
26  *
27  * TODO:
28  * - introduce shutdown so we can have half-closed channels, modify
29  *   destroy to include MID to have FIN-ACK equivalents, etc.
30  * - estimate max bandwidth using bursts and use to for CONGESTION CONTROL!
31  * - check that '0xFFULL' really is sufficient for flow control!
32  * - revisit handling of 'unreliable' traffic!
33  * - revisit handling of 'out-of-order' option, especially in combination with/without 'reliable'.
34  * - figure out flow control without ACKs (unreliable traffic!)
35  */
36 #include "platform.h"
37 #include "gnunet_util_lib.h"
38 #include "cadet.h"
39 #include "gnunet_statistics_service.h"
40 #include "gnunet-service-cadet-new.h"
41 #include "gnunet-service-cadet-new_channel.h"
42 #include "gnunet-service-cadet-new_connection.h"
43 #include "gnunet-service-cadet-new_tunnels.h"
44 #include "gnunet-service-cadet-new_peer.h"
45 #include "gnunet-service-cadet-new_paths.h"
46
47 #define LOG(level,...) GNUNET_log_from (level,"cadet-chn",__VA_ARGS__)
48
49 /**
50  * How long do we initially wait before retransmitting?
51  */
52 #define CADET_INITIAL_RETRANSMIT_TIME GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 250)
53
54 /**
55  * How long do we wait before dropping state about incoming
56  * connection to closed port?
57  */
58 #define TIMEOUT_CLOSED_PORT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30)
59
60
61 /**
62  * All the states a connection can be in.
63  */
64 enum CadetChannelState
65 {
66   /**
67    * Uninitialized status, should never appear in operation.
68    */
69   CADET_CHANNEL_NEW,
70
71   /**
72    * Connection create message sent, waiting for ACK.
73    */
74   CADET_CHANNEL_CREATE_SENT,
75
76   /**
77    * Connection confirmed, ready to carry traffic.
78    */
79   CADET_CHANNEL_READY
80 };
81
82
83 /**
84  * Info needed to retry a message in case it gets lost.
85  * Note that we DO use this structure also for unreliable
86  * messages.
87  */
88 struct CadetReliableMessage
89 {
90   /**
91    * Double linked list, FIFO style
92    */
93   struct CadetReliableMessage *next;
94
95   /**
96    * Double linked list, FIFO style
97    */
98   struct CadetReliableMessage *prev;
99
100   /**
101    * Which channel is this message in?
102    */
103   struct CadetChannel *ch;
104
105   /**
106    * Entry in the tunnels queue for this message, NULL if it has left
107    * the tunnel.  Used to cancel transmission in case we receive an
108    * ACK in time.
109    */
110   struct CadetTunnelQueueEntry *qe;
111
112   /**
113    * How soon should we retry if we fail to get an ACK?
114    * Messages in the queue are sorted by this value.
115    */
116   struct GNUNET_TIME_Absolute next_retry;
117
118   /**
119    * How long do we wait for an ACK after transmission?
120    * Use for the back-off calculation.
121    */
122   struct GNUNET_TIME_Relative retry_delay;
123
124   /**
125    * Data message we are trying to send.
126    */
127   struct GNUNET_CADET_ChannelAppDataMessage data_message;
128
129   /* followed by variable-size payload */
130 };
131
132
133 /**
134  * List of received out-of-order data messages.
135  */
136 struct CadetOutOfOrderMessage
137 {
138   /**
139    * Double linked list, FIFO style
140    */
141   struct CadetOutOfOrderMessage *next;
142
143   /**
144    * Double linked list, FIFO style
145    */
146   struct CadetOutOfOrderMessage *prev;
147
148   /**
149    * ID of the message (messages up to this point needed
150    * before we give this one to the client).
151    */
152   struct ChannelMessageIdentifier mid;
153
154   /**
155    * The envelope with the payload of the out-of-order message
156    */
157   struct GNUNET_MQ_Envelope *env;
158
159 };
160
161
162 /**
163  * Struct containing all information regarding a channel to a remote client.
164  */
165 struct CadetChannel
166 {
167   /**
168    * Tunnel this channel is in.
169    */
170   struct CadetTunnel *t;
171
172   /**
173    * Last entry in the tunnel's queue relating to control messages
174    * (#GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN or
175    * #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK).  Used to cancel
176    * transmission in case we receive updated information.
177    */
178   struct CadetTunnelQueueEntry *last_control_qe;
179
180   /**
181    * Client owner of the tunnel, if any.
182    * (Used if this channel represends the initiating end of the tunnel.)
183    */
184   struct CadetClient *owner;
185
186   /**
187    * Client destination of the tunnel, if any.
188    * (Used if this channel represents the listening end of the tunnel.)
189    */
190   struct CadetClient *dest;
191
192   /**
193    * Head of DLL of messages sent and not yet ACK'd.
194    */
195   struct CadetReliableMessage *head_sent;
196
197   /**
198    * Tail of DLL of messages sent and not yet ACK'd.
199    */
200   struct CadetReliableMessage *tail_sent;
201
202   /**
203    * Head of DLL of messages received out of order or while client was unready.
204    */
205   struct CadetOutOfOrderMessage *head_recv;
206
207   /**
208    * Tail DLL of messages received out of order or while client was unready.
209    */
210   struct CadetOutOfOrderMessage *tail_recv;
211
212   /**
213    * Task to resend/poll in case no ACK is received.
214    */
215   struct GNUNET_SCHEDULER_Task *retry_task;
216
217   /**
218    * Last time the channel was used
219    */
220   struct GNUNET_TIME_Absolute timestamp;
221
222   /**
223    * Destination port of the channel.
224    */
225   struct GNUNET_HashCode port;
226
227   /**
228    * Counter for exponential backoff.
229    */
230   struct GNUNET_TIME_Relative retry_time;
231
232   /**
233    * How long does it usually take to get an ACK.
234    */
235   struct GNUNET_TIME_Relative expected_delay;
236
237   /**
238    * Bitfield of already-received messages past @e mid_recv.
239    */
240   uint64_t mid_futures;
241
242   /**
243    * Next MID expected for incoming traffic.
244    */
245   struct ChannelMessageIdentifier mid_recv;
246
247   /**
248    * Next MID to use for outgoing traffic.
249    */
250   struct ChannelMessageIdentifier mid_send;
251
252   /**
253    * Total (reliable) messages pending ACK for this channel.
254    */
255   unsigned int pending_messages;
256
257   /**
258    * Maximum (reliable) messages pending ACK for this channel
259    * before we throttle the client.
260    */
261   unsigned int max_pending_messages;
262
263   /**
264    * Number identifying this channel in its tunnel.
265    */
266   struct GNUNET_CADET_ChannelTunnelNumber ctn;
267
268   /**
269    * Local tunnel number for local client owning the channel.
270    * ( >= #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI or 0 )
271    */
272   struct GNUNET_CADET_ClientChannelNumber ccn;
273
274   /**
275    * Channel state.
276    */
277   enum CadetChannelState state;
278
279   /**
280    * Can we send data to the client?
281    */
282   int client_ready;
283
284   /**
285    * Can the client send data to us?
286    */
287   int client_allowed;
288
289   /**
290    * Is the tunnel bufferless (minimum latency)?
291    */
292   int nobuffer;
293
294   /**
295    * Is the tunnel reliable?
296    */
297   int reliable;
298
299   /**
300    * Is the tunnel out-of-order?
301    */
302   int out_of_order;
303
304   /**
305    * Flag to signal the destruction of the channel.  If this is set to
306    * #GNUNET_YES the channel will be destroyed once the queue is
307    * empty.
308    */
309   int destroy;
310
311 };
312
313
314 /**
315  * Get the static string for identification of the channel.
316  *
317  * @param ch Channel.
318  *
319  * @return Static string with the channel IDs.
320  */
321 const char *
322 GCCH_2s (const struct CadetChannel *ch)
323 {
324   static char buf[128];
325
326   if (NULL == ch)
327     return "(NULL Channel)";
328   GNUNET_snprintf (buf,
329                    sizeof (buf),
330                    "%s:%s ctn:%X (%X)",
331                    GCT_2s (ch->t),
332                    GNUNET_h2s (&ch->port),
333                    ch->ctn,
334                    ntohl (ch->ccn.channel_of_client));
335   return buf;
336 }
337
338
339 /**
340  * Get the channel's public ID.
341  *
342  * @param ch Channel.
343  *
344  * @return ID used to identify the channel with the remote peer.
345  */
346 struct GNUNET_CADET_ChannelTunnelNumber
347 GCCH_get_id (const struct CadetChannel *ch)
348 {
349   return ch->ctn;
350 }
351
352
353 /**
354  * Destroy the given channel.
355  *
356  * @param ch channel to destroy
357  */
358 static void
359 channel_destroy (struct CadetChannel *ch)
360 {
361   struct CadetReliableMessage *crm;
362   struct CadetOutOfOrderMessage *com;
363
364   while (NULL != (crm = ch->head_sent))
365   {
366     GNUNET_assert (ch == crm->ch);
367     if (NULL != crm->qe)
368     {
369       GCT_send_cancel (crm->qe);
370       crm->qe = NULL;
371     }
372     GNUNET_CONTAINER_DLL_remove (ch->head_sent,
373                                  ch->tail_sent,
374                                  crm);
375     GNUNET_free (crm);
376   }
377   while (NULL != (com = ch->head_recv))
378   {
379     GNUNET_CONTAINER_DLL_remove (ch->head_recv,
380                                  ch->tail_recv,
381                                  com);
382     GNUNET_MQ_discard (com->env);
383     GNUNET_free (com);
384   }
385   if (NULL != ch->last_control_qe)
386   {
387     GCT_send_cancel (ch->last_control_qe);
388     ch->last_control_qe = NULL;
389   }
390   if (NULL != ch->retry_task)
391   {
392     GNUNET_SCHEDULER_cancel (ch->retry_task);
393     ch->retry_task = NULL;
394   }
395   GCT_remove_channel (ch->t,
396                       ch,
397                       ch->ctn);
398   GNUNET_free (ch);
399 }
400
401
402 /**
403  * Send a channel create message.
404  *
405  * @param cls Channel for which to send.
406  */
407 static void
408 send_channel_open (void *cls);
409
410
411 /**
412  * Function called once the tunnel confirms that we sent the
413  * create message.  Delays for a bit until we retry.
414  *
415  * @param cls our `struct CadetChannel`.
416  */
417 static void
418 channel_open_sent_cb (void *cls)
419 {
420   struct CadetChannel *ch = cls;
421
422   ch->last_control_qe = NULL;
423   ch->retry_time = GNUNET_TIME_STD_BACKOFF (ch->retry_time);
424   ch->retry_task = GNUNET_SCHEDULER_add_delayed (ch->retry_time,
425                                                  &send_channel_open,
426                                                  ch);
427 }
428
429
430 /**
431  * Send a channel open message.
432  *
433  * @param cls Channel for which to send.
434  */
435 static void
436 send_channel_open (void *cls)
437 {
438   struct CadetChannel *ch = cls;
439   struct GNUNET_CADET_ChannelOpenMessage msgcc;
440   uint32_t options;
441
442   ch->retry_task = NULL;
443   options = 0;
444   if (ch->nobuffer)
445     options |= GNUNET_CADET_OPTION_NOBUFFER;
446   if (ch->reliable)
447     options |= GNUNET_CADET_OPTION_RELIABLE;
448   if (ch->out_of_order)
449     options |= GNUNET_CADET_OPTION_OUT_OF_ORDER;
450   msgcc.header.size = htons (sizeof (msgcc));
451   msgcc.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN);
452   msgcc.opt = htonl (options);
453   msgcc.port = ch->port;
454   msgcc.ctn = ch->ctn;
455   ch->state = CADET_CHANNEL_CREATE_SENT;
456   ch->last_control_qe = GCT_send (ch->t,
457                                   &msgcc.header,
458                                   &channel_open_sent_cb,
459                                   ch);
460   LOG (GNUNET_ERROR_TYPE_DEBUG,
461        "Sending CHANNEL_OPEN message for channel %s\n",
462        GCCH_2s (ch));
463 }
464
465
466 /**
467  * Create a new channel.
468  *
469  * @param owner local client owning the channel
470  * @param ccn local number of this channel at the @a owner
471  * @param destination peer to which we should build the channel
472  * @param port desired port at @a destination
473  * @param options options for the channel
474  * @return handle to the new channel
475  */
476 struct CadetChannel *
477 GCCH_channel_local_new (struct CadetClient *owner,
478                         struct GNUNET_CADET_ClientChannelNumber ccn,
479                         struct CadetPeer *destination,
480                         const struct GNUNET_HashCode *port,
481                         uint32_t options)
482 {
483   struct CadetChannel *ch;
484
485   ch = GNUNET_new (struct CadetChannel);
486   ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
487   ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
488   ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
489   ch->max_pending_messages = (ch->nobuffer) ? 1 : 32; /* FIXME: 32!? Do not hardcode! */
490   ch->owner = owner;
491   ch->ccn = ccn;
492   ch->port = *port;
493   ch->t = GCP_get_tunnel (destination,
494                           GNUNET_YES);
495   ch->ctn = GCT_add_channel (ch->t,
496                               ch);
497   ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME;
498   ch->retry_task = GNUNET_SCHEDULER_add_now (&send_channel_open,
499                                              ch);
500   GNUNET_STATISTICS_update (stats,
501                             "# channels",
502                             1,
503                             GNUNET_NO);
504   LOG (GNUNET_ERROR_TYPE_DEBUG,
505        "Created channel to port %s at peer %s for client %s using tunnel %s\n",
506        GNUNET_h2s (port),
507        GCP_2s (destination),
508        GSC_2s (owner),
509        GCT_2s (ch->t));
510   return ch;
511 }
512
513
514 /**
515  * We had an incoming channel to a port that is closed.
516  * It has not been opened for a while, drop it.
517  *
518  * @param cls the channel to drop
519  */
520 static void
521 timeout_closed_cb (void *cls)
522 {
523   struct CadetChannel *ch = cls;
524
525   ch->retry_task = NULL;
526   LOG (GNUNET_ERROR_TYPE_DEBUG,
527        "Closing incoming channel to port %s from peer %s due to timeout\n",
528        GNUNET_h2s (&ch->port),
529        GCP_2s (GCT_get_destination (ch->t)));
530   channel_destroy (ch);
531 }
532
533
534 /**
535  * Create a new channel based on a request coming in over the network.
536  *
537  * @param t tunnel to the remote peer
538  * @param ctn identifier of this channel in the tunnel
539  * @param port desired local port
540  * @param options options for the channel
541  * @return handle to the new channel
542  */
543 struct CadetChannel *
544 GCCH_channel_incoming_new (struct CadetTunnel *t,
545                            struct GNUNET_CADET_ChannelTunnelNumber ctn,
546                            const struct GNUNET_HashCode *port,
547                            uint32_t options)
548 {
549   struct CadetChannel *ch;
550   struct CadetClient *c;
551
552   ch = GNUNET_new (struct CadetChannel);
553   ch->port = *port;
554   ch->t = t;
555   ch->ctn = ctn;
556   ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME;
557   ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
558   ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
559   ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
560   ch->max_pending_messages = (ch->nobuffer) ? 1 : 32; /* FIXME: 32!? Do not hardcode! */
561   GNUNET_STATISTICS_update (stats,
562                             "# channels",
563                             1,
564                             GNUNET_NO);
565
566   c = GNUNET_CONTAINER_multihashmap_get (open_ports,
567                                          port);
568   if (NULL == c)
569   {
570     /* port closed, wait for it to possibly open */
571     (void) GNUNET_CONTAINER_multihashmap_put (loose_channels,
572                                               port,
573                                               ch,
574                                               GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
575     ch->retry_task = GNUNET_SCHEDULER_add_delayed (TIMEOUT_CLOSED_PORT,
576                                                    &timeout_closed_cb,
577                                                    ch);
578     LOG (GNUNET_ERROR_TYPE_DEBUG,
579          "Created loose incoming channel to port %s from peer %s\n",
580          GNUNET_h2s (&ch->port),
581          GCP_2s (GCT_get_destination (ch->t)));
582   }
583   else
584   {
585     GCCH_bind (ch,
586                c);
587   }
588   GNUNET_STATISTICS_update (stats,
589                             "# channels",
590                             1,
591                             GNUNET_NO);
592   return ch;
593 }
594
595
596 /**
597  * Function called once the tunnel confirms that we sent the
598  * ACK message.  Just remembers it was sent, we do not expect
599  * ACKs for ACKs ;-).
600  *
601  * @param cls our `struct CadetChannel`.
602  */
603 static void
604 send_ack_cb (void *cls)
605 {
606   struct CadetChannel *ch = cls;
607
608   ch->last_control_qe = NULL;
609 }
610
611
612 /**
613  * Compute and send the current ACK to the other peer.
614  *
615  * @param ch channel to send the ACK for
616  */
617 static void
618 send_channel_data_ack (struct CadetChannel *ch)
619 {
620   struct GNUNET_CADET_ChannelDataAckMessage msg;
621
622   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK);
623   msg.header.size = htons (sizeof (msg));
624   msg.ctn = ch->ctn;
625   msg.mid.mid = htonl (ntohl (ch->mid_recv.mid) - 1);
626   msg.futures = GNUNET_htonll (ch->mid_futures);
627   if (NULL != ch->last_control_qe)
628     GCT_send_cancel (ch->last_control_qe);
629   ch->last_control_qe = GCT_send (ch->t,
630                                   &msg.header,
631                                   &send_ack_cb,
632                                   ch);
633 }
634
635
636 /**
637  * Send our initial ACK to the client confirming that the
638  * connection is up.
639  *
640  * @param cls the `struct CadetChannel`
641  */
642 static void
643 send_connect_ack (void *cls)
644 {
645   struct CadetChannel *ch = cls;
646
647   ch->retry_task = NULL;
648   send_channel_data_ack (ch);
649 }
650
651
652 /**
653  * Send a LOCAL ACK to the client to solicit more messages.
654  *
655  * @param ch channel the ack is for
656  * @param c client to send the ACK to
657  */
658 static void
659 send_ack_to_client (struct CadetChannel *ch,
660                     struct CadetClient *c)
661 {
662   struct GNUNET_MQ_Envelope *env;
663   struct GNUNET_CADET_LocalAck *ack;
664
665   env = GNUNET_MQ_msg (ack,
666                        GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK);
667   ack->ccn = ch->ccn;
668   GSC_send_to_client (c,
669                       env);
670 }
671
672
673 /**
674  * A client is bound to the port that we have a channel
675  * open to.  Send the acknowledgement for the connection
676  * request and establish the link with the client.
677  *
678  * @param ch open incoming channel
679  * @param c client listening on the respective port
680  */
681 void
682 GCCH_bind (struct CadetChannel *ch,
683            struct CadetClient *c)
684 {
685   struct GNUNET_MQ_Envelope *env;
686   struct GNUNET_CADET_LocalChannelCreateMessage *tcm;
687   uint32_t options;
688
689   LOG (GNUNET_ERROR_TYPE_DEBUG,
690        "Binding channel %s from tunnel %s to port %s of client %s\n",
691        GCCH_2s (ch),
692        GCT_2s (ch->t),
693        GNUNET_h2s (&ch->port),
694        GSC_2s (c));
695   if (NULL != ch->retry_task)
696   {
697     /* there might be a timeout task here */
698     GNUNET_SCHEDULER_cancel (ch->retry_task);
699     ch->retry_task = NULL;
700   }
701   options = 0;
702   if (ch->nobuffer)
703     options |= GNUNET_CADET_OPTION_NOBUFFER;
704   if (ch->reliable)
705     options |= GNUNET_CADET_OPTION_RELIABLE;
706   if (ch->out_of_order)
707     options |= GNUNET_CADET_OPTION_OUT_OF_ORDER;
708   ch->dest = c;
709   ch->ccn = GSC_bind (c,
710                       ch,
711                       GCT_get_destination (ch->t),
712                       &ch->port,
713                       options);
714   ch->mid_recv.mid = htonl (1); /* The CONNECT counts as message 0! */
715
716   /* notify other peer that we accepted the connection */
717   ch->retry_task = GNUNET_SCHEDULER_add_now (&send_connect_ack,
718                                              ch);
719   /* give client it's initial supply of ACKs */
720   env = GNUNET_MQ_msg (tcm,
721                        GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_CREATE);
722   tcm->ccn = ch->ccn;
723   tcm->peer = *GCP_get_id (GCT_get_destination (ch->t));
724   tcm->port = ch->port;
725   tcm->opt = htonl (options);
726   GSC_send_to_client (ch->dest,
727                       env);
728   for (unsigned int i=0;i<ch->max_pending_messages;i++)
729     send_ack_to_client (ch,
730                         ch->owner);
731 }
732
733
734 /**
735  * Destroy locally created channel.  Called by the
736  * local client, so no need to tell the client.
737  *
738  * @param ch channel to destroy
739  */
740 void
741 GCCH_channel_local_destroy (struct CadetChannel *ch)
742 {
743   if (GNUNET_YES == ch->destroy)
744   {
745     /* other end already destroyed, with the local client gone, no need
746        to finish transmissions, just destroy immediately. */
747     channel_destroy (ch);
748     return;
749   }
750   if (NULL != ch->head_sent)
751   {
752     /* allow send queue to train first */
753     ch->destroy = GNUNET_YES;
754     return;
755   }
756   /* Nothing left to do, just finish destruction */
757   GCT_send_channel_destroy (ch->t,
758                             ch->ctn);
759   channel_destroy (ch);
760 }
761
762
763 /**
764  * Destroy channel that was incoming.  Called by the
765  * local client, so no need to tell the client.
766  *
767  * @param ch channel to destroy
768  */
769 void
770 GCCH_channel_incoming_destroy (struct CadetChannel *ch)
771 {
772   if (GNUNET_YES == ch->destroy)
773   {
774     /* other end already destroyed, with the remote client gone, no need
775        to finish transmissions, just destroy immediately. */
776     channel_destroy (ch);
777     return;
778   }
779   if (NULL != ch->head_recv)
780   {
781     /* allow local client to see all data first */
782     ch->destroy = GNUNET_YES;
783     return;
784   }
785   /* Nothing left to do, just finish destruction */
786   GCT_send_channel_destroy (ch->t,
787                             ch->ctn);
788   channel_destroy (ch);
789 }
790
791
792 /**
793  * We got an acknowledgement for the creation of the channel
794  * (the port is open on the other side). Begin transmissions.
795  *
796  * @param ch channel to destroy
797  */
798 void
799 GCCH_handle_channel_open_ack (struct CadetChannel *ch)
800 {
801   switch (ch->state)
802   {
803   case CADET_CHANNEL_NEW:
804     /* this should be impossible */
805     GNUNET_break (0);
806     break;
807   case CADET_CHANNEL_CREATE_SENT:
808     if (NULL == ch->owner)
809     {
810       /* We're not the owner, wrong direction! */
811       GNUNET_break_op (0);
812       return;
813     }
814     LOG (GNUNET_ERROR_TYPE_DEBUG,
815          "Received channel OPEN_ACK for waiting channel %s, entering READY state\n",
816          GCCH_2s (ch));
817     ch->state = CADET_CHANNEL_READY;
818     /* On first connect, send client as many ACKs as we allow messages
819        to be buffered! */
820     for (unsigned int i=0;i<ch->max_pending_messages;i++)
821       send_ack_to_client (ch,
822                           ch->owner);
823     break;
824   case CADET_CHANNEL_READY:
825     /* duplicate ACK, maybe we retried the CREATE. Ignore. */
826     LOG (GNUNET_ERROR_TYPE_DEBUG,
827          "Received duplicate channel OPEN_ACK for channel %s\n",
828          GCCH_2s (ch));
829     GNUNET_STATISTICS_update (stats,
830                               "# duplicate CREATE_ACKs",
831                               1,
832                               GNUNET_NO);
833     break;
834   }
835 }
836
837
838 /**
839  * Test if element @a e1 comes before element @a e2.
840  *
841  * TODO: use opportunity to create generic list insertion sort
842  * logic in container!
843  *
844  * @param cls closure, our `struct CadetChannel`
845  * @param e1 an element of to sort
846  * @param e2 another element to sort
847  * @return #GNUNET_YES if @e1 < @e2, otherwise #GNUNET_NO
848  */
849 static int
850 is_before (void *cls,
851            void *e1,
852            void *e2)
853 {
854   struct CadetOutOfOrderMessage *m1 = e1;
855   struct CadetOutOfOrderMessage *m2 = e2;
856   uint32_t v1 = ntohl (m1->mid.mid);
857   uint32_t v2 = ntohl (m2->mid.mid);
858   uint32_t delta;
859
860   delta = v1 - v2;
861   if (delta > (uint32_t) INT_MAX)
862   {
863     /* in overflow range, we can safely assume we wrapped around */
864     return GNUNET_NO;
865   }
866   else
867   {
868     return GNUNET_YES;
869   }
870 }
871
872
873 /**
874  * We got payload data for a channel.  Pass it on to the client
875  * and send an ACK to the other end (once flow control allows it!)
876  *
877  * @param ch channel that got data
878  */
879 void
880 GCCH_handle_channel_plaintext_data (struct CadetChannel *ch,
881                                     const struct GNUNET_CADET_ChannelAppDataMessage *msg)
882 {
883   struct GNUNET_MQ_Envelope *env;
884   struct GNUNET_CADET_LocalData *ld;
885   struct CadetOutOfOrderMessage *com;
886   size_t payload_size;
887
888   payload_size = ntohs (msg->header.size) - sizeof (*msg);
889   LOG (GNUNET_ERROR_TYPE_DEBUG,
890        "Receicved %u bytes of application data on channel %s\n",
891        (unsigned int) payload_size,
892        GCCH_2s (ch));
893   env = GNUNET_MQ_msg_extra (ld,
894                              payload_size,
895                              GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
896   ld->ccn = ch->ccn;
897   GNUNET_memcpy (&ld[1],
898                  &msg[1],
899                  payload_size);
900   if ( (GNUNET_YES == ch->client_ready) &&
901        ( (GNUNET_YES == ch->out_of_order) ||
902          (msg->mid.mid == ch->mid_recv.mid) ) )
903   {
904     GSC_send_to_client (ch->owner ? ch->owner : ch->dest,
905                         env);
906     ch->mid_recv.mid = htonl (1 + ntohl (ch->mid_recv.mid));
907     ch->mid_futures >>= 1;
908   }
909   else
910   {
911     /* FIXME-SECURITY: if the element is WAY too far ahead,
912        drop it (can't buffer too much!) */
913     com = GNUNET_new (struct CadetOutOfOrderMessage);
914     com->mid = msg->mid;
915     com->env = env;
916     /* sort into list ordered by "is_before" */
917     if ( (NULL == ch->head_recv) ||
918          (GNUNET_YES == is_before (ch,
919                                    com,
920                                    ch->head_recv)) )
921     {
922       GNUNET_CONTAINER_DLL_insert (ch->head_recv,
923                                    ch->tail_recv,
924                                    com);
925     }
926     else
927     {
928       struct CadetOutOfOrderMessage *pos;
929
930       for (pos = ch->head_recv;
931            NULL != pos;
932            pos = pos->next)
933       {
934         if (GNUNET_YES !=
935             is_before (ch,
936                        pos,
937                        com))
938           break;
939       }
940       if (NULL == pos)
941         GNUNET_CONTAINER_DLL_insert_tail (ch->head_recv,
942                                           ch->tail_recv,
943                                           com);
944       else
945         GNUNET_CONTAINER_DLL_insert_after (ch->head_recv,
946                                            ch->tail_recv,
947                                            com,
948                                            pos->prev);
949     }
950   }
951 }
952
953
954 /**
955  * We got an acknowledgement for payload data for a channel.
956  * Possibly resume transmissions.
957  *
958  * @param ch channel that got the ack
959  * @param ack details about what was received
960  */
961 void
962 GCCH_handle_channel_plaintext_data_ack (struct CadetChannel *ch,
963                                         const struct GNUNET_CADET_ChannelDataAckMessage *ack)
964 {
965   struct CadetReliableMessage *crm;
966
967   if (GNUNET_NO == ch->reliable)
968   {
969     /* not expecting ACKs on unreliable channel, odd */
970     GNUNET_break_op (0);
971     return;
972   }
973   for (crm = ch->head_sent;
974         NULL != crm;
975        crm = crm->next)
976     if (ack->mid.mid == crm->data_message.mid.mid)
977       break;
978   if (NULL == crm)
979   {
980     /* ACK for message we already dropped, might have been a
981        duplicate ACK? Ignore. */
982     GNUNET_STATISTICS_update (stats,
983                               "# duplicate DATA_ACKs",
984                               1,
985                               GNUNET_NO);
986     return;
987   }
988   GNUNET_CONTAINER_DLL_remove (ch->head_sent,
989                                ch->tail_sent,
990                                crm);
991   ch->pending_messages--;
992   GNUNET_free (crm);
993   GNUNET_assert (ch->pending_messages < ch->max_pending_messages);
994   send_ack_to_client (ch,
995                       (NULL == ch->owner) ? ch->dest : ch->owner);
996 }
997
998
999 /**
1000  * Destroy channel, based on the other peer closing the
1001  * connection.  Also needs to remove this channel from
1002  * the tunnel.
1003  *
1004  * @param ch channel to destroy
1005  */
1006 void
1007 GCCH_handle_remote_destroy (struct CadetChannel *ch)
1008 {
1009   struct GNUNET_MQ_Envelope *env;
1010   struct GNUNET_CADET_LocalChannelDestroyMessage *tdm;
1011
1012   LOG (GNUNET_ERROR_TYPE_DEBUG,
1013        "Received remote channel DESTROY for channel %s\n",
1014        GCCH_2s (ch));
1015   ch->destroy = GNUNET_YES;
1016   env = GNUNET_MQ_msg (tdm,
1017                        GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_DESTROY);
1018   tdm->ccn = ch->ccn;
1019   GSC_send_to_client ((NULL != ch->owner) ? ch->owner : ch->dest,
1020                       env);
1021   channel_destroy (ch);
1022 }
1023
1024
1025 /**
1026  * Function called once the tunnel has sent one of our messages.
1027  * If the message is unreliable, simply frees the `crm`. If the
1028  * message was reliable, calculate retransmission time and
1029  * wait for ACK (or retransmit).
1030  *
1031  * @param cls the `struct CadetReliableMessage` that was sent
1032  */
1033 static void
1034 data_sent_cb (void *cls);
1035
1036
1037 /**
1038  * We need to retry a transmission, the last one took too long to
1039  * be acknowledged.
1040  *
1041  * @param cls the `struct CadetChannel` where we need to retransmit
1042  */
1043 static void
1044 retry_transmission (void *cls)
1045 {
1046   struct CadetChannel *ch = cls;
1047   struct CadetReliableMessage *crm = ch->head_sent;
1048
1049   ch->retry_task = NULL;
1050   GNUNET_assert (NULL == crm->qe);
1051   crm->qe = GCT_send (ch->t,
1052                       &crm->data_message.header,
1053                       &data_sent_cb,
1054                       crm);
1055 }
1056
1057
1058 /**
1059  * Check if we can now allow the client to transmit, and if so,
1060  * let the client know about it.
1061  *
1062  * @param ch channel to check
1063  */
1064 static void
1065 GCCH_check_allow_client (struct CadetChannel *ch)
1066 {
1067   struct GNUNET_MQ_Envelope *env;
1068   struct GNUNET_CADET_LocalAck *msg;
1069
1070   if (GNUNET_YES == ch->client_allowed)
1071     return; /* client already allowed! */
1072   if (CADET_CHANNEL_READY != ch->state)
1073   {
1074     /* destination did not yet ACK our CREATE! */
1075     LOG (GNUNET_ERROR_TYPE_DEBUG,
1076          "Channel %s not yet ready, throttling client until ACK.\n",
1077          GCCH_2s (ch));
1078     return;
1079   }
1080   if (ch->pending_messages > ch->max_pending_messages)
1081   {
1082     /* Too many messages in queue. */
1083     LOG (GNUNET_ERROR_TYPE_DEBUG,
1084          "Message queue still too long on channel %s, throttling client until ACK.\n",
1085          GCCH_2s (ch));
1086     return;
1087   }
1088   if ( (NULL != ch->head_sent) &&
1089        (64 <= ntohl (ch->mid_send.mid) - ntohl (ch->head_sent->data_message.mid.mid)) )
1090   {
1091     LOG (GNUNET_ERROR_TYPE_DEBUG,
1092          "Gap in ACKs too big on channel %s, throttling client until ACK.\n",
1093          GCCH_2s (ch));
1094     return;
1095   }
1096   ch->client_allowed = GNUNET_YES;
1097
1098
1099   LOG (GNUNET_ERROR_TYPE_DEBUG,
1100        "Sending local ack to channel %s client\n",
1101        GCCH_2s (ch));
1102   env = GNUNET_MQ_msg (msg,
1103                        GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK);
1104   msg->ccn = ch->ccn;
1105   GSC_send_to_client (ch->owner ? ch->owner : ch->dest,
1106                       env);
1107 }
1108
1109
1110 /**
1111  * Function called once the tunnel has sent one of our messages.
1112  * If the message is unreliable, simply frees the `crm`. If the
1113  * message was reliable, calculate retransmission time and
1114  * wait for ACK (or retransmit).
1115  *
1116  * @param cls the `struct CadetReliableMessage` that was sent
1117  */
1118 static void
1119 data_sent_cb (void *cls)
1120 {
1121   struct CadetReliableMessage *crm = cls;
1122   struct CadetChannel *ch = crm->ch;
1123   struct CadetReliableMessage *off;
1124
1125   crm->qe = NULL;
1126   GNUNET_CONTAINER_DLL_remove (ch->head_sent,
1127                                ch->tail_sent,
1128                                crm);
1129   if (GNUNET_NO == ch->reliable)
1130   {
1131     GNUNET_free (crm);
1132     ch->pending_messages--;
1133     GCCH_check_allow_client (ch);
1134     return;
1135   }
1136   if (0 == crm->retry_delay.rel_value_us)
1137     crm->retry_delay = ch->expected_delay;
1138   crm->next_retry = GNUNET_TIME_relative_to_absolute (crm->retry_delay);
1139
1140   /* find position for re-insertion into the DLL */
1141   if ( (NULL == ch->head_sent) ||
1142        (crm->next_retry.abs_value_us < ch->head_sent->next_retry.abs_value_us) )
1143   {
1144     /* insert at HEAD, also (re)schedule retry task! */
1145     GNUNET_CONTAINER_DLL_insert (ch->head_sent,
1146                                  ch->tail_sent,
1147                                  crm);
1148     if (NULL != ch->retry_task)
1149       GNUNET_SCHEDULER_cancel (ch->retry_task);
1150     ch->retry_task = GNUNET_SCHEDULER_add_delayed (crm->retry_delay,
1151                                                    &retry_transmission,
1152                                                    ch);
1153     return;
1154   }
1155   for (off = ch->head_sent; NULL != off; off = off->next)
1156     if (crm->next_retry.abs_value_us < off->next_retry.abs_value_us)
1157       break;
1158   if (NULL == off)
1159   {
1160     /* insert at tail */
1161     GNUNET_CONTAINER_DLL_insert_tail (ch->head_sent,
1162                                       ch->tail_sent,
1163                                       crm);
1164   }
1165   else
1166   {
1167     /* insert before off */
1168     GNUNET_CONTAINER_DLL_insert_after (ch->head_sent,
1169                                        ch->tail_sent,
1170                                        off->prev,
1171                                        crm);
1172   }
1173 }
1174
1175
1176 /**
1177  * Handle data given by a client.
1178  *
1179  * Check whether the client is allowed to send in this tunnel, save if
1180  * channel is reliable and send an ACK to the client if there is still
1181  * buffer space in the tunnel.
1182  *
1183  * @param ch Channel.
1184  * @param message payload to transmit.
1185  * @return #GNUNET_OK if everything goes well,
1186  *         #GNUNET_SYSERR in case of an error.
1187  */
1188 int
1189 GCCH_handle_local_data (struct CadetChannel *ch,
1190                         const struct GNUNET_MessageHeader *message)
1191 {
1192   uint16_t payload_size = ntohs (message->size);
1193   struct CadetReliableMessage *crm;
1194
1195   if (GNUNET_NO == ch->client_allowed)
1196   {
1197     GNUNET_break_op (0);
1198     return GNUNET_SYSERR;
1199   }
1200   ch->client_allowed = GNUNET_NO;
1201   ch->pending_messages++;
1202
1203   /* Everything is correct, send the message. */
1204   crm = GNUNET_malloc (sizeof (*crm) + payload_size);
1205   crm->ch = ch;
1206   crm->data_message.header.size = htons (sizeof (struct GNUNET_CADET_ChannelAppDataMessage) + payload_size);
1207   crm->data_message.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA);
1208   ch->mid_send.mid = htonl (ntohl (ch->mid_send.mid) + 1);
1209   crm->data_message.mid = ch->mid_send;
1210   crm->data_message.ctn = ch->ctn;
1211   GNUNET_memcpy (&crm[1],
1212                  message,
1213                  payload_size);
1214   GNUNET_CONTAINER_DLL_insert (ch->head_sent,
1215                                ch->tail_sent,
1216                                crm);
1217   LOG (GNUNET_ERROR_TYPE_DEBUG,
1218        "Sending %u bytes from local client to channel %s\n",
1219        payload_size,
1220        GCCH_2s (ch));
1221   crm->qe = GCT_send (ch->t,
1222                       &crm->data_message.header,
1223                       &data_sent_cb,
1224                       crm);
1225   GCCH_check_allow_client (ch);
1226   return GNUNET_OK;
1227 }
1228
1229
1230 /**
1231  * Try to deliver messages to the local client, if it is ready for more.
1232  *
1233  * @param ch channel to process
1234  */
1235 static void
1236 send_client_buffered_data (struct CadetChannel *ch)
1237 {
1238   struct CadetOutOfOrderMessage *com;
1239
1240   if (GNUNET_NO == ch->client_ready)
1241     return; /* client not ready */
1242   com = ch->head_recv;
1243   if (NULL == com)
1244     return; /* none pending */
1245   if ( (com->mid.mid != ch->mid_recv.mid) &&
1246        (GNUNET_NO == ch->out_of_order) )
1247     return; /* missing next one in-order */
1248
1249   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1250               "Passing payload message to client on channel %s\n",
1251               GCCH_2s (ch));
1252
1253   /* all good, pass next message to client */
1254   GNUNET_CONTAINER_DLL_remove (ch->head_recv,
1255                                ch->tail_recv,
1256                                com);
1257   /* FIXME: if unreliable, this is not aggressive
1258      enough, as it would be OK to have lost some! */
1259   ch->mid_recv.mid = htonl (1 + ntohl (com->mid.mid));
1260   ch->mid_futures >>= 1; /* equivalent to division by 2 */
1261   GSC_send_to_client (ch->owner ? ch->owner : ch->dest,
1262                       com->env);
1263   GNUNET_free (com);
1264   if ( (0xFFULL == (ch->mid_futures & 0xFFULL)) &&
1265        (GNUNET_YES == ch->reliable) )
1266   {
1267     /* The next 15 messages were also already received (0xFF), this
1268        suggests that the sender may be blocked on flow control
1269        urgently waiting for an ACK from us. (As we have an inherent
1270        maximum of 64 bits, and 15 is getting too close for comfort.)
1271        So we should send one now. */
1272     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1273                 "Sender on channel %s likely blocked on flow-control, sending ACK now.\n",
1274                 GCCH_2s (ch));
1275     if (GNUNET_YES == ch->reliable)
1276       send_channel_data_ack (ch);
1277   }
1278
1279   if (NULL != ch->head_recv)
1280     return;
1281   if (GNUNET_NO == ch->destroy)
1282     return;
1283   GCT_send_channel_destroy (ch->t,
1284                             ch->ctn);
1285   channel_destroy (ch);
1286 }
1287
1288
1289 /**
1290  * Handle ACK from client on local channel.
1291  *
1292  * @param ch channel to destroy
1293  */
1294 void
1295 GCCH_handle_local_ack (struct CadetChannel *ch)
1296 {
1297   ch->client_ready = GNUNET_YES;
1298   send_client_buffered_data (ch);
1299 }
1300
1301
1302 #define LOG2(level, ...) GNUNET_log_from_nocheck(level,"cadet-chn",__VA_ARGS__)
1303
1304
1305 /**
1306  * Log channel info.
1307  *
1308  * @param ch Channel.
1309  * @param level Debug level to use.
1310  */
1311 void
1312 GCCH_debug (struct CadetChannel *ch,
1313             enum GNUNET_ErrorType level)
1314 {
1315   int do_log;
1316
1317   do_log = GNUNET_get_log_call_status (level & (~GNUNET_ERROR_TYPE_BULK),
1318                                        "cadet-chn",
1319                                        __FILE__, __FUNCTION__, __LINE__);
1320   if (0 == do_log)
1321     return;
1322
1323   if (NULL == ch)
1324   {
1325     LOG2 (level, "CHN *** DEBUG NULL CHANNEL ***\n");
1326     return;
1327   }
1328   LOG2 (level,
1329         "CHN Channel %s:%X (%p)\n",
1330         GCT_2s (ch->t),
1331         ch->ctn,
1332         ch);
1333   if (NULL != ch->owner)
1334   {
1335     LOG2 (level,
1336           "CHN origin %s ready %s local-id: %u\n",
1337           GSC_2s (ch->owner),
1338           ch->client_ready ? "YES" : "NO",
1339           ntohl (ch->ccn.channel_of_client));
1340   }
1341   if (NULL != ch->dest)
1342   {
1343     LOG2 (level,
1344           "CHN destination %s ready %s local-id: %u\n",
1345           GSC_2s (ch->dest),
1346           ch->client_ready ? "YES" : "NO",
1347           ntohl (ch->ccn.channel_of_client));
1348   }
1349   LOG2 (level,
1350         "CHN  Message IDs recv: %d (%LLX), send: %d\n",
1351         ntohl (ch->mid_recv.mid),
1352         (unsigned long long) ch->mid_futures,
1353         ntohl (ch->mid_send.mid));
1354 }
1355
1356
1357
1358 /* end of gnunet-service-cadet-new_channel.c */