renames to avoid ambiguity
[oweals/gnunet.git] / src / cadet / gnunet-service-cadet-new_channel.c
1
2 /*
3      This file is part of GNUnet.
4      Copyright (C) 2001-2017 GNUnet e.V.
5
6      GNUnet is free software; you can redistribute it and/or modify
7      it under the terms of the GNU General Public License as published
8      by the Free Software Foundation; either version 3, or (at your
9      option) any later version.
10
11      GNUnet is distributed in the hope that it will be useful, but
12      WITHOUT ANY WARRANTY; without even the implied warranty of
13      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14      General Public License for more details.
15
16      You should have received a copy of the GNU General Public License
17      along with GNUnet; see the file COPYING.  If not, write to the
18      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
19      Boston, MA 02110-1301, USA.
20 */
21 /**
22  * @file cadet/gnunet-service-cadet-new_channel.c
23  * @brief logical links between CADET clients
24  * @author Bartlomiej Polot
25  * @author Christian Grothoff
26  *
27  * TODO:
28  * - introduce shutdown so we can have half-closed channels, modify
29  *   destroy to include MID to have FIN-ACK equivalents, etc.
30  * - estimate max bandwidth using bursts and use to for CONGESTION CONTROL!
31  * - check that '0xFFULL' really is sufficient for flow control!
32  * - revisit handling of 'unreliable' traffic!
33  * - revisit handling of 'out-of-order' option, especially in combination with/without 'reliable'.
34  * - figure out flow control without ACKs (unreliable traffic!)
35  */
36 #include "platform.h"
37 #include "gnunet_util_lib.h"
38 #include "cadet.h"
39 #include "gnunet_statistics_service.h"
40 #include "gnunet-service-cadet-new.h"
41 #include "gnunet-service-cadet-new_channel.h"
42 #include "gnunet-service-cadet-new_connection.h"
43 #include "gnunet-service-cadet-new_tunnels.h"
44 #include "gnunet-service-cadet-new_peer.h"
45 #include "gnunet-service-cadet-new_paths.h"
46
47 #define LOG(level, ...) GNUNET_log (level,__VA_ARGS__)
48
49 /**
50  * How long do we initially wait before retransmitting?
51  */
52 #define CADET_INITIAL_RETRANSMIT_TIME GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 250)
53
54 /**
55  * How long do we wait before dropping state about incoming
56  * connection to closed port?
57  */
58 #define TIMEOUT_CLOSED_PORT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30)
59
60
61 /**
62  * All the states a connection can be in.
63  */
64 enum CadetChannelState
65 {
66   /**
67    * Uninitialized status, should never appear in operation.
68    */
69   CADET_CHANNEL_NEW,
70
71   /**
72    * Connection create message sent, waiting for ACK.
73    */
74   CADET_CHANNEL_CREATE_SENT,
75
76   /**
77    * Connection confirmed, ready to carry traffic.
78    */
79   CADET_CHANNEL_READY
80 };
81
82
83 /**
84  * Info needed to retry a message in case it gets lost.
85  * Note that we DO use this structure also for unreliable
86  * messages.
87  */
88 struct CadetReliableMessage
89 {
90   /**
91    * Double linked list, FIFO style
92    */
93   struct CadetReliableMessage *next;
94
95   /**
96    * Double linked list, FIFO style
97    */
98   struct CadetReliableMessage *prev;
99
100   /**
101    * Which channel is this message in?
102    */
103   struct CadetChannel *ch;
104
105   /**
106    * Entry in the tunnels queue for this message, NULL if it has left
107    * the tunnel.  Used to cancel transmission in case we receive an
108    * ACK in time.
109    */
110   struct CadetTunnelQueueEntry *qe;
111
112   /**
113    * How soon should we retry if we fail to get an ACK?
114    * Messages in the queue are sorted by this value.
115    */
116   struct GNUNET_TIME_Absolute next_retry;
117
118   /**
119    * How long do we wait for an ACK after transmission?
120    * Use for the back-off calculation.
121    */
122   struct GNUNET_TIME_Relative retry_delay;
123
124   /**
125    * Data message we are trying to send.
126    */
127   struct GNUNET_CADET_ChannelAppDataMessage data_message;
128
129   /* followed by variable-size payload */
130 };
131
132
133 /**
134  * List of received out-of-order data messages.
135  */
136 struct CadetOutOfOrderMessage
137 {
138   /**
139    * Double linked list, FIFO style
140    */
141   struct CadetOutOfOrderMessage *next;
142
143   /**
144    * Double linked list, FIFO style
145    */
146   struct CadetOutOfOrderMessage *prev;
147
148   /**
149    * ID of the message (messages up to this point needed
150    * before we give this one to the client).
151    */
152   struct ChannelMessageIdentifier mid;
153
154   /**
155    * The envelope with the payload of the out-of-order message
156    */
157   struct GNUNET_MQ_Envelope *env;
158
159 };
160
161
162 /**
163  * Struct containing all information regarding a channel to a remote client.
164  */
165 struct CadetChannel
166 {
167   /**
168    * Tunnel this channel is in.
169    */
170   struct CadetTunnel *t;
171
172   /**
173    * Last entry in the tunnel's queue relating to control messages
174    * (#GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN or
175    * #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK).  Used to cancel
176    * transmission in case we receive updated information.
177    */
178   struct CadetTunnelQueueEntry *last_control_qe;
179
180   /**
181    * Client owner of the tunnel, if any.
182    * (Used if this channel represends the initiating end of the tunnel.)
183    */
184   struct CadetClient *owner;
185
186   /**
187    * Client destination of the tunnel, if any.
188    * (Used if this channel represents the listening end of the tunnel.)
189    */
190   struct CadetClient *dest;
191
192   /**
193    * Head of DLL of messages sent and not yet ACK'd.
194    */
195   struct CadetReliableMessage *head_sent;
196
197   /**
198    * Tail of DLL of messages sent and not yet ACK'd.
199    */
200   struct CadetReliableMessage *tail_sent;
201
202   /**
203    * Head of DLL of messages received out of order or while client was unready.
204    */
205   struct CadetOutOfOrderMessage *head_recv;
206
207   /**
208    * Tail DLL of messages received out of order or while client was unready.
209    */
210   struct CadetOutOfOrderMessage *tail_recv;
211
212   /**
213    * Task to resend/poll in case no ACK is received.
214    */
215   struct GNUNET_SCHEDULER_Task *retry_task;
216
217   /**
218    * Last time the channel was used
219    */
220   struct GNUNET_TIME_Absolute timestamp;
221
222   /**
223    * Destination port of the channel.
224    */
225   struct GNUNET_HashCode port;
226
227   /**
228    * Counter for exponential backoff.
229    */
230   struct GNUNET_TIME_Relative retry_time;
231
232   /**
233    * How long does it usually take to get an ACK.
234    */
235   struct GNUNET_TIME_Relative expected_delay;
236
237   /**
238    * Bitfield of already-received messages past @e mid_recv.
239    */
240   uint64_t mid_futures;
241
242   /**
243    * Next MID expected for incoming traffic.
244    */
245   struct ChannelMessageIdentifier mid_recv;
246
247   /**
248    * Next MID to use for outgoing traffic.
249    */
250   struct ChannelMessageIdentifier mid_send;
251
252   /**
253    * Total (reliable) messages pending ACK for this channel.
254    */
255   unsigned int pending_messages;
256
257   /**
258    * Maximum (reliable) messages pending ACK for this channel
259    * before we throttle the client.
260    */
261   unsigned int max_pending_messages;
262
263   /**
264    * Number identifying this channel in its tunnel.
265    */
266   struct GNUNET_CADET_ChannelTunnelNumber chid;
267
268   /**
269    * Local tunnel number for local client owning the channel.
270    * ( >= #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI or 0 )
271    */
272   struct GNUNET_CADET_ClientChannelNumber lid;
273
274   /**
275    * Channel state.
276    */
277   enum CadetChannelState state;
278
279   /**
280    * Can we send data to the client?
281    */
282   int client_ready;
283
284   /**
285    * Can the client send data to us?
286    */
287   int client_allowed;
288
289   /**
290    * Is the tunnel bufferless (minimum latency)?
291    */
292   int nobuffer;
293
294   /**
295    * Is the tunnel reliable?
296    */
297   int reliable;
298
299   /**
300    * Is the tunnel out-of-order?
301    */
302   int out_of_order;
303
304   /**
305    * Flag to signal the destruction of the channel.  If this is set to
306    * #GNUNET_YES the channel will be destroyed once the queue is
307    * empty.
308    */
309   int destroy;
310
311 };
312
313
314 /**
315  * Get the static string for identification of the channel.
316  *
317  * @param ch Channel.
318  *
319  * @return Static string with the channel IDs.
320  */
321 const char *
322 GCCH_2s (const struct CadetChannel *ch)
323 {
324   static char buf[128];
325
326   if (NULL == ch)
327     return "(NULL Channel)";
328   GNUNET_snprintf (buf,
329                    sizeof (buf),
330                    "%s:%s chid:%X (%X)",
331                    GCT_2s (ch->t),
332                    GNUNET_h2s (&ch->port),
333                    ch->chid,
334                    ntohl (ch->lid.channel_of_client));
335   return buf;
336 }
337
338
339 /**
340  * Get the channel's public ID.
341  *
342  * @param ch Channel.
343  *
344  * @return ID used to identify the channel with the remote peer.
345  */
346 struct GNUNET_CADET_ChannelTunnelNumber
347 GCCH_get_id (const struct CadetChannel *ch)
348 {
349   return ch->chid;
350 }
351
352
353 /**
354  * Destroy the given channel.
355  *
356  * @param ch channel to destroy
357  */
358 static void
359 channel_destroy (struct CadetChannel *ch)
360 {
361   struct CadetReliableMessage *crm;
362   struct CadetOutOfOrderMessage *com;
363
364   while (NULL != (crm = ch->head_sent))
365   {
366     GNUNET_assert (ch == crm->ch);
367     if (NULL != crm->qe)
368     {
369       GCT_send_cancel (crm->qe);
370       crm->qe = NULL;
371     }
372     GNUNET_CONTAINER_DLL_remove (ch->head_sent,
373                                  ch->tail_sent,
374                                  crm);
375     GNUNET_free (crm);
376   }
377   while (NULL != (com = ch->head_recv))
378   {
379     GNUNET_CONTAINER_DLL_remove (ch->head_recv,
380                                  ch->tail_recv,
381                                  com);
382     GNUNET_MQ_discard (com->env);
383     GNUNET_free (com);
384   }
385   if (NULL != ch->last_control_qe)
386   {
387     GCT_send_cancel (ch->last_control_qe);
388     ch->last_control_qe = NULL;
389   }
390   if (NULL != ch->retry_task)
391   {
392     GNUNET_SCHEDULER_cancel (ch->retry_task);
393     ch->retry_task = NULL;
394   }
395   GCT_remove_channel (ch->t,
396                       ch,
397                       ch->chid);
398   GNUNET_free (ch);
399 }
400
401
402 /**
403  * Send a channel create message.
404  *
405  * @param cls Channel for which to send.
406  */
407 static void
408 send_create (void *cls);
409
410
411 /**
412  * Function called once the tunnel confirms that we sent the
413  * create message.  Delays for a bit until we retry.
414  *
415  * @param cls our `struct CadetChannel`.
416  */
417 static void
418 create_sent_cb (void *cls)
419 {
420   struct CadetChannel *ch = cls;
421
422   ch->last_control_qe = NULL;
423   ch->retry_time = GNUNET_TIME_STD_BACKOFF (ch->retry_time);
424   ch->retry_task = GNUNET_SCHEDULER_add_delayed (ch->retry_time,
425                                                  &send_create,
426                                                  ch);
427 }
428
429
430 /**
431  * Send a channel create message.
432  *
433  * @param cls Channel for which to send.
434  */
435 static void
436 send_create (void *cls)
437 {
438   struct CadetChannel *ch = cls;
439   struct GNUNET_CADET_ChannelOpenMessage msgcc;
440   uint32_t options;
441
442   options = 0;
443   if (ch->nobuffer)
444     options |= GNUNET_CADET_OPTION_NOBUFFER;
445   if (ch->reliable)
446     options |= GNUNET_CADET_OPTION_RELIABLE;
447   if (ch->out_of_order)
448     options |= GNUNET_CADET_OPTION_OUT_OF_ORDER;
449   msgcc.header.size = htons (sizeof (msgcc));
450   msgcc.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN);
451   msgcc.opt = htonl (options);
452   msgcc.port = ch->port;
453   msgcc.chid = ch->chid;
454   ch->state = CADET_CHANNEL_CREATE_SENT;
455   ch->last_control_qe = GCT_send (ch->t,
456                                   &msgcc.header,
457                                   &create_sent_cb,
458                                   ch);
459 }
460
461
462 /**
463  * Create a new channel.
464  *
465  * @param owner local client owning the channel
466  * @param owner_id local chid of this channel at the @a owner
467  * @param destination peer to which we should build the channel
468  * @param port desired port at @a destination
469  * @param options options for the channel
470  * @return handle to the new channel
471  */
472 struct CadetChannel *
473 GCCH_channel_local_new (struct CadetClient *owner,
474                         struct GNUNET_CADET_ClientChannelNumber owner_id,
475                         struct CadetPeer *destination,
476                         const struct GNUNET_HashCode *port,
477                         uint32_t options)
478 {
479   struct CadetChannel *ch;
480
481   ch = GNUNET_new (struct CadetChannel);
482   ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
483   ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
484   ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
485   ch->max_pending_messages = (ch->nobuffer) ? 1 : 32; /* FIXME: 32!? Do not hardcode! */
486   ch->owner = owner;
487   ch->lid = owner_id;
488   ch->port = *port;
489   ch->t = GCP_get_tunnel (destination,
490                           GNUNET_YES);
491   ch->chid = GCT_add_channel (ch->t,
492                               ch);
493   ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME;
494   ch->retry_task = GNUNET_SCHEDULER_add_now (&send_create,
495                                              ch);
496   GNUNET_STATISTICS_update (stats,
497                             "# channels",
498                             1,
499                             GNUNET_NO);
500   return ch;
501 }
502
503
504 /**
505  * We had an incoming channel to a port that is closed.
506  * It has not been opened for a while, drop it.
507  *
508  * @param cls the channel to drop
509  */
510 static void
511 timeout_closed_cb (void *cls)
512 {
513   struct CadetChannel *ch = cls;
514
515   ch->retry_task = NULL;
516   channel_destroy (ch);
517 }
518
519
520 /**
521  * Create a new channel based on a request coming in over the network.
522  *
523  * @param t tunnel to the remote peer
524  * @param chid identifier of this channel in the tunnel
525  * @param port desired local port
526  * @param options options for the channel
527  * @return handle to the new channel
528  */
529 struct CadetChannel *
530 GCCH_channel_incoming_new (struct CadetTunnel *t,
531                            struct GNUNET_CADET_ChannelTunnelNumber chid,
532                            const struct GNUNET_HashCode *port,
533                            uint32_t options)
534 {
535   struct CadetChannel *ch;
536   struct CadetClient *c;
537
538   ch = GNUNET_new (struct CadetChannel);
539   ch->port = *port;
540   ch->t = t;
541   ch->chid = chid;
542   ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME;
543   ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
544   ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
545   ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
546   ch->max_pending_messages = (ch->nobuffer) ? 1 : 32; /* FIXME: 32!? Do not hardcode! */
547   GNUNET_STATISTICS_update (stats,
548                             "# channels",
549                             1,
550                             GNUNET_NO);
551
552   c = GNUNET_CONTAINER_multihashmap_get (open_ports,
553                                          port);
554   if (NULL == c)
555   {
556     /* port closed, wait for it to possibly open */
557     (void) GNUNET_CONTAINER_multihashmap_put (loose_channels,
558                                               port,
559                                               ch,
560                                               GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
561     ch->retry_task = GNUNET_SCHEDULER_add_delayed (TIMEOUT_CLOSED_PORT,
562                                                    &timeout_closed_cb,
563                                                    ch);
564   }
565   else
566   {
567     GCCH_bind (ch,
568                c);
569   }
570   GNUNET_STATISTICS_update (stats,
571                             "# channels",
572                             1,
573                             GNUNET_NO);
574   return ch;
575 }
576
577
578 /**
579  * Function called once the tunnel confirms that we sent the
580  * ACK message.  Just remembers it was sent, we do not expect
581  * ACKs for ACKs ;-).
582  *
583  * @param cls our `struct CadetChannel`.
584  */
585 static void
586 send_ack_cb (void *cls)
587 {
588   struct CadetChannel *ch = cls;
589
590   ch->last_control_qe = NULL;
591 }
592
593
594 /**
595  * Compute and send the current ACK to the other peer.
596  *
597  * @param ch channel to send the ACK for
598  */
599 static void
600 send_channel_ack (struct CadetChannel *ch)
601 {
602   struct GNUNET_CADET_ChannelDataAckMessage msg;
603
604   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK);
605   msg.header.size = htons (sizeof (msg));
606   msg.chid = ch->chid;
607   msg.mid.mid = htonl (ntohl (ch->mid_recv.mid) - 1);
608   msg.futures = GNUNET_htonll (ch->mid_futures);
609   if (NULL != ch->last_control_qe)
610     GCT_send_cancel (ch->last_control_qe);
611   ch->last_control_qe = GCT_send (ch->t,
612                                   &msg.header,
613                                   &send_ack_cb,
614                                   ch);
615 }
616
617
618 /**
619  * Send our initial ACK to the client confirming that the
620  * connection is up.
621  *
622  * @param cls the `struct CadetChannel`
623  */
624 static void
625 send_connect_ack (void *cls)
626 {
627   struct CadetChannel *ch = cls;
628
629   ch->retry_task = NULL;
630   send_channel_ack (ch);
631 }
632
633
634 /**
635  * Send a LOCAL ACK to the client to solicit more messages.
636  *
637  * @param ch channel the ack is for
638  * @param c client to send the ACK to
639  */
640 static void
641 send_ack_to_client (struct CadetChannel *ch,
642                     struct CadetClient *c)
643 {
644   struct GNUNET_MQ_Envelope *env;
645   struct GNUNET_CADET_LocalAck *ack;
646
647   env = GNUNET_MQ_msg (ack,
648                        GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK);
649   ack->channel_id = ch->lid;
650   GSC_send_to_client (c,
651                       env);
652 }
653
654
655 /**
656  * A client is bound to the port that we have a channel
657  * open to.  Send the acknowledgement for the connection
658  * request and establish the link with the client.
659  *
660  * @param ch open incoming channel
661  * @param c client listening on the respective port
662  */
663 void
664 GCCH_bind (struct CadetChannel *ch,
665            struct CadetClient *c)
666 {
667   struct GNUNET_MQ_Envelope *env;
668   struct GNUNET_CADET_LocalChannelCreateMessage *tcm;
669   uint32_t options;
670
671   if (NULL != ch->retry_task)
672   {
673     /* there might be a timeout task here */
674     GNUNET_SCHEDULER_cancel (ch->retry_task);
675     ch->retry_task = NULL;
676   }
677   options = 0;
678   if (ch->nobuffer)
679     options |= GNUNET_CADET_OPTION_NOBUFFER;
680   if (ch->reliable)
681     options |= GNUNET_CADET_OPTION_RELIABLE;
682   if (ch->out_of_order)
683     options |= GNUNET_CADET_OPTION_OUT_OF_ORDER;
684   ch->dest = c;
685   ch->lid = GSC_bind (c,
686                       ch,
687                       GCT_get_destination (ch->t),
688                       &ch->port,
689                       options);
690   ch->mid_recv.mid = htonl (1); /* The CONNECT counts as message 0! */
691
692   /* notify other peer that we accepted the connection */
693   ch->retry_task = GNUNET_SCHEDULER_add_now (&send_connect_ack,
694                                              ch);
695   /* give client it's initial supply of ACKs */
696   env = GNUNET_MQ_msg (tcm,
697                        GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_CREATE);
698   tcm->channel_id = ch->lid;
699   tcm->peer = *GCP_get_id (GCT_get_destination (ch->t));
700   tcm->port = ch->port;
701   tcm->opt = htonl (options);
702   GSC_send_to_client (ch->dest,
703                       env);
704   for (unsigned int i=0;i<ch->max_pending_messages;i++)
705     send_ack_to_client (ch,
706                         ch->owner);
707 }
708
709
710 /**
711  * Destroy locally created channel.  Called by the
712  * local client, so no need to tell the client.
713  *
714  * @param ch channel to destroy
715  */
716 void
717 GCCH_channel_local_destroy (struct CadetChannel *ch)
718 {
719   if (GNUNET_YES == ch->destroy)
720   {
721     /* other end already destroyed, with the local client gone, no need
722        to finish transmissions, just destroy immediately. */
723     channel_destroy (ch);
724     return;
725   }
726   if (NULL != ch->head_sent)
727   {
728     /* allow send queue to train first */
729     ch->destroy = GNUNET_YES;
730     return;
731   }
732   /* Nothing left to do, just finish destruction */
733   GCT_send_channel_destroy (ch->t,
734                             ch->chid);
735   channel_destroy (ch);
736 }
737
738
739 /**
740  * Destroy channel that was incoming.  Called by the
741  * local client, so no need to tell the client.
742  *
743  * @param ch channel to destroy
744  */
745 void
746 GCCH_channel_incoming_destroy (struct CadetChannel *ch)
747 {
748   if (GNUNET_YES == ch->destroy)
749   {
750     /* other end already destroyed, with the remote client gone, no need
751        to finish transmissions, just destroy immediately. */
752     channel_destroy (ch);
753     return;
754   }
755   if (NULL != ch->head_recv)
756   {
757     /* allow local client to see all data first */
758     ch->destroy = GNUNET_YES;
759     return;
760   }
761   /* Nothing left to do, just finish destruction */
762   GCT_send_channel_destroy (ch->t,
763                             ch->chid);
764   channel_destroy (ch);
765 }
766
767
768 /**
769  * We got an acknowledgement for the creation of the channel
770  * (the port is open on the other side). Begin transmissions.
771  *
772  * @param ch channel to destroy
773  */
774 void
775 GCCH_handle_channel_create_ack (struct CadetChannel *ch)
776 {
777   switch (ch->state)
778   {
779   case CADET_CHANNEL_NEW:
780     /* this should be impossible */
781     GNUNET_break (0);
782     break;
783   case CADET_CHANNEL_CREATE_SENT:
784     if (NULL == ch->owner)
785     {
786       /* We're not the owner, wrong direction! */
787       GNUNET_break_op (0);
788       return;
789     }
790     ch->state = CADET_CHANNEL_READY;
791     /* On first connect, send client as many ACKs as we allow messages
792        to be buffered! */
793     for (unsigned int i=0;i<ch->max_pending_messages;i++)
794       send_ack_to_client (ch,
795                           ch->owner);
796     break;
797   case CADET_CHANNEL_READY:
798     /* duplicate ACK, maybe we retried the CREATE. Ignore. */
799     GNUNET_STATISTICS_update (stats,
800                               "# duplicate CREATE_ACKs",
801                               1,
802                               GNUNET_NO);
803     break;
804   }
805 }
806
807
808 /**
809  * Test if element @a e1 comes before element @a e2.
810  *
811  * TODO: use opportunity to create generic list insertion sort
812  * logic in container!
813  *
814  * @param cls closure, our `struct CadetChannel`
815  * @param e1 an element of to sort
816  * @param e2 another element to sort
817  * @return #GNUNET_YES if @e1 < @e2, otherwise #GNUNET_NO
818  */
819 static int
820 is_before (void *cls,
821            void *e1,
822            void *e2)
823 {
824   struct CadetOutOfOrderMessage *m1 = e1;
825   struct CadetOutOfOrderMessage *m2 = e2;
826   uint32_t v1 = ntohl (m1->mid.mid);
827   uint32_t v2 = ntohl (m2->mid.mid);
828   uint32_t delta;
829
830   delta = v1 - v2;
831   if (delta > (uint32_t) INT_MAX)
832   {
833     /* in overflow range, we can safely assume we wrapped around */
834     return GNUNET_NO;
835   }
836   else
837   {
838     return GNUNET_YES;
839   }
840 }
841
842
843 /**
844  * We got payload data for a channel.  Pass it on to the client
845  * and send an ACK to the other end (once flow control allows it!)
846  *
847  * @param ch channel that got data
848  */
849 void
850 GCCH_handle_channel_plaintext_data (struct CadetChannel *ch,
851                                     const struct GNUNET_CADET_ChannelAppDataMessage *msg)
852 {
853   struct GNUNET_MQ_Envelope *env;
854   struct GNUNET_CADET_LocalData *ld;
855   struct CadetOutOfOrderMessage *com;
856   size_t payload_size;
857
858   payload_size = ntohs (msg->header.size) - sizeof (*msg);
859   env = GNUNET_MQ_msg_extra (ld,
860                              payload_size,
861                              GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
862   ld->channel_id = ch->lid;
863   GNUNET_memcpy (&ld[1],
864                  &msg[1],
865                  payload_size);
866   if ( (GNUNET_YES == ch->client_ready) &&
867        ( (GNUNET_YES == ch->out_of_order) ||
868          (msg->mid.mid == ch->mid_recv.mid) ) )
869   {
870     GSC_send_to_client (ch->owner ? ch->owner : ch->dest,
871                         env);
872     ch->mid_recv.mid = htonl (1 + ntohl (ch->mid_recv.mid));
873     ch->mid_futures >>= 1;
874   }
875   else
876   {
877     /* FIXME-SECURITY: if the element is WAY too far ahead,
878        drop it (can't buffer too much!) */
879     com = GNUNET_new (struct CadetOutOfOrderMessage);
880     com->mid = msg->mid;
881     com->env = env;
882     /* sort into list ordered by "is_before" */
883     if ( (NULL == ch->head_recv) ||
884          (GNUNET_YES == is_before (ch,
885                                    com,
886                                    ch->head_recv)) )
887     {
888       GNUNET_CONTAINER_DLL_insert (ch->head_recv,
889                                    ch->tail_recv,
890                                    com);
891     }
892     else
893     {
894       struct CadetOutOfOrderMessage *pos;
895
896       for (pos = ch->head_recv;
897            NULL != pos;
898            pos = pos->next)
899       {
900         if (GNUNET_YES !=
901             is_before (ch,
902                        pos,
903                        com))
904           break;
905       }
906       if (NULL == pos)
907         GNUNET_CONTAINER_DLL_insert_tail (ch->head_recv,
908                                           ch->tail_recv,
909                                           com);
910       else
911         GNUNET_CONTAINER_DLL_insert_after (ch->head_recv,
912                                            ch->tail_recv,
913                                            com,
914                                            pos->prev);
915     }
916   }
917 }
918
919
920 /**
921  * We got an acknowledgement for payload data for a channel.
922  * Possibly resume transmissions.
923  *
924  * @param ch channel that got the ack
925  * @param ack details about what was received
926  */
927 void
928 GCCH_handle_channel_plaintext_data_ack (struct CadetChannel *ch,
929                                         const struct GNUNET_CADET_ChannelDataAckMessage *ack)
930 {
931   struct CadetReliableMessage *crm;
932
933   if (GNUNET_NO == ch->reliable)
934   {
935     /* not expecting ACKs on unreliable channel, odd */
936     GNUNET_break_op (0);
937     return;
938   }
939   for (crm = ch->head_sent;
940         NULL != crm;
941        crm = crm->next)
942     if (ack->mid.mid == crm->data_message.mid.mid)
943       break;
944   if (NULL == crm)
945   {
946     /* ACK for message we already dropped, might have been a
947        duplicate ACK? Ignore. */
948     GNUNET_STATISTICS_update (stats,
949                               "# duplicate CHANNEL_DATA_ACKs",
950                               1,
951                               GNUNET_NO);
952     return;
953   }
954   GNUNET_CONTAINER_DLL_remove (ch->head_sent,
955                                ch->tail_sent,
956                                crm);
957   ch->pending_messages--;
958   GNUNET_free (crm);
959   GNUNET_assert (ch->pending_messages < ch->max_pending_messages);
960   send_ack_to_client (ch,
961                       (NULL == ch->owner) ? ch->dest : ch->owner);
962 }
963
964
965 /**
966  * Destroy channel, based on the other peer closing the
967  * connection.  Also needs to remove this channel from
968  * the tunnel.
969  *
970  * @param ch channel to destroy
971  */
972 void
973 GCCH_handle_remote_destroy (struct CadetChannel *ch)
974 {
975   struct GNUNET_MQ_Envelope *env;
976   struct GNUNET_CADET_LocalChannelDestroyMessage *tdm;
977
978   ch->destroy = GNUNET_YES;
979   env = GNUNET_MQ_msg (tdm,
980                        GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_DESTROY);
981   tdm->channel_id = ch->lid;
982   GSC_send_to_client ((NULL != ch->owner) ? ch->owner : ch->dest,
983                       env);
984   channel_destroy (ch);
985 }
986
987
988 /**
989  * Function called once the tunnel has sent one of our messages.
990  * If the message is unreliable, simply frees the `crm`. If the
991  * message was reliable, calculate retransmission time and
992  * wait for ACK (or retransmit).
993  *
994  * @param cls the `struct CadetReliableMessage` that was sent
995  */
996 static void
997 data_sent_cb (void *cls);
998
999
1000 /**
1001  * We need to retry a transmission, the last one took too long to
1002  * be acknowledged.
1003  *
1004  * @param cls the `struct CadetChannel` where we need to retransmit
1005  */
1006 static void
1007 retry_transmission (void *cls)
1008 {
1009   struct CadetChannel *ch = cls;
1010   struct CadetReliableMessage *crm = ch->head_sent;
1011
1012   GNUNET_assert (NULL == crm->qe);
1013   crm->qe = GCT_send (ch->t,
1014                       &crm->data_message.header,
1015                       &data_sent_cb,
1016                       crm);
1017 }
1018
1019
1020 /**
1021  * Check if we can now allow the client to transmit, and if so,
1022  * let the client know about it.
1023  *
1024  * @param ch channel to check
1025  */
1026 static void
1027 GCCH_check_allow_client (struct CadetChannel *ch)
1028 {
1029   struct GNUNET_MQ_Envelope *env;
1030   struct GNUNET_CADET_LocalAck *msg;
1031
1032   if (GNUNET_YES == ch->client_allowed)
1033     return; /* client already allowed! */
1034   if (CADET_CHANNEL_READY != ch->state)
1035   {
1036     /* destination did not yet ACK our CREATE! */
1037     LOG (GNUNET_ERROR_TYPE_DEBUG,
1038          "Channel %s not yet ready, throttling client until ACK.\n",
1039          GCCH_2s (ch));
1040     return;
1041   }
1042   if (ch->pending_messages > ch->max_pending_messages)
1043   {
1044     /* Too many messages in queue. */
1045     LOG (GNUNET_ERROR_TYPE_DEBUG,
1046          "Message queue still too long on channel %s, throttling client until ACK.\n",
1047          GCCH_2s (ch));
1048     return;
1049   }
1050   if ( (NULL != ch->head_sent) &&
1051        (64 <= ntohl (ch->mid_send.mid) - ntohl (ch->head_sent->data_message.mid.mid)) )
1052   {
1053     LOG (GNUNET_ERROR_TYPE_DEBUG,
1054          "Gap in ACKs too big on channel %s, throttling client until ACK.\n",
1055          GCCH_2s (ch));
1056     return;
1057   }
1058   ch->client_allowed = GNUNET_YES;
1059
1060
1061   LOG (GNUNET_ERROR_TYPE_DEBUG,
1062        "Sending local ack to channel %s client\n",
1063        GCCH_2s (ch));
1064   env = GNUNET_MQ_msg (msg,
1065                        GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK);
1066   msg->channel_id = ch->lid;
1067   GSC_send_to_client (ch->owner ? ch->owner : ch->dest,
1068                       env);
1069 }
1070
1071
1072 /**
1073  * Function called once the tunnel has sent one of our messages.
1074  * If the message is unreliable, simply frees the `crm`. If the
1075  * message was reliable, calculate retransmission time and
1076  * wait for ACK (or retransmit).
1077  *
1078  * @param cls the `struct CadetReliableMessage` that was sent
1079  */
1080 static void
1081 data_sent_cb (void *cls)
1082 {
1083   struct CadetReliableMessage *crm = cls;
1084   struct CadetChannel *ch = crm->ch;
1085   struct CadetReliableMessage *off;
1086
1087   crm->qe = NULL;
1088   GNUNET_CONTAINER_DLL_remove (ch->head_sent,
1089                                ch->tail_sent,
1090                                crm);
1091   if (GNUNET_NO == ch->reliable)
1092   {
1093     GNUNET_free (crm);
1094     ch->pending_messages--;
1095     GCCH_check_allow_client (ch);
1096     return;
1097   }
1098   if (0 == crm->retry_delay.rel_value_us)
1099     crm->retry_delay = ch->expected_delay;
1100   crm->next_retry = GNUNET_TIME_relative_to_absolute (crm->retry_delay);
1101
1102   /* find position for re-insertion into the DLL */
1103   if ( (NULL == ch->head_sent) ||
1104        (crm->next_retry.abs_value_us < ch->head_sent->next_retry.abs_value_us) )
1105   {
1106     /* insert at HEAD, also (re)schedule retry task! */
1107     GNUNET_CONTAINER_DLL_insert (ch->head_sent,
1108                                  ch->tail_sent,
1109                                  crm);
1110     if (NULL != ch->retry_task)
1111       GNUNET_SCHEDULER_cancel (ch->retry_task);
1112     ch->retry_task = GNUNET_SCHEDULER_add_delayed (crm->retry_delay,
1113                                                    &retry_transmission,
1114                                                    ch);
1115     return;
1116   }
1117   for (off = ch->head_sent; NULL != off; off = off->next)
1118     if (crm->next_retry.abs_value_us < off->next_retry.abs_value_us)
1119       break;
1120   if (NULL == off)
1121   {
1122     /* insert at tail */
1123     GNUNET_CONTAINER_DLL_insert_tail (ch->head_sent,
1124                                       ch->tail_sent,
1125                                       crm);
1126   }
1127   else
1128   {
1129     /* insert before off */
1130     GNUNET_CONTAINER_DLL_insert_after (ch->head_sent,
1131                                        ch->tail_sent,
1132                                        off->prev,
1133                                        crm);
1134   }
1135 }
1136
1137
1138 /**
1139  * Handle data given by a client.
1140  *
1141  * Check whether the client is allowed to send in this tunnel, save if
1142  * channel is reliable and send an ACK to the client if there is still
1143  * buffer space in the tunnel.
1144  *
1145  * @param ch Channel.
1146  * @param message payload to transmit.
1147  * @return #GNUNET_OK if everything goes well,
1148  *         #GNUNET_SYSERR in case of an error.
1149  */
1150 int
1151 GCCH_handle_local_data (struct CadetChannel *ch,
1152                         const struct GNUNET_MessageHeader *message)
1153 {
1154   uint16_t payload_size = ntohs (message->size);
1155   struct CadetReliableMessage *crm;
1156
1157   if (GNUNET_NO == ch->client_allowed)
1158   {
1159     GNUNET_break_op (0);
1160     return GNUNET_SYSERR;
1161   }
1162   ch->client_allowed = GNUNET_NO;
1163   ch->pending_messages++;
1164
1165   /* Everything is correct, send the message. */
1166   crm = GNUNET_malloc (sizeof (*crm) + payload_size);
1167   crm->ch = ch;
1168   crm->data_message.header.size = htons (sizeof (struct GNUNET_CADET_ChannelAppDataMessage) + payload_size);
1169   crm->data_message.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA);
1170   ch->mid_send.mid = htonl (ntohl (ch->mid_send.mid) + 1);
1171   crm->data_message.mid = ch->mid_send;
1172   crm->data_message.chid = ch->chid;
1173   GNUNET_memcpy (&crm[1],
1174                  message,
1175                  payload_size);
1176   GNUNET_CONTAINER_DLL_insert (ch->head_sent,
1177                                ch->tail_sent,
1178                                crm);
1179   LOG (GNUNET_ERROR_TYPE_DEBUG,
1180        "Sending %u bytes from local client to channel %s\n",
1181        payload_size,
1182        GCCH_2s (ch));
1183   crm->qe = GCT_send (ch->t,
1184                       &crm->data_message.header,
1185                       &data_sent_cb,
1186                       crm);
1187   GCCH_check_allow_client (ch);
1188   return GNUNET_OK;
1189 }
1190
1191
1192 /**
1193  * Try to deliver messages to the local client, if it is ready for more.
1194  *
1195  * @param ch channel to process
1196  */
1197 static void
1198 send_client_buffered_data (struct CadetChannel *ch)
1199 {
1200   struct CadetOutOfOrderMessage *com;
1201
1202   if (GNUNET_NO == ch->client_ready)
1203     return; /* client not ready */
1204   com = ch->head_recv;
1205   if (NULL == com)
1206     return; /* none pending */
1207   if ( (com->mid.mid != ch->mid_recv.mid) &&
1208        (GNUNET_NO == ch->out_of_order) )
1209     return; /* missing next one in-order */
1210
1211   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1212               "Passing payload message to client on channel %s\n",
1213               GCCH_2s (ch));
1214
1215   /* all good, pass next message to client */
1216   GNUNET_CONTAINER_DLL_remove (ch->head_recv,
1217                                ch->tail_recv,
1218                                com);
1219   /* FIXME: if unreliable, this is not aggressive
1220      enough, as it would be OK to have lost some! */
1221   ch->mid_recv.mid = htonl (1 + ntohl (com->mid.mid));
1222   ch->mid_futures >>= 1; /* equivalent to division by 2 */
1223   GSC_send_to_client (ch->owner ? ch->owner : ch->dest,
1224                       com->env);
1225   GNUNET_free (com);
1226   if ( (0xFFULL == (ch->mid_futures & 0xFFULL)) &&
1227        (GNUNET_YES == ch->reliable) )
1228   {
1229     /* The next 15 messages were also already received (0xFF), this
1230        suggests that the sender may be blocked on flow control
1231        urgently waiting for an ACK from us. (As we have an inherent
1232        maximum of 64 bits, and 15 is getting too close for comfort.)
1233        So we should send one now. */
1234     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1235                 "Sender on channel %s likely blocked on flow-control, sending ACK now.\n",
1236                 GCCH_2s (ch));
1237     if (GNUNET_YES == ch->reliable)
1238       send_channel_ack (ch);
1239   }
1240
1241   if (NULL != ch->head_recv)
1242     return;
1243   if (GNUNET_NO == ch->destroy)
1244     return;
1245   GCT_send_channel_destroy (ch->t,
1246                             ch->chid);
1247   channel_destroy (ch);
1248 }
1249
1250
1251 /**
1252  * Handle ACK from client on local channel.
1253  *
1254  * @param ch channel to destroy
1255  */
1256 void
1257 GCCH_handle_local_ack (struct CadetChannel *ch)
1258 {
1259   ch->client_ready = GNUNET_YES;
1260   send_client_buffered_data (ch);
1261 }
1262
1263
1264 #define LOG2(level, ...) GNUNET_log_from_nocheck(level,"cadet-chn",__VA_ARGS__)
1265
1266
1267 /**
1268  * Log channel info.
1269  *
1270  * @param ch Channel.
1271  * @param level Debug level to use.
1272  */
1273 void
1274 GCCH_debug (struct CadetChannel *ch,
1275             enum GNUNET_ErrorType level)
1276 {
1277   int do_log;
1278
1279   do_log = GNUNET_get_log_call_status (level & (~GNUNET_ERROR_TYPE_BULK),
1280                                        "cadet-chn",
1281                                        __FILE__, __FUNCTION__, __LINE__);
1282   if (0 == do_log)
1283     return;
1284
1285   if (NULL == ch)
1286   {
1287     LOG2 (level, "CHN *** DEBUG NULL CHANNEL ***\n");
1288     return;
1289   }
1290   LOG2 (level,
1291         "CHN Channel %s:%X (%p)\n",
1292         GCT_2s (ch->t),
1293         ch->chid,
1294         ch);
1295   if (NULL != ch->owner)
1296   {
1297     LOG2 (level,
1298           "CHN origin %s ready %s local-id: %u\n",
1299           GSC_2s (ch->owner),
1300           ch->client_ready ? "YES" : "NO",
1301           ntohl (ch->lid.channel_of_client));
1302   }
1303   if (NULL != ch->dest)
1304   {
1305     LOG2 (level,
1306           "CHN destination %s ready %s local-id: %u\n",
1307           GSC_2s (ch->dest),
1308           ch->client_ready ? "YES" : "NO",
1309           ntohl (ch->lid.channel_of_client));
1310   }
1311   LOG2 (level,
1312         "CHN  Message IDs recv: %d (%LLX), send: %d\n",
1313         ntohl (ch->mid_recv.mid),
1314         (unsigned long long) ch->mid_futures,
1315         ntohl (ch->mid_send.mid));
1316 }
1317
1318
1319
1320 /* end of gnunet-service-cadet-new_channel.c */