misc bugfixes
[oweals/gnunet.git] / src / cadet / gnunet-service-cadet-new_channel.c
1
2 /*
3      This file is part of GNUnet.
4      Copyright (C) 2001-2017 GNUnet e.V.
5
6      GNUnet is free software; you can redistribute it and/or modify
7      it under the terms of the GNU General Public License as published
8      by the Free Software Foundation; either version 3, or (at your
9      option) any later version.
10
11      GNUnet is distributed in the hope that it will be useful, but
12      WITHOUT ANY WARRANTY; without even the implied warranty of
13      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14      General Public License for more details.
15
16      You should have received a copy of the GNU General Public License
17      along with GNUnet; see the file COPYING.  If not, write to the
18      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
19      Boston, MA 02110-1301, USA.
20 */
21 /**
22  * @file cadet/gnunet-service-cadet-new_channel.c
23  * @brief logical links between CADET clients
24  * @author Bartlomiej Polot
25  * @author Christian Grothoff
26  *
27  * TODO:
28  * - introduce shutdown so we can have half-closed channels, modify
29  *   destroy to include MID to have FIN-ACK equivalents, etc.
30  * - estimate max bandwidth using bursts and use to for CONGESTION CONTROL!
31  * - check that '0xFFULL' really is sufficient for flow control!
32  * - revisit handling of 'unreliable' traffic!
33  * - revisit handling of 'out-of-order' option, especially in combination with/without 'reliable'.
34  * - figure out flow control without ACKs (unreliable traffic!)
35  */
36 #include "platform.h"
37 #include "gnunet_util_lib.h"
38 #include "cadet.h"
39 #include "gnunet_statistics_service.h"
40 #include "gnunet-service-cadet-new.h"
41 #include "gnunet-service-cadet-new_channel.h"
42 #include "gnunet-service-cadet-new_connection.h"
43 #include "gnunet-service-cadet-new_tunnels.h"
44 #include "gnunet-service-cadet-new_peer.h"
45 #include "gnunet-service-cadet-new_paths.h"
46
47 #define LOG(level,...) GNUNET_log_from (level,"cadet-chn",__VA_ARGS__)
48
49 /**
50  * How long do we initially wait before retransmitting?
51  */
52 #define CADET_INITIAL_RETRANSMIT_TIME GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 250)
53
54 /**
55  * How long do we wait before dropping state about incoming
56  * connection to closed port?
57  */
58 #define TIMEOUT_CLOSED_PORT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30)
59
60
61 /**
62  * All the states a connection can be in.
63  */
64 enum CadetChannelState
65 {
66   /**
67    * Uninitialized status, should never appear in operation.
68    */
69   CADET_CHANNEL_NEW,
70
71   /**
72    * Connection create message sent, waiting for ACK.
73    */
74   CADET_CHANNEL_OPEN_SENT,
75
76   /**
77    * Connection confirmed, ready to carry traffic.
78    */
79   CADET_CHANNEL_READY
80 };
81
82
83 /**
84  * Info needed to retry a message in case it gets lost.
85  * Note that we DO use this structure also for unreliable
86  * messages.
87  */
88 struct CadetReliableMessage
89 {
90   /**
91    * Double linked list, FIFO style
92    */
93   struct CadetReliableMessage *next;
94
95   /**
96    * Double linked list, FIFO style
97    */
98   struct CadetReliableMessage *prev;
99
100   /**
101    * Which channel is this message in?
102    */
103   struct CadetChannel *ch;
104
105   /**
106    * Entry in the tunnels queue for this message, NULL if it has left
107    * the tunnel.  Used to cancel transmission in case we receive an
108    * ACK in time.
109    */
110   struct CadetTunnelQueueEntry *qe;
111
112   /**
113    * How soon should we retry if we fail to get an ACK?
114    * Messages in the queue are sorted by this value.
115    */
116   struct GNUNET_TIME_Absolute next_retry;
117
118   /**
119    * How long do we wait for an ACK after transmission?
120    * Use for the back-off calculation.
121    */
122   struct GNUNET_TIME_Relative retry_delay;
123
124   /**
125    * Data message we are trying to send.
126    */
127   struct GNUNET_CADET_ChannelAppDataMessage data_message;
128
129   /* followed by variable-size payload */
130 };
131
132
133 /**
134  * List of received out-of-order data messages.
135  */
136 struct CadetOutOfOrderMessage
137 {
138   /**
139    * Double linked list, FIFO style
140    */
141   struct CadetOutOfOrderMessage *next;
142
143   /**
144    * Double linked list, FIFO style
145    */
146   struct CadetOutOfOrderMessage *prev;
147
148   /**
149    * ID of the message (messages up to this point needed
150    * before we give this one to the client).
151    */
152   struct ChannelMessageIdentifier mid;
153
154   /**
155    * The envelope with the payload of the out-of-order message
156    */
157   struct GNUNET_MQ_Envelope *env;
158
159 };
160
161
162 /**
163  * Struct containing all information regarding a channel to a remote client.
164  */
165 struct CadetChannel
166 {
167   /**
168    * Tunnel this channel is in.
169    */
170   struct CadetTunnel *t;
171
172   /**
173    * Last entry in the tunnel's queue relating to control messages
174    * (#GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN or
175    * #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK).  Used to cancel
176    * transmission in case we receive updated information.
177    */
178   struct CadetTunnelQueueEntry *last_control_qe;
179
180   /**
181    * Client owner of the tunnel, if any.
182    * (Used if this channel represends the initiating end of the tunnel.)
183    */
184   struct CadetClient *owner;
185
186   /**
187    * Client destination of the tunnel, if any.
188    * (Used if this channel represents the listening end of the tunnel.)
189    */
190   struct CadetClient *dest;
191
192   /**
193    * Head of DLL of messages sent and not yet ACK'd.
194    */
195   struct CadetReliableMessage *head_sent;
196
197   /**
198    * Tail of DLL of messages sent and not yet ACK'd.
199    */
200   struct CadetReliableMessage *tail_sent;
201
202   /**
203    * Head of DLL of messages received out of order or while client was unready.
204    */
205   struct CadetOutOfOrderMessage *head_recv;
206
207   /**
208    * Tail DLL of messages received out of order or while client was unready.
209    */
210   struct CadetOutOfOrderMessage *tail_recv;
211
212   /**
213    * Task to resend/poll in case no ACK is received.
214    */
215   struct GNUNET_SCHEDULER_Task *retry_task;
216
217   /**
218    * Last time the channel was used
219    */
220   struct GNUNET_TIME_Absolute timestamp;
221
222   /**
223    * Destination port of the channel.
224    */
225   struct GNUNET_HashCode port;
226
227   /**
228    * Counter for exponential backoff.
229    */
230   struct GNUNET_TIME_Relative retry_time;
231
232   /**
233    * How long does it usually take to get an ACK.
234    */
235   struct GNUNET_TIME_Relative expected_delay;
236
237   /**
238    * Bitfield of already-received messages past @e mid_recv.
239    */
240   uint64_t mid_futures;
241
242   /**
243    * Next MID expected for incoming traffic.
244    */
245   struct ChannelMessageIdentifier mid_recv;
246
247   /**
248    * Next MID to use for outgoing traffic.
249    */
250   struct ChannelMessageIdentifier mid_send;
251
252   /**
253    * Total (reliable) messages pending ACK for this channel.
254    */
255   unsigned int pending_messages;
256
257   /**
258    * Maximum (reliable) messages pending ACK for this channel
259    * before we throttle the client.
260    */
261   unsigned int max_pending_messages;
262
263   /**
264    * Number identifying this channel in its tunnel.
265    */
266   struct GNUNET_CADET_ChannelTunnelNumber ctn;
267
268   /**
269    * Local tunnel number for local client owning the channel.
270    * ( >= #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI or 0 )
271    */
272   struct GNUNET_CADET_ClientChannelNumber ccn;
273
274   /**
275    * Channel state.
276    */
277   enum CadetChannelState state;
278
279   /**
280    * Can we send data to the client?
281    */
282   int client_ready;
283
284   /**
285    * Can the client send data to us?
286    */
287   int client_allowed;
288
289   /**
290    * Is the tunnel bufferless (minimum latency)?
291    */
292   int nobuffer;
293
294   /**
295    * Is the tunnel reliable?
296    */
297   int reliable;
298
299   /**
300    * Is the tunnel out-of-order?
301    */
302   int out_of_order;
303
304   /**
305    * Flag to signal the destruction of the channel.  If this is set to
306    * #GNUNET_YES the channel will be destroyed once the queue is
307    * empty.
308    */
309   int destroy;
310
311 };
312
313
314 /**
315  * Get the static string for identification of the channel.
316  *
317  * @param ch Channel.
318  *
319  * @return Static string with the channel IDs.
320  */
321 const char *
322 GCCH_2s (const struct CadetChannel *ch)
323 {
324   static char buf[128];
325
326   GNUNET_snprintf (buf,
327                    sizeof (buf),
328                    "Channel %s:%s ctn:%X(%X)",
329                    GNUNET_i2s (GCP_get_id (GCT_get_destination (ch->t))),
330                    GNUNET_h2s (&ch->port),
331                    ch->ctn,
332                    ntohl (ch->ccn.channel_of_client));
333   return buf;
334 }
335
336
337 /**
338  * Get the channel's public ID.
339  *
340  * @param ch Channel.
341  *
342  * @return ID used to identify the channel with the remote peer.
343  */
344 struct GNUNET_CADET_ChannelTunnelNumber
345 GCCH_get_id (const struct CadetChannel *ch)
346 {
347   return ch->ctn;
348 }
349
350
351 /**
352  * Destroy the given channel.
353  *
354  * @param ch channel to destroy
355  */
356 static void
357 channel_destroy (struct CadetChannel *ch)
358 {
359   struct CadetReliableMessage *crm;
360   struct CadetOutOfOrderMessage *com;
361
362   while (NULL != (crm = ch->head_sent))
363   {
364     GNUNET_assert (ch == crm->ch);
365     if (NULL != crm->qe)
366     {
367       GCT_send_cancel (crm->qe);
368       crm->qe = NULL;
369     }
370     GNUNET_CONTAINER_DLL_remove (ch->head_sent,
371                                  ch->tail_sent,
372                                  crm);
373     GNUNET_free (crm);
374   }
375   while (NULL != (com = ch->head_recv))
376   {
377     GNUNET_CONTAINER_DLL_remove (ch->head_recv,
378                                  ch->tail_recv,
379                                  com);
380     GNUNET_MQ_discard (com->env);
381     GNUNET_free (com);
382   }
383   if (NULL != ch->last_control_qe)
384   {
385     GCT_send_cancel (ch->last_control_qe);
386     ch->last_control_qe = NULL;
387   }
388   if (NULL != ch->retry_task)
389   {
390     GNUNET_SCHEDULER_cancel (ch->retry_task);
391     ch->retry_task = NULL;
392   }
393   GCT_remove_channel (ch->t,
394                       ch,
395                       ch->ctn);
396   GNUNET_free (ch);
397 }
398
399
400 /**
401  * Send a channel create message.
402  *
403  * @param cls Channel for which to send.
404  */
405 static void
406 send_channel_open (void *cls);
407
408
409 /**
410  * Function called once the tunnel confirms that we sent the
411  * create message.  Delays for a bit until we retry.
412  *
413  * @param cls our `struct CadetChannel`.
414  */
415 static void
416 channel_open_sent_cb (void *cls)
417 {
418   struct CadetChannel *ch = cls;
419
420   ch->last_control_qe = NULL;
421   ch->retry_time = GNUNET_TIME_STD_BACKOFF (ch->retry_time);
422   ch->retry_task = GNUNET_SCHEDULER_add_delayed (ch->retry_time,
423                                                  &send_channel_open,
424                                                  ch);
425 }
426
427
428 /**
429  * Send a channel open message.
430  *
431  * @param cls Channel for which to send.
432  */
433 static void
434 send_channel_open (void *cls)
435 {
436   struct CadetChannel *ch = cls;
437   struct GNUNET_CADET_ChannelOpenMessage msgcc;
438   uint32_t options;
439
440   ch->retry_task = NULL;
441   LOG (GNUNET_ERROR_TYPE_DEBUG,
442        "Sending CHANNEL_OPEN message for %s\n",
443        GCCH_2s (ch));
444   options = 0;
445   if (ch->nobuffer)
446     options |= GNUNET_CADET_OPTION_NOBUFFER;
447   if (ch->reliable)
448     options |= GNUNET_CADET_OPTION_RELIABLE;
449   if (ch->out_of_order)
450     options |= GNUNET_CADET_OPTION_OUT_OF_ORDER;
451   msgcc.header.size = htons (sizeof (msgcc));
452   msgcc.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN);
453   msgcc.opt = htonl (options);
454   msgcc.port = ch->port;
455   msgcc.ctn = ch->ctn;
456   ch->state = CADET_CHANNEL_OPEN_SENT;
457   ch->last_control_qe = GCT_send (ch->t,
458                                   &msgcc.header,
459                                   &channel_open_sent_cb,
460                                   ch);
461 }
462
463
464 /**
465  * Function called once and only once after a channel was bound
466  * to its tunnel via #GCT_add_channel() is ready for transmission.
467  * Note that this is only the case for channels that this peer
468  * initiates, as for incoming channels we assume that they are
469  * ready for transmission immediately upon receiving the open
470  * message.  Used to bootstrap the #GCT_send() process.
471  *
472  * @param ch the channel for which the tunnel is now ready
473  */
474 void
475 GCCH_tunnel_up (struct CadetChannel *ch)
476 {
477   GNUNET_assert (NULL == ch->retry_task);
478   ch->retry_task = GNUNET_SCHEDULER_add_now (&send_channel_open,
479                                              ch);
480 }
481
482
483 /**
484  * Create a new channel.
485  *
486  * @param owner local client owning the channel
487  * @param ccn local number of this channel at the @a owner
488  * @param destination peer to which we should build the channel
489  * @param port desired port at @a destination
490  * @param options options for the channel
491  * @return handle to the new channel
492  */
493 struct CadetChannel *
494 GCCH_channel_local_new (struct CadetClient *owner,
495                         struct GNUNET_CADET_ClientChannelNumber ccn,
496                         struct CadetPeer *destination,
497                         const struct GNUNET_HashCode *port,
498                         uint32_t options)
499 {
500   struct CadetChannel *ch;
501
502   ch = GNUNET_new (struct CadetChannel);
503   ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
504   ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
505   ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
506   ch->max_pending_messages = (ch->nobuffer) ? 1 : 32; /* FIXME: 32!? Do not hardcode! */
507   ch->owner = owner;
508   ch->ccn = ccn;
509   ch->port = *port;
510   ch->t = GCP_get_tunnel (destination,
511                           GNUNET_YES);
512   ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME;
513   ch->ctn = GCT_add_channel (ch->t,
514                              ch);
515   GNUNET_STATISTICS_update (stats,
516                             "# channels",
517                             1,
518                             GNUNET_NO);
519   LOG (GNUNET_ERROR_TYPE_DEBUG,
520        "Created channel to port %s at peer %s for client %s using tunnel %s\n",
521        GNUNET_h2s (port),
522        GCP_2s (destination),
523        GSC_2s (owner),
524        GCT_2s (ch->t));
525   return ch;
526 }
527
528
529 /**
530  * We had an incoming channel to a port that is closed.
531  * It has not been opened for a while, drop it.
532  *
533  * @param cls the channel to drop
534  */
535 static void
536 timeout_closed_cb (void *cls)
537 {
538   struct CadetChannel *ch = cls;
539
540   ch->retry_task = NULL;
541   LOG (GNUNET_ERROR_TYPE_DEBUG,
542        "Closing incoming channel to port %s from peer %s due to timeout\n",
543        GNUNET_h2s (&ch->port),
544        GCP_2s (GCT_get_destination (ch->t)));
545   channel_destroy (ch);
546 }
547
548
549 /**
550  * Create a new channel based on a request coming in over the network.
551  *
552  * @param t tunnel to the remote peer
553  * @param ctn identifier of this channel in the tunnel
554  * @param port desired local port
555  * @param options options for the channel
556  * @return handle to the new channel
557  */
558 struct CadetChannel *
559 GCCH_channel_incoming_new (struct CadetTunnel *t,
560                            struct GNUNET_CADET_ChannelTunnelNumber ctn,
561                            const struct GNUNET_HashCode *port,
562                            uint32_t options)
563 {
564   struct CadetChannel *ch;
565   struct CadetClient *c;
566
567   ch = GNUNET_new (struct CadetChannel);
568   ch->port = *port;
569   ch->t = t;
570   ch->ctn = ctn;
571   ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME;
572   ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
573   ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
574   ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
575   ch->max_pending_messages = (ch->nobuffer) ? 1 : 32; /* FIXME: 32!? Do not hardcode! */
576   GNUNET_STATISTICS_update (stats,
577                             "# channels",
578                             1,
579                             GNUNET_NO);
580
581   c = GNUNET_CONTAINER_multihashmap_get (open_ports,
582                                          port);
583   if (NULL == c)
584   {
585     /* port closed, wait for it to possibly open */
586     (void) GNUNET_CONTAINER_multihashmap_put (loose_channels,
587                                               port,
588                                               ch,
589                                               GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
590     ch->retry_task = GNUNET_SCHEDULER_add_delayed (TIMEOUT_CLOSED_PORT,
591                                                    &timeout_closed_cb,
592                                                    ch);
593     LOG (GNUNET_ERROR_TYPE_DEBUG,
594          "Created loose incoming channel to port %s from peer %s\n",
595          GNUNET_h2s (&ch->port),
596          GCP_2s (GCT_get_destination (ch->t)));
597   }
598   else
599   {
600     GCCH_bind (ch,
601                c);
602   }
603   GNUNET_STATISTICS_update (stats,
604                             "# channels",
605                             1,
606                             GNUNET_NO);
607   return ch;
608 }
609
610
611 /**
612  * Function called once the tunnel confirms that we sent the
613  * ACK message.  Just remembers it was sent, we do not expect
614  * ACKs for ACKs ;-).
615  *
616  * @param cls our `struct CadetChannel`.
617  */
618 static void
619 send_ack_cb (void *cls)
620 {
621   struct CadetChannel *ch = cls;
622
623   ch->last_control_qe = NULL;
624 }
625
626
627 /**
628  * Compute and send the current ACK to the other peer.
629  *
630  * @param ch channel to send the ACK for
631  */
632 static void
633 send_channel_data_ack (struct CadetChannel *ch)
634 {
635   struct GNUNET_CADET_ChannelDataAckMessage msg;
636
637   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK);
638   msg.header.size = htons (sizeof (msg));
639   msg.ctn = ch->ctn;
640   msg.mid.mid = htonl (ntohl (ch->mid_recv.mid) - 1);
641   msg.futures = GNUNET_htonll (ch->mid_futures);
642   if (NULL != ch->last_control_qe)
643     GCT_send_cancel (ch->last_control_qe);
644   ch->last_control_qe = GCT_send (ch->t,
645                                   &msg.header,
646                                   &send_ack_cb,
647                                   ch);
648 }
649
650
651 /**
652  * Send our initial ACK to the client confirming that the
653  * connection is up.
654  *
655  * @param cls the `struct CadetChannel`
656  */
657 static void
658 send_connect_ack (void *cls)
659 {
660   struct CadetChannel *ch = cls;
661
662   ch->retry_task = NULL;
663   send_channel_data_ack (ch);
664 }
665
666
667 /**
668  * Send a LOCAL ACK to the client to solicit more messages.
669  *
670  * @param ch channel the ack is for
671  * @param c client to send the ACK to
672  */
673 static void
674 send_ack_to_client (struct CadetChannel *ch,
675                     struct CadetClient *c)
676 {
677   struct GNUNET_MQ_Envelope *env;
678   struct GNUNET_CADET_LocalAck *ack;
679
680   env = GNUNET_MQ_msg (ack,
681                        GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK);
682   ack->ccn = ch->ccn;
683   GSC_send_to_client (c,
684                       env);
685 }
686
687
688 /**
689  * A client is bound to the port that we have a channel
690  * open to.  Send the acknowledgement for the connection
691  * request and establish the link with the client.
692  *
693  * @param ch open incoming channel
694  * @param c client listening on the respective port
695  */
696 void
697 GCCH_bind (struct CadetChannel *ch,
698            struct CadetClient *c)
699 {
700   struct GNUNET_MQ_Envelope *env;
701   struct GNUNET_CADET_LocalChannelCreateMessage *tcm;
702   uint32_t options;
703
704   LOG (GNUNET_ERROR_TYPE_DEBUG,
705        "Binding %s from tunnel %s to port %s of client %s\n",
706        GCCH_2s (ch),
707        GCT_2s (ch->t),
708        GNUNET_h2s (&ch->port),
709        GSC_2s (c));
710   if (NULL != ch->retry_task)
711   {
712     /* there might be a timeout task here */
713     GNUNET_SCHEDULER_cancel (ch->retry_task);
714     ch->retry_task = NULL;
715   }
716   options = 0;
717   if (ch->nobuffer)
718     options |= GNUNET_CADET_OPTION_NOBUFFER;
719   if (ch->reliable)
720     options |= GNUNET_CADET_OPTION_RELIABLE;
721   if (ch->out_of_order)
722     options |= GNUNET_CADET_OPTION_OUT_OF_ORDER;
723   ch->dest = c;
724   ch->ccn = GSC_bind (c,
725                       ch,
726                       GCT_get_destination (ch->t),
727                       &ch->port,
728                       options);
729   ch->mid_recv.mid = htonl (1); /* The CONNECT counts as message 0! */
730
731   /* notify other peer that we accepted the connection */
732   ch->retry_task = GNUNET_SCHEDULER_add_now (&send_connect_ack,
733                                              ch);
734   /* give client it's initial supply of ACKs */
735   env = GNUNET_MQ_msg (tcm,
736                        GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_CREATE);
737   tcm->ccn = ch->ccn;
738   tcm->peer = *GCP_get_id (GCT_get_destination (ch->t));
739   tcm->port = ch->port;
740   tcm->opt = htonl (options);
741   GSC_send_to_client (ch->dest,
742                       env);
743   for (unsigned int i=0;i<ch->max_pending_messages;i++)
744     send_ack_to_client (ch,
745                         ch->owner);
746 }
747
748
749 /**
750  * Destroy locally created channel.  Called by the
751  * local client, so no need to tell the client.
752  *
753  * @param ch channel to destroy
754  */
755 void
756 GCCH_channel_local_destroy (struct CadetChannel *ch)
757 {
758   if (GNUNET_YES == ch->destroy)
759   {
760     /* other end already destroyed, with the local client gone, no need
761        to finish transmissions, just destroy immediately. */
762     channel_destroy (ch);
763     return;
764   }
765   if (NULL != ch->head_sent)
766   {
767     /* allow send queue to train first */
768     ch->destroy = GNUNET_YES;
769     return;
770   }
771   /* If the we ever sent the CHANNEL_CREATE, we need to send a destroy message. */
772   if (CADET_CHANNEL_NEW != ch->state)
773     GCT_send_channel_destroy (ch->t,
774                               ch->ctn);
775   /* Now finish our clean up */
776   channel_destroy (ch);
777 }
778
779
780 /**
781  * Destroy channel that was incoming.  Called by the
782  * local client, so no need to tell the client.
783  *
784  * @param ch channel to destroy
785  */
786 void
787 GCCH_channel_incoming_destroy (struct CadetChannel *ch)
788 {
789   if (GNUNET_YES == ch->destroy)
790   {
791     /* other end already destroyed, with the remote client gone, no need
792        to finish transmissions, just destroy immediately. */
793     channel_destroy (ch);
794     return;
795   }
796   if (NULL != ch->head_recv)
797   {
798     /* allow local client to see all data first */
799     ch->destroy = GNUNET_YES;
800     return;
801   }
802   /* Nothing left to do, just finish destruction */
803   GCT_send_channel_destroy (ch->t,
804                             ch->ctn);
805   channel_destroy (ch);
806 }
807
808
809 /**
810  * We got an acknowledgement for the creation of the channel
811  * (the port is open on the other side). Begin transmissions.
812  *
813  * @param ch channel to destroy
814  */
815 void
816 GCCH_handle_channel_open_ack (struct CadetChannel *ch)
817 {
818   switch (ch->state)
819   {
820   case CADET_CHANNEL_NEW:
821     /* this should be impossible */
822     GNUNET_break (0);
823     break;
824   case CADET_CHANNEL_OPEN_SENT:
825     if (NULL == ch->owner)
826     {
827       /* We're not the owner, wrong direction! */
828       GNUNET_break_op (0);
829       return;
830     }
831     LOG (GNUNET_ERROR_TYPE_DEBUG,
832          "Received channel OPEN_ACK for waiting %s, entering READY state\n",
833          GCCH_2s (ch));
834     GNUNET_SCHEDULER_cancel (ch->retry_task);
835     ch->retry_task = NULL;
836     ch->state = CADET_CHANNEL_READY;
837     /* On first connect, send client as many ACKs as we allow messages
838        to be buffered! */
839     for (unsigned int i=0;i<ch->max_pending_messages;i++)
840       send_ack_to_client (ch,
841                           ch->owner);
842     break;
843   case CADET_CHANNEL_READY:
844     /* duplicate ACK, maybe we retried the CREATE. Ignore. */
845     LOG (GNUNET_ERROR_TYPE_DEBUG,
846          "Received duplicate channel OPEN_ACK for %s\n",
847          GCCH_2s (ch));
848     GNUNET_STATISTICS_update (stats,
849                               "# duplicate CREATE_ACKs",
850                               1,
851                               GNUNET_NO);
852     break;
853   }
854 }
855
856
857 /**
858  * Test if element @a e1 comes before element @a e2.
859  *
860  * TODO: use opportunity to create generic list insertion sort
861  * logic in container!
862  *
863  * @param cls closure, our `struct CadetChannel`
864  * @param e1 an element of to sort
865  * @param e2 another element to sort
866  * @return #GNUNET_YES if @e1 < @e2, otherwise #GNUNET_NO
867  */
868 static int
869 is_before (void *cls,
870            void *e1,
871            void *e2)
872 {
873   struct CadetOutOfOrderMessage *m1 = e1;
874   struct CadetOutOfOrderMessage *m2 = e2;
875   uint32_t v1 = ntohl (m1->mid.mid);
876   uint32_t v2 = ntohl (m2->mid.mid);
877   uint32_t delta;
878
879   delta = v1 - v2;
880   if (delta > (uint32_t) INT_MAX)
881   {
882     /* in overflow range, we can safely assume we wrapped around */
883     return GNUNET_NO;
884   }
885   else
886   {
887     return GNUNET_YES;
888   }
889 }
890
891
892 /**
893  * We got payload data for a channel.  Pass it on to the client
894  * and send an ACK to the other end (once flow control allows it!)
895  *
896  * @param ch channel that got data
897  */
898 void
899 GCCH_handle_channel_plaintext_data (struct CadetChannel *ch,
900                                     const struct GNUNET_CADET_ChannelAppDataMessage *msg)
901 {
902   struct GNUNET_MQ_Envelope *env;
903   struct GNUNET_CADET_LocalData *ld;
904   struct CadetOutOfOrderMessage *com;
905   size_t payload_size;
906
907   payload_size = ntohs (msg->header.size) - sizeof (*msg);
908   LOG (GNUNET_ERROR_TYPE_DEBUG,
909        "Receicved %u bytes of application data on %s\n",
910        (unsigned int) payload_size,
911        GCCH_2s (ch));
912   env = GNUNET_MQ_msg_extra (ld,
913                              payload_size,
914                              GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
915   ld->ccn = ch->ccn;
916   GNUNET_memcpy (&ld[1],
917                  &msg[1],
918                  payload_size);
919   if ( (GNUNET_YES == ch->client_ready) &&
920        ( (GNUNET_YES == ch->out_of_order) ||
921          (msg->mid.mid == ch->mid_recv.mid) ) )
922   {
923     GSC_send_to_client (ch->owner ? ch->owner : ch->dest,
924                         env);
925     ch->mid_recv.mid = htonl (1 + ntohl (ch->mid_recv.mid));
926     ch->mid_futures >>= 1;
927   }
928   else
929   {
930     /* FIXME-SECURITY: if the element is WAY too far ahead,
931        drop it (can't buffer too much!) */
932     com = GNUNET_new (struct CadetOutOfOrderMessage);
933     com->mid = msg->mid;
934     com->env = env;
935     /* sort into list ordered by "is_before" */
936     if ( (NULL == ch->head_recv) ||
937          (GNUNET_YES == is_before (ch,
938                                    com,
939                                    ch->head_recv)) )
940     {
941       GNUNET_CONTAINER_DLL_insert (ch->head_recv,
942                                    ch->tail_recv,
943                                    com);
944     }
945     else
946     {
947       struct CadetOutOfOrderMessage *pos;
948
949       for (pos = ch->head_recv;
950            NULL != pos;
951            pos = pos->next)
952       {
953         if (GNUNET_YES !=
954             is_before (ch,
955                        pos,
956                        com))
957           break;
958       }
959       if (NULL == pos)
960         GNUNET_CONTAINER_DLL_insert_tail (ch->head_recv,
961                                           ch->tail_recv,
962                                           com);
963       else
964         GNUNET_CONTAINER_DLL_insert_after (ch->head_recv,
965                                            ch->tail_recv,
966                                            com,
967                                            pos->prev);
968     }
969   }
970 }
971
972
973 /**
974  * We got an acknowledgement for payload data for a channel.
975  * Possibly resume transmissions.
976  *
977  * @param ch channel that got the ack
978  * @param ack details about what was received
979  */
980 void
981 GCCH_handle_channel_plaintext_data_ack (struct CadetChannel *ch,
982                                         const struct GNUNET_CADET_ChannelDataAckMessage *ack)
983 {
984   struct CadetReliableMessage *crm;
985
986   if (GNUNET_NO == ch->reliable)
987   {
988     /* not expecting ACKs on unreliable channel, odd */
989     GNUNET_break_op (0);
990     return;
991   }
992   for (crm = ch->head_sent;
993         NULL != crm;
994        crm = crm->next)
995     if (ack->mid.mid == crm->data_message.mid.mid)
996       break;
997   if (NULL == crm)
998   {
999     /* ACK for message we already dropped, might have been a
1000        duplicate ACK? Ignore. */
1001     GNUNET_STATISTICS_update (stats,
1002                               "# duplicate DATA_ACKs",
1003                               1,
1004                               GNUNET_NO);
1005     return;
1006   }
1007   GNUNET_CONTAINER_DLL_remove (ch->head_sent,
1008                                ch->tail_sent,
1009                                crm);
1010   ch->pending_messages--;
1011   GNUNET_free (crm);
1012   GNUNET_assert (ch->pending_messages < ch->max_pending_messages);
1013   send_ack_to_client (ch,
1014                       (NULL == ch->owner) ? ch->dest : ch->owner);
1015 }
1016
1017
1018 /**
1019  * Destroy channel, based on the other peer closing the
1020  * connection.  Also needs to remove this channel from
1021  * the tunnel.
1022  *
1023  * @param ch channel to destroy
1024  */
1025 void
1026 GCCH_handle_remote_destroy (struct CadetChannel *ch)
1027 {
1028   struct GNUNET_MQ_Envelope *env;
1029   struct GNUNET_CADET_LocalChannelDestroyMessage *tdm;
1030
1031   LOG (GNUNET_ERROR_TYPE_DEBUG,
1032        "Received remote channel DESTROY for %s\n",
1033        GCCH_2s (ch));
1034   ch->destroy = GNUNET_YES;
1035   env = GNUNET_MQ_msg (tdm,
1036                        GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_DESTROY);
1037   tdm->ccn = ch->ccn;
1038   GSC_send_to_client ((NULL != ch->owner) ? ch->owner : ch->dest,
1039                       env);
1040   channel_destroy (ch);
1041 }
1042
1043
1044 /**
1045  * Function called once the tunnel has sent one of our messages.
1046  * If the message is unreliable, simply frees the `crm`. If the
1047  * message was reliable, calculate retransmission time and
1048  * wait for ACK (or retransmit).
1049  *
1050  * @param cls the `struct CadetReliableMessage` that was sent
1051  */
1052 static void
1053 data_sent_cb (void *cls);
1054
1055
1056 /**
1057  * We need to retry a transmission, the last one took too long to
1058  * be acknowledged.
1059  *
1060  * @param cls the `struct CadetChannel` where we need to retransmit
1061  */
1062 static void
1063 retry_transmission (void *cls)
1064 {
1065   struct CadetChannel *ch = cls;
1066   struct CadetReliableMessage *crm = ch->head_sent;
1067
1068   ch->retry_task = NULL;
1069   GNUNET_assert (NULL == crm->qe);
1070   crm->qe = GCT_send (ch->t,
1071                       &crm->data_message.header,
1072                       &data_sent_cb,
1073                       crm);
1074 }
1075
1076
1077 /**
1078  * Check if we can now allow the client to transmit, and if so,
1079  * let the client know about it.
1080  *
1081  * @param ch channel to check
1082  */
1083 static void
1084 GCCH_check_allow_client (struct CadetChannel *ch)
1085 {
1086   struct GNUNET_MQ_Envelope *env;
1087   struct GNUNET_CADET_LocalAck *msg;
1088
1089   if (GNUNET_YES == ch->client_allowed)
1090     return; /* client already allowed! */
1091   if (CADET_CHANNEL_READY != ch->state)
1092   {
1093     /* destination did not yet ACK our CREATE! */
1094     LOG (GNUNET_ERROR_TYPE_DEBUG,
1095          "%s not yet ready, throttling client until ACK.\n",
1096          GCCH_2s (ch));
1097     return;
1098   }
1099   if (ch->pending_messages > ch->max_pending_messages)
1100   {
1101     /* Too many messages in queue. */
1102     LOG (GNUNET_ERROR_TYPE_DEBUG,
1103          "Message queue still too long on %s, throttling client until ACK.\n",
1104          GCCH_2s (ch));
1105     return;
1106   }
1107   if ( (NULL != ch->head_sent) &&
1108        (64 <= ntohl (ch->mid_send.mid) - ntohl (ch->head_sent->data_message.mid.mid)) )
1109   {
1110     LOG (GNUNET_ERROR_TYPE_DEBUG,
1111          "Gap in ACKs too big on %s, throttling client until ACK.\n",
1112          GCCH_2s (ch));
1113     return;
1114   }
1115   ch->client_allowed = GNUNET_YES;
1116
1117
1118   LOG (GNUNET_ERROR_TYPE_DEBUG,
1119        "Sending local ack to %s client\n",
1120        GCCH_2s (ch));
1121   env = GNUNET_MQ_msg (msg,
1122                        GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK);
1123   msg->ccn = ch->ccn;
1124   GSC_send_to_client (ch->owner ? ch->owner : ch->dest,
1125                       env);
1126 }
1127
1128
1129 /**
1130  * Function called once the tunnel has sent one of our messages.
1131  * If the message is unreliable, simply frees the `crm`. If the
1132  * message was reliable, calculate retransmission time and
1133  * wait for ACK (or retransmit).
1134  *
1135  * @param cls the `struct CadetReliableMessage` that was sent
1136  */
1137 static void
1138 data_sent_cb (void *cls)
1139 {
1140   struct CadetReliableMessage *crm = cls;
1141   struct CadetChannel *ch = crm->ch;
1142   struct CadetReliableMessage *off;
1143
1144   crm->qe = NULL;
1145   GNUNET_CONTAINER_DLL_remove (ch->head_sent,
1146                                ch->tail_sent,
1147                                crm);
1148   if (GNUNET_NO == ch->reliable)
1149   {
1150     GNUNET_free (crm);
1151     ch->pending_messages--;
1152     GCCH_check_allow_client (ch);
1153     return;
1154   }
1155   if (0 == crm->retry_delay.rel_value_us)
1156     crm->retry_delay = ch->expected_delay;
1157   crm->next_retry = GNUNET_TIME_relative_to_absolute (crm->retry_delay);
1158
1159   /* find position for re-insertion into the DLL */
1160   if ( (NULL == ch->head_sent) ||
1161        (crm->next_retry.abs_value_us < ch->head_sent->next_retry.abs_value_us) )
1162   {
1163     /* insert at HEAD, also (re)schedule retry task! */
1164     GNUNET_CONTAINER_DLL_insert (ch->head_sent,
1165                                  ch->tail_sent,
1166                                  crm);
1167     if (NULL != ch->retry_task)
1168       GNUNET_SCHEDULER_cancel (ch->retry_task);
1169     ch->retry_task = GNUNET_SCHEDULER_add_delayed (crm->retry_delay,
1170                                                    &retry_transmission,
1171                                                    ch);
1172     return;
1173   }
1174   for (off = ch->head_sent; NULL != off; off = off->next)
1175     if (crm->next_retry.abs_value_us < off->next_retry.abs_value_us)
1176       break;
1177   if (NULL == off)
1178   {
1179     /* insert at tail */
1180     GNUNET_CONTAINER_DLL_insert_tail (ch->head_sent,
1181                                       ch->tail_sent,
1182                                       crm);
1183   }
1184   else
1185   {
1186     /* insert before off */
1187     GNUNET_CONTAINER_DLL_insert_after (ch->head_sent,
1188                                        ch->tail_sent,
1189                                        off->prev,
1190                                        crm);
1191   }
1192 }
1193
1194
1195 /**
1196  * Handle data given by a client.
1197  *
1198  * Check whether the client is allowed to send in this tunnel, save if
1199  * channel is reliable and send an ACK to the client if there is still
1200  * buffer space in the tunnel.
1201  *
1202  * @param ch Channel.
1203  * @param message payload to transmit.
1204  * @return #GNUNET_OK if everything goes well,
1205  *         #GNUNET_SYSERR in case of an error.
1206  */
1207 int
1208 GCCH_handle_local_data (struct CadetChannel *ch,
1209                         const struct GNUNET_MessageHeader *message)
1210 {
1211   uint16_t payload_size = ntohs (message->size);
1212   struct CadetReliableMessage *crm;
1213
1214   if (GNUNET_NO == ch->client_allowed)
1215   {
1216     GNUNET_break_op (0);
1217     return GNUNET_SYSERR;
1218   }
1219   ch->client_allowed = GNUNET_NO;
1220   ch->pending_messages++;
1221
1222   /* Everything is correct, send the message. */
1223   crm = GNUNET_malloc (sizeof (*crm) + payload_size);
1224   crm->ch = ch;
1225   crm->data_message.header.size = htons (sizeof (struct GNUNET_CADET_ChannelAppDataMessage) + payload_size);
1226   crm->data_message.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA);
1227   ch->mid_send.mid = htonl (ntohl (ch->mid_send.mid) + 1);
1228   crm->data_message.mid = ch->mid_send;
1229   crm->data_message.ctn = ch->ctn;
1230   GNUNET_memcpy (&crm[1],
1231                  message,
1232                  payload_size);
1233   GNUNET_CONTAINER_DLL_insert (ch->head_sent,
1234                                ch->tail_sent,
1235                                crm);
1236   LOG (GNUNET_ERROR_TYPE_DEBUG,
1237        "Sending %u bytes from local client to %s\n",
1238        payload_size,
1239        GCCH_2s (ch));
1240   crm->qe = GCT_send (ch->t,
1241                       &crm->data_message.header,
1242                       &data_sent_cb,
1243                       crm);
1244   GCCH_check_allow_client (ch);
1245   return GNUNET_OK;
1246 }
1247
1248
1249 /**
1250  * Try to deliver messages to the local client, if it is ready for more.
1251  *
1252  * @param ch channel to process
1253  */
1254 static void
1255 send_client_buffered_data (struct CadetChannel *ch)
1256 {
1257   struct CadetOutOfOrderMessage *com;
1258
1259   if (GNUNET_NO == ch->client_ready)
1260     return; /* client not ready */
1261   com = ch->head_recv;
1262   if (NULL == com)
1263     return; /* none pending */
1264   if ( (com->mid.mid != ch->mid_recv.mid) &&
1265        (GNUNET_NO == ch->out_of_order) )
1266     return; /* missing next one in-order */
1267
1268   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1269               "Passing payload message to client on %s\n",
1270               GCCH_2s (ch));
1271
1272   /* all good, pass next message to client */
1273   GNUNET_CONTAINER_DLL_remove (ch->head_recv,
1274                                ch->tail_recv,
1275                                com);
1276   /* FIXME: if unreliable, this is not aggressive
1277      enough, as it would be OK to have lost some! */
1278   ch->mid_recv.mid = htonl (1 + ntohl (com->mid.mid));
1279   ch->mid_futures >>= 1; /* equivalent to division by 2 */
1280   GSC_send_to_client (ch->owner ? ch->owner : ch->dest,
1281                       com->env);
1282   GNUNET_free (com);
1283   if ( (0xFFULL == (ch->mid_futures & 0xFFULL)) &&
1284        (GNUNET_YES == ch->reliable) )
1285   {
1286     /* The next 15 messages were also already received (0xFF), this
1287        suggests that the sender may be blocked on flow control
1288        urgently waiting for an ACK from us. (As we have an inherent
1289        maximum of 64 bits, and 15 is getting too close for comfort.)
1290        So we should send one now. */
1291     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1292                 "Sender on %s likely blocked on flow-control, sending ACK now.\n",
1293                 GCCH_2s (ch));
1294     if (GNUNET_YES == ch->reliable)
1295       send_channel_data_ack (ch);
1296   }
1297
1298   if (NULL != ch->head_recv)
1299     return;
1300   if (GNUNET_NO == ch->destroy)
1301     return;
1302   GCT_send_channel_destroy (ch->t,
1303                             ch->ctn);
1304   channel_destroy (ch);
1305 }
1306
1307
1308 /**
1309  * Handle ACK from client on local channel.
1310  *
1311  * @param ch channel to destroy
1312  */
1313 void
1314 GCCH_handle_local_ack (struct CadetChannel *ch)
1315 {
1316   ch->client_ready = GNUNET_YES;
1317   send_client_buffered_data (ch);
1318 }
1319
1320
1321 #define LOG2(level, ...) GNUNET_log_from_nocheck(level,"cadet-chn",__VA_ARGS__)
1322
1323
1324 /**
1325  * Log channel info.
1326  *
1327  * @param ch Channel.
1328  * @param level Debug level to use.
1329  */
1330 void
1331 GCCH_debug (struct CadetChannel *ch,
1332             enum GNUNET_ErrorType level)
1333 {
1334   int do_log;
1335
1336   do_log = GNUNET_get_log_call_status (level & (~GNUNET_ERROR_TYPE_BULK),
1337                                        "cadet-chn",
1338                                        __FILE__, __FUNCTION__, __LINE__);
1339   if (0 == do_log)
1340     return;
1341
1342   if (NULL == ch)
1343   {
1344     LOG2 (level, "CHN *** DEBUG NULL CHANNEL ***\n");
1345     return;
1346   }
1347   LOG2 (level,
1348         "CHN %s:%X (%p)\n",
1349         GCT_2s (ch->t),
1350         ch->ctn,
1351         ch);
1352   if (NULL != ch->owner)
1353   {
1354     LOG2 (level,
1355           "CHN origin %s ready %s local-id: %u\n",
1356           GSC_2s (ch->owner),
1357           ch->client_ready ? "YES" : "NO",
1358           ntohl (ch->ccn.channel_of_client));
1359   }
1360   if (NULL != ch->dest)
1361   {
1362     LOG2 (level,
1363           "CHN destination %s ready %s local-id: %u\n",
1364           GSC_2s (ch->dest),
1365           ch->client_ready ? "YES" : "NO",
1366           ntohl (ch->ccn.channel_of_client));
1367   }
1368   LOG2 (level,
1369         "CHN  Message IDs recv: %d (%LLX), send: %d\n",
1370         ntohl (ch->mid_recv.mid),
1371         (unsigned long long) ch->mid_futures,
1372         ntohl (ch->mid_send.mid));
1373 }
1374
1375
1376
1377 /* end of gnunet-service-cadet-new_channel.c */