correctly handle assignment of cid during channel open, send channel open ack, and...
[oweals/gnunet.git] / src / cadet / gnunet-service-cadet-new_channel.c
1
2 /*
3      This file is part of GNUnet.
4      Copyright (C) 2001-2017 GNUnet e.V.
5
6      GNUnet is free software; you can redistribute it and/or modify
7      it under the terms of the GNU General Public License as published
8      by the Free Software Foundation; either version 3, or (at your
9      option) any later version.
10
11      GNUnet is distributed in the hope that it will be useful, but
12      WITHOUT ANY WARRANTY; without even the implied warranty of
13      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14      General Public License for more details.
15
16      You should have received a copy of the GNU General Public License
17      along with GNUnet; see the file COPYING.  If not, write to the
18      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
19      Boston, MA 02110-1301, USA.
20 */
21 /**
22  * @file cadet/gnunet-service-cadet-new_channel.c
23  * @brief logical links between CADET clients
24  * @author Bartlomiej Polot
25  * @author Christian Grothoff
26  *
27  * TODO:
28  * - introduce shutdown so we can have half-closed channels, modify
29  *   destroy to include MID to have FIN-ACK equivalents, etc.
30  * - estimate max bandwidth using bursts and use to for CONGESTION CONTROL!
31  * - check that '0xFFULL' really is sufficient for flow control!
32  * - revisit handling of 'unreliable' traffic!
33  * - revisit handling of 'out-of-order' option, especially in combination with/without 'reliable'.
34  * - figure out flow control without ACKs (unreliable traffic!)
35  */
36 #include "platform.h"
37 #include "gnunet_util_lib.h"
38 #include "cadet.h"
39 #include "gnunet_statistics_service.h"
40 #include "gnunet-service-cadet-new.h"
41 #include "gnunet-service-cadet-new_channel.h"
42 #include "gnunet-service-cadet-new_connection.h"
43 #include "gnunet-service-cadet-new_tunnels.h"
44 #include "gnunet-service-cadet-new_peer.h"
45 #include "gnunet-service-cadet-new_paths.h"
46
47 #define LOG(level,...) GNUNET_log_from (level,"cadet-chn",__VA_ARGS__)
48
49 /**
50  * How long do we initially wait before retransmitting?
51  */
52 #define CADET_INITIAL_RETRANSMIT_TIME GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 250)
53
54 /**
55  * How long do we wait before dropping state about incoming
56  * connection to closed port?
57  */
58 #define TIMEOUT_CLOSED_PORT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30)
59
60
61 /**
62  * All the states a connection can be in.
63  */
64 enum CadetChannelState
65 {
66   /**
67    * Uninitialized status, should never appear in operation.
68    */
69   CADET_CHANNEL_NEW,
70
71   /**
72    * Connection create message sent, waiting for ACK.
73    */
74   CADET_CHANNEL_OPEN_SENT,
75
76   /**
77    * Connection confirmed, ready to carry traffic.
78    */
79   CADET_CHANNEL_READY
80 };
81
82
83 /**
84  * Info needed to retry a message in case it gets lost.
85  * Note that we DO use this structure also for unreliable
86  * messages.
87  */
88 struct CadetReliableMessage
89 {
90   /**
91    * Double linked list, FIFO style
92    */
93   struct CadetReliableMessage *next;
94
95   /**
96    * Double linked list, FIFO style
97    */
98   struct CadetReliableMessage *prev;
99
100   /**
101    * Which channel is this message in?
102    */
103   struct CadetChannel *ch;
104
105   /**
106    * Entry in the tunnels queue for this message, NULL if it has left
107    * the tunnel.  Used to cancel transmission in case we receive an
108    * ACK in time.
109    */
110   struct CadetTunnelQueueEntry *qe;
111
112   /**
113    * How soon should we retry if we fail to get an ACK?
114    * Messages in the queue are sorted by this value.
115    */
116   struct GNUNET_TIME_Absolute next_retry;
117
118   /**
119    * How long do we wait for an ACK after transmission?
120    * Use for the back-off calculation.
121    */
122   struct GNUNET_TIME_Relative retry_delay;
123
124   /**
125    * Data message we are trying to send.
126    */
127   struct GNUNET_CADET_ChannelAppDataMessage data_message;
128
129   /* followed by variable-size payload */
130 };
131
132
133 /**
134  * List of received out-of-order data messages.
135  */
136 struct CadetOutOfOrderMessage
137 {
138   /**
139    * Double linked list, FIFO style
140    */
141   struct CadetOutOfOrderMessage *next;
142
143   /**
144    * Double linked list, FIFO style
145    */
146   struct CadetOutOfOrderMessage *prev;
147
148   /**
149    * ID of the message (messages up to this point needed
150    * before we give this one to the client).
151    */
152   struct ChannelMessageIdentifier mid;
153
154   /**
155    * The envelope with the payload of the out-of-order message
156    */
157   struct GNUNET_MQ_Envelope *env;
158
159 };
160
161
162 /**
163  * Struct containing all information regarding a channel to a remote client.
164  */
165 struct CadetChannel
166 {
167   /**
168    * Tunnel this channel is in.
169    */
170   struct CadetTunnel *t;
171
172   /**
173    * Client owner of the tunnel, if any.
174    * (Used if this channel represends the initiating end of the tunnel.)
175    */
176   struct CadetClient *owner;
177
178   /**
179    * Client destination of the tunnel, if any.
180    * (Used if this channel represents the listening end of the tunnel.)
181    */
182   struct CadetClient *dest;
183
184   /**
185    * Last entry in the tunnel's queue relating to control messages
186    * (#GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN or
187    * #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK).  Used to cancel
188    * transmission in case we receive updated information.
189    */
190   struct CadetTunnelQueueEntry *last_control_qe;
191
192   /**
193    * Head of DLL of messages sent and not yet ACK'd.
194    */
195   struct CadetReliableMessage *head_sent;
196
197   /**
198    * Tail of DLL of messages sent and not yet ACK'd.
199    */
200   struct CadetReliableMessage *tail_sent;
201
202   /**
203    * Head of DLL of messages received out of order or while client was unready.
204    */
205   struct CadetOutOfOrderMessage *head_recv;
206
207   /**
208    * Tail DLL of messages received out of order or while client was unready.
209    */
210   struct CadetOutOfOrderMessage *tail_recv;
211
212   /**
213    * Task to resend/poll in case no ACK is received.
214    */
215   struct GNUNET_SCHEDULER_Task *retry_control_task;
216
217     /**
218    * Task to resend/poll in case no ACK is received.
219    */
220   struct GNUNET_SCHEDULER_Task *retry_data_task;
221
222   /**
223    * Last time the channel was used
224    */
225   struct GNUNET_TIME_Absolute timestamp;
226
227   /**
228    * Destination port of the channel.
229    */
230   struct GNUNET_HashCode port;
231
232   /**
233    * Counter for exponential backoff.
234    */
235   struct GNUNET_TIME_Relative retry_time;
236
237   /**
238    * How long does it usually take to get an ACK.
239    */
240   struct GNUNET_TIME_Relative expected_delay;
241
242   /**
243    * Bitfield of already-received messages past @e mid_recv.
244    */
245   uint64_t mid_futures;
246
247   /**
248    * Next MID expected for incoming traffic.
249    */
250   struct ChannelMessageIdentifier mid_recv;
251
252   /**
253    * Next MID to use for outgoing traffic.
254    */
255   struct ChannelMessageIdentifier mid_send;
256
257   /**
258    * Total (reliable) messages pending ACK for this channel.
259    */
260   unsigned int pending_messages;
261
262   /**
263    * Maximum (reliable) messages pending ACK for this channel
264    * before we throttle the client.
265    */
266   unsigned int max_pending_messages;
267
268   /**
269    * Number identifying this channel in its tunnel.
270    */
271   struct GNUNET_CADET_ChannelTunnelNumber ctn;
272
273   /**
274    * Local tunnel number for local client owning the channel.
275    * ( >= #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI or 0 )
276    */
277   struct GNUNET_CADET_ClientChannelNumber ccn;
278
279   /**
280    * Channel state.
281    */
282   enum CadetChannelState state;
283
284   /**
285    * Can we send data to the client?
286    */
287   int client_ready;
288
289   /**
290    * Can the client send data to us?
291    */
292   int client_allowed;
293
294   /**
295    * Is the tunnel bufferless (minimum latency)?
296    */
297   int nobuffer;
298
299   /**
300    * Is the tunnel reliable?
301    */
302   int reliable;
303
304   /**
305    * Is the tunnel out-of-order?
306    */
307   int out_of_order;
308
309   /**
310    * Flag to signal the destruction of the channel.  If this is set to
311    * #GNUNET_YES the channel will be destroyed once the queue is
312    * empty.
313    */
314   int destroy;
315
316 };
317
318
319 /**
320  * Get the static string for identification of the channel.
321  *
322  * @param ch Channel.
323  *
324  * @return Static string with the channel IDs.
325  */
326 const char *
327 GCCH_2s (const struct CadetChannel *ch)
328 {
329   static char buf[128];
330
331   GNUNET_snprintf (buf,
332                    sizeof (buf),
333                    "Channel %s:%s ctn:%X(%X)",
334                    GNUNET_i2s (GCP_get_id (GCT_get_destination (ch->t))),
335                    GNUNET_h2s (&ch->port),
336                    ch->ctn,
337                    ntohl (ch->ccn.channel_of_client));
338   return buf;
339 }
340
341
342 /**
343  * Get the channel's public ID.
344  *
345  * @param ch Channel.
346  *
347  * @return ID used to identify the channel with the remote peer.
348  */
349 struct GNUNET_CADET_ChannelTunnelNumber
350 GCCH_get_id (const struct CadetChannel *ch)
351 {
352   return ch->ctn;
353 }
354
355
356 /**
357  * Destroy the given channel.
358  *
359  * @param ch channel to destroy
360  */
361 static void
362 channel_destroy (struct CadetChannel *ch)
363 {
364   struct CadetReliableMessage *crm;
365   struct CadetOutOfOrderMessage *com;
366
367   while (NULL != (crm = ch->head_sent))
368   {
369     GNUNET_assert (ch == crm->ch);
370     if (NULL != crm->qe)
371     {
372       GCT_send_cancel (crm->qe);
373       crm->qe = NULL;
374     }
375     GNUNET_CONTAINER_DLL_remove (ch->head_sent,
376                                  ch->tail_sent,
377                                  crm);
378     GNUNET_free (crm);
379   }
380   while (NULL != (com = ch->head_recv))
381   {
382     GNUNET_CONTAINER_DLL_remove (ch->head_recv,
383                                  ch->tail_recv,
384                                  com);
385     GNUNET_MQ_discard (com->env);
386     GNUNET_free (com);
387   }
388   if (NULL != ch->last_control_qe)
389   {
390     GCT_send_cancel (ch->last_control_qe);
391     ch->last_control_qe = NULL;
392   }
393   if (NULL != ch->retry_data_task)
394   {
395     GNUNET_SCHEDULER_cancel (ch->retry_data_task);
396     ch->retry_data_task = NULL;
397   }
398   if (NULL != ch->retry_control_task)
399   {
400     GNUNET_SCHEDULER_cancel (ch->retry_control_task);
401     ch->retry_control_task = NULL;
402   }
403   GCT_remove_channel (ch->t,
404                       ch,
405                       ch->ctn);
406   GNUNET_free (ch);
407 }
408
409
410 /**
411  * Send a channel create message.
412  *
413  * @param cls Channel for which to send.
414  */
415 static void
416 send_channel_open (void *cls);
417
418
419 /**
420  * Function called once the tunnel confirms that we sent the
421  * create message.  Delays for a bit until we retry.
422  *
423  * @param cls our `struct CadetChannel`.
424  */
425 static void
426 channel_open_sent_cb (void *cls)
427 {
428   struct CadetChannel *ch = cls;
429
430   GNUNET_assert (NULL != ch->last_control_qe);
431   ch->last_control_qe = NULL;
432   ch->retry_time = GNUNET_TIME_STD_BACKOFF (ch->retry_time);
433   ch->retry_control_task
434     = GNUNET_SCHEDULER_add_delayed (ch->retry_time,
435                                     &send_channel_open,
436                                     ch);
437 }
438
439
440 /**
441  * Send a channel open message.
442  *
443  * @param cls Channel for which to send.
444  */
445 static void
446 send_channel_open (void *cls)
447 {
448   struct CadetChannel *ch = cls;
449   struct GNUNET_CADET_ChannelOpenMessage msgcc;
450   uint32_t options;
451
452   ch->retry_control_task = NULL;
453   LOG (GNUNET_ERROR_TYPE_DEBUG,
454        "Sending CHANNEL_OPEN message for %s\n",
455        GCCH_2s (ch));
456   options = 0;
457   if (ch->nobuffer)
458     options |= GNUNET_CADET_OPTION_NOBUFFER;
459   if (ch->reliable)
460     options |= GNUNET_CADET_OPTION_RELIABLE;
461   if (ch->out_of_order)
462     options |= GNUNET_CADET_OPTION_OUT_OF_ORDER;
463   msgcc.header.size = htons (sizeof (msgcc));
464   msgcc.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN);
465   msgcc.opt = htonl (options);
466   msgcc.port = ch->port;
467   msgcc.ctn = ch->ctn;
468   ch->state = CADET_CHANNEL_OPEN_SENT;
469   ch->last_control_qe = GCT_send (ch->t,
470                                   &msgcc.header,
471                                   &channel_open_sent_cb,
472                                   ch);
473 }
474
475
476 /**
477  * Function called once and only once after a channel was bound
478  * to its tunnel via #GCT_add_channel() is ready for transmission.
479  * Note that this is only the case for channels that this peer
480  * initiates, as for incoming channels we assume that they are
481  * ready for transmission immediately upon receiving the open
482  * message.  Used to bootstrap the #GCT_send() process.
483  *
484  * @param ch the channel for which the tunnel is now ready
485  */
486 void
487 GCCH_tunnel_up (struct CadetChannel *ch)
488 {
489   GNUNET_assert (NULL == ch->retry_control_task);
490   ch->retry_control_task
491     = GNUNET_SCHEDULER_add_now (&send_channel_open,
492                                 ch);
493 }
494
495
496 /**
497  * Create a new channel.
498  *
499  * @param owner local client owning the channel
500  * @param ccn local number of this channel at the @a owner
501  * @param destination peer to which we should build the channel
502  * @param port desired port at @a destination
503  * @param options options for the channel
504  * @return handle to the new channel
505  */
506 struct CadetChannel *
507 GCCH_channel_local_new (struct CadetClient *owner,
508                         struct GNUNET_CADET_ClientChannelNumber ccn,
509                         struct CadetPeer *destination,
510                         const struct GNUNET_HashCode *port,
511                         uint32_t options)
512 {
513   struct CadetChannel *ch;
514
515   ch = GNUNET_new (struct CadetChannel);
516   ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
517   ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
518   ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
519   ch->max_pending_messages = (ch->nobuffer) ? 1 : 32; /* FIXME: 32!? Do not hardcode! */
520   ch->owner = owner;
521   ch->ccn = ccn;
522   ch->port = *port;
523   ch->t = GCP_get_tunnel (destination,
524                           GNUNET_YES);
525   ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME;
526   ch->ctn = GCT_add_channel (ch->t,
527                              ch);
528   GNUNET_STATISTICS_update (stats,
529                             "# channels",
530                             1,
531                             GNUNET_NO);
532   LOG (GNUNET_ERROR_TYPE_DEBUG,
533        "Created channel to port %s at peer %s for %s using %s\n",
534        GNUNET_h2s (port),
535        GCP_2s (destination),
536        GSC_2s (owner),
537        GCT_2s (ch->t));
538   return ch;
539 }
540
541
542 /**
543  * We had an incoming channel to a port that is closed.
544  * It has not been opened for a while, drop it.
545  *
546  * @param cls the channel to drop
547  */
548 static void
549 timeout_closed_cb (void *cls)
550 {
551   struct CadetChannel *ch = cls;
552
553   ch->retry_control_task = NULL;
554   LOG (GNUNET_ERROR_TYPE_DEBUG,
555        "Closing incoming channel to port %s from peer %s due to timeout\n",
556        GNUNET_h2s (&ch->port),
557        GCP_2s (GCT_get_destination (ch->t)));
558   channel_destroy (ch);
559 }
560
561
562 /**
563  * Create a new channel based on a request coming in over the network.
564  *
565  * @param t tunnel to the remote peer
566  * @param ctn identifier of this channel in the tunnel
567  * @param port desired local port
568  * @param options options for the channel
569  * @return handle to the new channel
570  */
571 struct CadetChannel *
572 GCCH_channel_incoming_new (struct CadetTunnel *t,
573                            struct GNUNET_CADET_ChannelTunnelNumber ctn,
574                            const struct GNUNET_HashCode *port,
575                            uint32_t options)
576 {
577   struct CadetChannel *ch;
578   struct CadetClient *c;
579
580   ch = GNUNET_new (struct CadetChannel);
581   ch->port = *port;
582   ch->t = t;
583   ch->ctn = ctn;
584   ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME;
585   ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
586   ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
587   ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
588   ch->max_pending_messages = (ch->nobuffer) ? 1 : 32; /* FIXME: 32!? Do not hardcode! */
589   GNUNET_STATISTICS_update (stats,
590                             "# channels",
591                             1,
592                             GNUNET_NO);
593
594   c = GNUNET_CONTAINER_multihashmap_get (open_ports,
595                                          port);
596   if (NULL == c)
597   {
598     /* port closed, wait for it to possibly open */
599     (void) GNUNET_CONTAINER_multihashmap_put (loose_channels,
600                                               port,
601                                               ch,
602                                               GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
603     ch->retry_control_task
604       = GNUNET_SCHEDULER_add_delayed (TIMEOUT_CLOSED_PORT,
605                                       &timeout_closed_cb,
606                                       ch);
607     LOG (GNUNET_ERROR_TYPE_DEBUG,
608          "Created loose incoming channel to port %s from peer %s\n",
609          GNUNET_h2s (&ch->port),
610          GCP_2s (GCT_get_destination (ch->t)));
611   }
612   else
613   {
614     GCCH_bind (ch,
615                c);
616   }
617   GNUNET_STATISTICS_update (stats,
618                             "# channels",
619                             1,
620                             GNUNET_NO);
621   return ch;
622 }
623
624
625 /**
626  * Function called once the tunnel confirms that we sent the
627  * ACK message.  Just remembers it was sent, we do not expect
628  * ACKs for ACKs ;-).
629  *
630  * @param cls our `struct CadetChannel`.
631  */
632 static void
633 send_ack_cb (void *cls)
634 {
635   struct CadetChannel *ch = cls;
636
637   GNUNET_assert (NULL != ch->last_control_qe);
638   ch->last_control_qe = NULL;
639 }
640
641
642 /**
643  * Compute and send the current ACK to the other peer.
644  *
645  * @param ch channel to send the ACK for
646  */
647 static void
648 send_channel_data_ack (struct CadetChannel *ch)
649 {
650   struct GNUNET_CADET_ChannelDataAckMessage msg;
651
652   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK);
653   msg.header.size = htons (sizeof (msg));
654   msg.ctn = ch->ctn;
655   msg.mid.mid = htonl (ntohl (ch->mid_recv.mid) - 1);
656   msg.futures = GNUNET_htonll (ch->mid_futures);
657   if (NULL != ch->last_control_qe)
658     GCT_send_cancel (ch->last_control_qe);
659   ch->last_control_qe = GCT_send (ch->t,
660                                   &msg.header,
661                                   &send_ack_cb,
662                                   ch);
663 }
664
665
666 /**
667  * Send our initial ACK to the client confirming that the
668  * connection is up.
669  *
670  * @param cls the `struct CadetChannel`
671  */
672 static void
673 send_open_ack (void *cls)
674 {
675   struct CadetChannel *ch = cls;
676   struct GNUNET_CADET_ChannelManageMessage msg;
677
678   LOG (GNUNET_ERROR_TYPE_DEBUG,
679        "Sending CHANNEL_OPEN_ACK on channel %s\n",
680        GCCH_2s (ch));
681   ch->retry_control_task = NULL;
682   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK);
683   msg.header.size = htons (sizeof (msg));
684   msg.reserved = htonl (0);
685   msg.ctn = ch->ctn;
686   if (NULL != ch->last_control_qe)
687     GCT_send_cancel (ch->last_control_qe);
688   ch->last_control_qe = GCT_send (ch->t,
689                                   &msg.header,
690                                   &send_ack_cb,
691                                   ch);
692 }
693
694
695 /**
696  * We got a #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN message again for
697  * this channel.  If the binding was successful, (re)transmit the
698  * #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK.
699  *
700  * @param ch channel that got the duplicate open
701  */
702 void
703 GCCH_handle_duplicate_open (struct CadetChannel *ch)
704 {
705   if (NULL == ch->dest)
706   {
707     LOG (GNUNET_ERROR_TYPE_DEBUG,
708          "Ignoring duplicate channel OPEN on %s: port is closed\n",
709          GCCH_2s (ch));
710     return;
711   }
712   if (NULL != ch->retry_control_task)
713   {
714     LOG (GNUNET_ERROR_TYPE_DEBUG,
715          "Ignoring duplicate channel OPEN on %s: control message is pending\n",
716          GCCH_2s (ch));
717     return;
718   }
719   ch->retry_control_task
720     = GNUNET_SCHEDULER_add_now (&send_open_ack,
721                                 ch);
722 }
723
724
725 /**
726  * Send a LOCAL ACK to the client to solicit more messages.
727  *
728  * @param ch channel the ack is for
729  * @param c client to send the ACK to
730  */
731 static void
732 send_ack_to_client (struct CadetChannel *ch,
733                     struct CadetClient *c)
734 {
735   struct GNUNET_MQ_Envelope *env;
736   struct GNUNET_CADET_LocalAck *ack;
737
738   env = GNUNET_MQ_msg (ack,
739                        GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK);
740   ack->ccn = ch->ccn;
741   GSC_send_to_client (c,
742                       env);
743 }
744
745
746 /**
747  * A client is bound to the port that we have a channel
748  * open to.  Send the acknowledgement for the connection
749  * request and establish the link with the client.
750  *
751  * @param ch open incoming channel
752  * @param c client listening on the respective port
753  */
754 void
755 GCCH_bind (struct CadetChannel *ch,
756            struct CadetClient *c)
757 {
758   struct GNUNET_MQ_Envelope *env;
759   struct GNUNET_CADET_LocalChannelCreateMessage *tcm;
760   uint32_t options;
761
762   LOG (GNUNET_ERROR_TYPE_DEBUG,
763        "Binding %s from %s to port %s of %s\n",
764        GCCH_2s (ch),
765        GCT_2s (ch->t),
766        GNUNET_h2s (&ch->port),
767        GSC_2s (c));
768   if (NULL != ch->retry_control_task)
769   {
770     /* there might be a timeout task here */
771     GNUNET_SCHEDULER_cancel (ch->retry_control_task);
772     ch->retry_control_task = NULL;
773   }
774   options = 0;
775   if (ch->nobuffer)
776     options |= GNUNET_CADET_OPTION_NOBUFFER;
777   if (ch->reliable)
778     options |= GNUNET_CADET_OPTION_RELIABLE;
779   if (ch->out_of_order)
780     options |= GNUNET_CADET_OPTION_OUT_OF_ORDER;
781   ch->dest = c;
782   ch->ccn = GSC_bind (c,
783                       ch,
784                       GCT_get_destination (ch->t),
785                       &ch->port,
786                       options);
787   ch->mid_recv.mid = htonl (1); /* The CONNECT counts as message 0! */
788
789   /* notify other peer that we accepted the connection */
790   ch->retry_control_task
791     = GNUNET_SCHEDULER_add_now (&send_open_ack,
792                                 ch);
793   /* give client it's initial supply of ACKs */
794   env = GNUNET_MQ_msg (tcm,
795                        GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_CREATE);
796   tcm->ccn = ch->ccn;
797   tcm->peer = *GCP_get_id (GCT_get_destination (ch->t));
798   tcm->port = ch->port;
799   tcm->opt = htonl (options);
800   GSC_send_to_client (ch->dest,
801                       env);
802   for (unsigned int i=0;i<ch->max_pending_messages;i++)
803     send_ack_to_client (ch,
804                         ch->dest);
805 }
806
807
808 /**
809  * Destroy locally created channel.  Called by the
810  * local client, so no need to tell the client.
811  *
812  * @param ch channel to destroy
813  */
814 void
815 GCCH_channel_local_destroy (struct CadetChannel *ch)
816 {
817   if (GNUNET_YES == ch->destroy)
818   {
819     /* other end already destroyed, with the local client gone, no need
820        to finish transmissions, just destroy immediately. */
821     channel_destroy (ch);
822     return;
823   }
824   if (NULL != ch->head_sent)
825   {
826     /* allow send queue to train first */
827     ch->destroy = GNUNET_YES;
828     return;
829   }
830   /* If the we ever sent the CHANNEL_CREATE, we need to send a destroy message. */
831   if (CADET_CHANNEL_NEW != ch->state)
832     GCT_send_channel_destroy (ch->t,
833                               ch->ctn);
834   /* Now finish our clean up */
835   channel_destroy (ch);
836 }
837
838
839 /**
840  * Destroy channel that was incoming.  Called by the
841  * local client, so no need to tell the client.
842  *
843  * @param ch channel to destroy
844  */
845 void
846 GCCH_channel_incoming_destroy (struct CadetChannel *ch)
847 {
848   if (GNUNET_YES == ch->destroy)
849   {
850     /* other end already destroyed, with the remote client gone, no need
851        to finish transmissions, just destroy immediately. */
852     channel_destroy (ch);
853     return;
854   }
855   if (NULL != ch->head_recv)
856   {
857     /* allow local client to see all data first */
858     ch->destroy = GNUNET_YES;
859     return;
860   }
861   /* Nothing left to do, just finish destruction */
862   GCT_send_channel_destroy (ch->t,
863                             ch->ctn);
864   channel_destroy (ch);
865 }
866
867
868 /**
869  * We got an acknowledgement for the creation of the channel
870  * (the port is open on the other side). Begin transmissions.
871  *
872  * @param ch channel to destroy
873  */
874 void
875 GCCH_handle_channel_open_ack (struct CadetChannel *ch)
876 {
877   switch (ch->state)
878   {
879   case CADET_CHANNEL_NEW:
880     /* this should be impossible */
881     GNUNET_break (0);
882     break;
883   case CADET_CHANNEL_OPEN_SENT:
884     if (NULL == ch->owner)
885     {
886       /* We're not the owner, wrong direction! */
887       GNUNET_break_op (0);
888       return;
889     }
890     LOG (GNUNET_ERROR_TYPE_DEBUG,
891          "Received channel OPEN_ACK for waiting %s, entering READY state\n",
892          GCCH_2s (ch));
893     GNUNET_SCHEDULER_cancel (ch->retry_control_task);
894     ch->retry_control_task = NULL;
895     ch->state = CADET_CHANNEL_READY;
896     /* On first connect, send client as many ACKs as we allow messages
897        to be buffered! */
898     for (unsigned int i=0;i<ch->max_pending_messages;i++)
899       send_ack_to_client (ch,
900                           ch->owner);
901     break;
902   case CADET_CHANNEL_READY:
903     /* duplicate ACK, maybe we retried the CREATE. Ignore. */
904     LOG (GNUNET_ERROR_TYPE_DEBUG,
905          "Received duplicate channel OPEN_ACK for %s\n",
906          GCCH_2s (ch));
907     GNUNET_STATISTICS_update (stats,
908                               "# duplicate CREATE_ACKs",
909                               1,
910                               GNUNET_NO);
911     break;
912   }
913 }
914
915
916 /**
917  * Test if element @a e1 comes before element @a e2.
918  *
919  * TODO: use opportunity to create generic list insertion sort
920  * logic in container!
921  *
922  * @param cls closure, our `struct CadetChannel`
923  * @param e1 an element of to sort
924  * @param e2 another element to sort
925  * @return #GNUNET_YES if @e1 < @e2, otherwise #GNUNET_NO
926  */
927 static int
928 is_before (void *cls,
929            void *e1,
930            void *e2)
931 {
932   struct CadetOutOfOrderMessage *m1 = e1;
933   struct CadetOutOfOrderMessage *m2 = e2;
934   uint32_t v1 = ntohl (m1->mid.mid);
935   uint32_t v2 = ntohl (m2->mid.mid);
936   uint32_t delta;
937
938   delta = v1 - v2;
939   if (delta > (uint32_t) INT_MAX)
940   {
941     /* in overflow range, we can safely assume we wrapped around */
942     return GNUNET_NO;
943   }
944   else
945   {
946     return GNUNET_YES;
947   }
948 }
949
950
951 /**
952  * We got payload data for a channel.  Pass it on to the client
953  * and send an ACK to the other end (once flow control allows it!)
954  *
955  * @param ch channel that got data
956  */
957 void
958 GCCH_handle_channel_plaintext_data (struct CadetChannel *ch,
959                                     const struct GNUNET_CADET_ChannelAppDataMessage *msg)
960 {
961   struct GNUNET_MQ_Envelope *env;
962   struct GNUNET_CADET_LocalData *ld;
963   struct CadetOutOfOrderMessage *com;
964   size_t payload_size;
965
966   payload_size = ntohs (msg->header.size) - sizeof (*msg);
967   LOG (GNUNET_ERROR_TYPE_DEBUG,
968        "Receicved %u bytes of application data on %s\n",
969        (unsigned int) payload_size,
970        GCCH_2s (ch));
971   env = GNUNET_MQ_msg_extra (ld,
972                              payload_size,
973                              GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
974   ld->ccn = ch->ccn;
975   GNUNET_memcpy (&ld[1],
976                  &msg[1],
977                  payload_size);
978   if ( (GNUNET_YES == ch->client_ready) &&
979        ( (GNUNET_YES == ch->out_of_order) ||
980          (msg->mid.mid == ch->mid_recv.mid) ) )
981   {
982     GSC_send_to_client (ch->owner ? ch->owner : ch->dest,
983                         env);
984     ch->mid_recv.mid = htonl (1 + ntohl (ch->mid_recv.mid));
985     ch->mid_futures >>= 1;
986   }
987   else
988   {
989     /* FIXME-SECURITY: if the element is WAY too far ahead,
990        drop it (can't buffer too much!) */
991     com = GNUNET_new (struct CadetOutOfOrderMessage);
992     com->mid = msg->mid;
993     com->env = env;
994     /* sort into list ordered by "is_before" */
995     if ( (NULL == ch->head_recv) ||
996          (GNUNET_YES == is_before (ch,
997                                    com,
998                                    ch->head_recv)) )
999     {
1000       GNUNET_CONTAINER_DLL_insert (ch->head_recv,
1001                                    ch->tail_recv,
1002                                    com);
1003     }
1004     else
1005     {
1006       struct CadetOutOfOrderMessage *pos;
1007
1008       for (pos = ch->head_recv;
1009            NULL != pos;
1010            pos = pos->next)
1011       {
1012         if (GNUNET_YES !=
1013             is_before (ch,
1014                        pos,
1015                        com))
1016           break;
1017       }
1018       if (NULL == pos)
1019         GNUNET_CONTAINER_DLL_insert_tail (ch->head_recv,
1020                                           ch->tail_recv,
1021                                           com);
1022       else
1023         GNUNET_CONTAINER_DLL_insert_after (ch->head_recv,
1024                                            ch->tail_recv,
1025                                            com,
1026                                            pos->prev);
1027     }
1028   }
1029 }
1030
1031
1032 /**
1033  * We got an acknowledgement for payload data for a channel.
1034  * Possibly resume transmissions.
1035  *
1036  * @param ch channel that got the ack
1037  * @param ack details about what was received
1038  */
1039 void
1040 GCCH_handle_channel_plaintext_data_ack (struct CadetChannel *ch,
1041                                         const struct GNUNET_CADET_ChannelDataAckMessage *ack)
1042 {
1043   struct CadetReliableMessage *crm;
1044
1045   if (GNUNET_NO == ch->reliable)
1046   {
1047     /* not expecting ACKs on unreliable channel, odd */
1048     GNUNET_break_op (0);
1049     return;
1050   }
1051   for (crm = ch->head_sent;
1052         NULL != crm;
1053        crm = crm->next)
1054     if (ack->mid.mid == crm->data_message.mid.mid)
1055       break;
1056   if (NULL == crm)
1057   {
1058     /* ACK for message we already dropped, might have been a
1059        duplicate ACK? Ignore. */
1060     GNUNET_STATISTICS_update (stats,
1061                               "# duplicate DATA_ACKs",
1062                               1,
1063                               GNUNET_NO);
1064     return;
1065   }
1066   GNUNET_CONTAINER_DLL_remove (ch->head_sent,
1067                                ch->tail_sent,
1068                                crm);
1069   ch->pending_messages--;
1070   GNUNET_free (crm);
1071   GNUNET_assert (ch->pending_messages < ch->max_pending_messages);
1072   send_ack_to_client (ch,
1073                       (NULL == ch->owner) ? ch->dest : ch->owner);
1074 }
1075
1076
1077 /**
1078  * Destroy channel, based on the other peer closing the
1079  * connection.  Also needs to remove this channel from
1080  * the tunnel.
1081  *
1082  * @param ch channel to destroy
1083  */
1084 void
1085 GCCH_handle_remote_destroy (struct CadetChannel *ch)
1086 {
1087   LOG (GNUNET_ERROR_TYPE_DEBUG,
1088        "Received remote channel DESTROY for %s\n",
1089        GCCH_2s (ch));
1090   ch->destroy = GNUNET_YES;
1091   GSC_handle_remote_channel_destroy ((NULL != ch->owner) ? ch->owner : ch->dest,
1092                                      ch->ccn,
1093                                      ch);
1094   channel_destroy (ch);
1095 }
1096
1097
1098 /**
1099  * Function called once the tunnel has sent one of our messages.
1100  * If the message is unreliable, simply frees the `crm`. If the
1101  * message was reliable, calculate retransmission time and
1102  * wait for ACK (or retransmit).
1103  *
1104  * @param cls the `struct CadetReliableMessage` that was sent
1105  */
1106 static void
1107 data_sent_cb (void *cls);
1108
1109
1110 /**
1111  * We need to retry a transmission, the last one took too long to
1112  * be acknowledged.
1113  *
1114  * @param cls the `struct CadetChannel` where we need to retransmit
1115  */
1116 static void
1117 retry_transmission (void *cls)
1118 {
1119   struct CadetChannel *ch = cls;
1120   struct CadetReliableMessage *crm = ch->head_sent;
1121
1122   ch->retry_data_task = NULL;
1123   GNUNET_assert (NULL == crm->qe);
1124   crm->qe = GCT_send (ch->t,
1125                       &crm->data_message.header,
1126                       &data_sent_cb,
1127                       crm);
1128 }
1129
1130
1131 /**
1132  * Check if we can now allow the client to transmit, and if so,
1133  * let the client know about it.
1134  *
1135  * @param ch channel to check
1136  */
1137 static void
1138 GCCH_check_allow_client (struct CadetChannel *ch)
1139 {
1140   struct GNUNET_MQ_Envelope *env;
1141   struct GNUNET_CADET_LocalAck *msg;
1142
1143   if (GNUNET_YES == ch->client_allowed)
1144     return; /* client already allowed! */
1145   if (CADET_CHANNEL_READY != ch->state)
1146   {
1147     /* destination did not yet ACK our CREATE! */
1148     LOG (GNUNET_ERROR_TYPE_DEBUG,
1149          "%s not yet ready, throttling client until ACK.\n",
1150          GCCH_2s (ch));
1151     return;
1152   }
1153   if (ch->pending_messages > ch->max_pending_messages)
1154   {
1155     /* Too many messages in queue. */
1156     LOG (GNUNET_ERROR_TYPE_DEBUG,
1157          "Message queue still too long on %s, throttling client until ACK.\n",
1158          GCCH_2s (ch));
1159     return;
1160   }
1161   if ( (NULL != ch->head_sent) &&
1162        (64 <= ntohl (ch->mid_send.mid) - ntohl (ch->head_sent->data_message.mid.mid)) )
1163   {
1164     LOG (GNUNET_ERROR_TYPE_DEBUG,
1165          "Gap in ACKs too big on %s, throttling client until ACK.\n",
1166          GCCH_2s (ch));
1167     return;
1168   }
1169   ch->client_allowed = GNUNET_YES;
1170
1171
1172   LOG (GNUNET_ERROR_TYPE_DEBUG,
1173        "Sending local ack to %s client\n",
1174        GCCH_2s (ch));
1175   env = GNUNET_MQ_msg (msg,
1176                        GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK);
1177   msg->ccn = ch->ccn;
1178   GSC_send_to_client (ch->owner ? ch->owner : ch->dest,
1179                       env);
1180 }
1181
1182
1183 /**
1184  * Function called once the tunnel has sent one of our messages.
1185  * If the message is unreliable, simply frees the `crm`. If the
1186  * message was reliable, calculate retransmission time and
1187  * wait for ACK (or retransmit).
1188  *
1189  * @param cls the `struct CadetReliableMessage` that was sent
1190  */
1191 static void
1192 data_sent_cb (void *cls)
1193 {
1194   struct CadetReliableMessage *crm = cls;
1195   struct CadetChannel *ch = crm->ch;
1196   struct CadetReliableMessage *off;
1197
1198   crm->qe = NULL;
1199   GNUNET_CONTAINER_DLL_remove (ch->head_sent,
1200                                ch->tail_sent,
1201                                crm);
1202   if (GNUNET_NO == ch->reliable)
1203   {
1204     GNUNET_free (crm);
1205     ch->pending_messages--;
1206     GCCH_check_allow_client (ch);
1207     return;
1208   }
1209   if (0 == crm->retry_delay.rel_value_us)
1210     crm->retry_delay = ch->expected_delay;
1211   crm->next_retry = GNUNET_TIME_relative_to_absolute (crm->retry_delay);
1212
1213   /* find position for re-insertion into the DLL */
1214   if ( (NULL == ch->head_sent) ||
1215        (crm->next_retry.abs_value_us < ch->head_sent->next_retry.abs_value_us) )
1216   {
1217     /* insert at HEAD, also (re)schedule retry task! */
1218     GNUNET_CONTAINER_DLL_insert (ch->head_sent,
1219                                  ch->tail_sent,
1220                                  crm);
1221     if (NULL != ch->retry_data_task)
1222       GNUNET_SCHEDULER_cancel (ch->retry_data_task);
1223     ch->retry_data_task
1224       = GNUNET_SCHEDULER_add_delayed (crm->retry_delay,
1225                                       &retry_transmission,
1226                                       ch);
1227     return;
1228   }
1229   for (off = ch->head_sent; NULL != off; off = off->next)
1230     if (crm->next_retry.abs_value_us < off->next_retry.abs_value_us)
1231       break;
1232   if (NULL == off)
1233   {
1234     /* insert at tail */
1235     GNUNET_CONTAINER_DLL_insert_tail (ch->head_sent,
1236                                       ch->tail_sent,
1237                                       crm);
1238   }
1239   else
1240   {
1241     /* insert before off */
1242     GNUNET_CONTAINER_DLL_insert_after (ch->head_sent,
1243                                        ch->tail_sent,
1244                                        off->prev,
1245                                        crm);
1246   }
1247 }
1248
1249
1250 /**
1251  * Handle data given by a client.
1252  *
1253  * Check whether the client is allowed to send in this tunnel, save if
1254  * channel is reliable and send an ACK to the client if there is still
1255  * buffer space in the tunnel.
1256  *
1257  * @param ch Channel.
1258  * @param message payload to transmit.
1259  * @return #GNUNET_OK if everything goes well,
1260  *         #GNUNET_SYSERR in case of an error.
1261  */
1262 int
1263 GCCH_handle_local_data (struct CadetChannel *ch,
1264                         const struct GNUNET_MessageHeader *message)
1265 {
1266   uint16_t payload_size = ntohs (message->size);
1267   struct CadetReliableMessage *crm;
1268
1269   if (GNUNET_NO == ch->client_allowed)
1270   {
1271     GNUNET_break_op (0);
1272     return GNUNET_SYSERR;
1273   }
1274   ch->client_allowed = GNUNET_NO;
1275   ch->pending_messages++;
1276
1277   /* Everything is correct, send the message. */
1278   crm = GNUNET_malloc (sizeof (*crm) + payload_size);
1279   crm->ch = ch;
1280   crm->data_message.header.size = htons (sizeof (struct GNUNET_CADET_ChannelAppDataMessage) + payload_size);
1281   crm->data_message.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA);
1282   ch->mid_send.mid = htonl (ntohl (ch->mid_send.mid) + 1);
1283   crm->data_message.mid = ch->mid_send;
1284   crm->data_message.ctn = ch->ctn;
1285   GNUNET_memcpy (&crm[1],
1286                  message,
1287                  payload_size);
1288   GNUNET_CONTAINER_DLL_insert (ch->head_sent,
1289                                ch->tail_sent,
1290                                crm);
1291   LOG (GNUNET_ERROR_TYPE_DEBUG,
1292        "Sending %u bytes from local client to %s\n",
1293        payload_size,
1294        GCCH_2s (ch));
1295   crm->qe = GCT_send (ch->t,
1296                       &crm->data_message.header,
1297                       &data_sent_cb,
1298                       crm);
1299   GCCH_check_allow_client (ch);
1300   return GNUNET_OK;
1301 }
1302
1303
1304 /**
1305  * Try to deliver messages to the local client, if it is ready for more.
1306  *
1307  * @param ch channel to process
1308  */
1309 static void
1310 send_client_buffered_data (struct CadetChannel *ch)
1311 {
1312   struct CadetOutOfOrderMessage *com;
1313
1314   if (GNUNET_NO == ch->client_ready)
1315     return; /* client not ready */
1316   com = ch->head_recv;
1317   if (NULL == com)
1318     return; /* none pending */
1319   if ( (com->mid.mid != ch->mid_recv.mid) &&
1320        (GNUNET_NO == ch->out_of_order) )
1321     return; /* missing next one in-order */
1322
1323   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1324               "Passing payload message to client on %s\n",
1325               GCCH_2s (ch));
1326
1327   /* all good, pass next message to client */
1328   GNUNET_CONTAINER_DLL_remove (ch->head_recv,
1329                                ch->tail_recv,
1330                                com);
1331   /* FIXME: if unreliable, this is not aggressive
1332      enough, as it would be OK to have lost some! */
1333   ch->mid_recv.mid = htonl (1 + ntohl (com->mid.mid));
1334   ch->mid_futures >>= 1; /* equivalent to division by 2 */
1335   GSC_send_to_client (ch->owner ? ch->owner : ch->dest,
1336                       com->env);
1337   GNUNET_free (com);
1338   if ( (0xFFULL == (ch->mid_futures & 0xFFULL)) &&
1339        (GNUNET_YES == ch->reliable) )
1340   {
1341     /* The next 15 messages were also already received (0xFF), this
1342        suggests that the sender may be blocked on flow control
1343        urgently waiting for an ACK from us. (As we have an inherent
1344        maximum of 64 bits, and 15 is getting too close for comfort.)
1345        So we should send one now. */
1346     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1347                 "Sender on %s likely blocked on flow-control, sending ACK now.\n",
1348                 GCCH_2s (ch));
1349     if (GNUNET_YES == ch->reliable)
1350       send_channel_data_ack (ch);
1351   }
1352
1353   if (NULL != ch->head_recv)
1354     return;
1355   if (GNUNET_NO == ch->destroy)
1356     return;
1357   GCT_send_channel_destroy (ch->t,
1358                             ch->ctn);
1359   channel_destroy (ch);
1360 }
1361
1362
1363 /**
1364  * Handle ACK from client on local channel.
1365  *
1366  * @param ch channel to destroy
1367  */
1368 void
1369 GCCH_handle_local_ack (struct CadetChannel *ch)
1370 {
1371   ch->client_ready = GNUNET_YES;
1372   send_client_buffered_data (ch);
1373 }
1374
1375
1376 #define LOG2(level, ...) GNUNET_log_from_nocheck(level,"cadet-chn",__VA_ARGS__)
1377
1378
1379 /**
1380  * Log channel info.
1381  *
1382  * @param ch Channel.
1383  * @param level Debug level to use.
1384  */
1385 void
1386 GCCH_debug (struct CadetChannel *ch,
1387             enum GNUNET_ErrorType level)
1388 {
1389   int do_log;
1390
1391   do_log = GNUNET_get_log_call_status (level & (~GNUNET_ERROR_TYPE_BULK),
1392                                        "cadet-chn",
1393                                        __FILE__, __FUNCTION__, __LINE__);
1394   if (0 == do_log)
1395     return;
1396
1397   if (NULL == ch)
1398   {
1399     LOG2 (level, "CHN *** DEBUG NULL CHANNEL ***\n");
1400     return;
1401   }
1402   LOG2 (level,
1403         "CHN %s:%X (%p)\n",
1404         GCT_2s (ch->t),
1405         ch->ctn,
1406         ch);
1407   if (NULL != ch->owner)
1408   {
1409     LOG2 (level,
1410           "CHN origin %s ready %s local-id: %u\n",
1411           GSC_2s (ch->owner),
1412           ch->client_ready ? "YES" : "NO",
1413           ntohl (ch->ccn.channel_of_client));
1414   }
1415   if (NULL != ch->dest)
1416   {
1417     LOG2 (level,
1418           "CHN destination %s ready %s local-id: %u\n",
1419           GSC_2s (ch->dest),
1420           ch->client_ready ? "YES" : "NO",
1421           ntohl (ch->ccn.channel_of_client));
1422   }
1423   LOG2 (level,
1424         "CHN  Message IDs recv: %d (%LLX), send: %d\n",
1425         ntohl (ch->mid_recv.mid),
1426         (unsigned long long) ch->mid_futures,
1427         ntohl (ch->mid_send.mid));
1428 }
1429
1430
1431
1432 /* end of gnunet-service-cadet-new_channel.c */