fair, global message buffer implemented
[oweals/gnunet.git] / src / cadet / gnunet-service-cadet-new_channel.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2001-2017 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20 /**
21  * @file cadet/gnunet-service-cadet-new_channel.c
22  * @brief logical links between CADET clients
23  * @author Bartlomiej Polot
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - Congestion/flow control:
28  *   + calculate current RTT if possible, use that for initial retransmissions
29  *     (NOTE: needs us to learn which connection the tunnel uses for the message!)
30  *   + estimate max bandwidth using bursts and use to for CONGESTION CONTROL!
31  *     (and figure out how/where to use this!)
32  *   + figure out flow control without ACKs (unreliable traffic!)
33  * - revisit handling of 'unbuffered' traffic!
34  *   (need to push down through tunnel into connection selection)
35  * - revisit handling of 'buffered' traffic: 4 is a rather small buffer; maybe
36  *   reserve more bits in 'options' to allow for buffer size control?
37  */
38 #include "platform.h"
39 #include "gnunet_util_lib.h"
40 #include "cadet.h"
41 #include "gnunet_statistics_service.h"
42 #include "gnunet-service-cadet-new.h"
43 #include "gnunet-service-cadet-new_channel.h"
44 #include "gnunet-service-cadet-new_connection.h"
45 #include "gnunet-service-cadet-new_tunnels.h"
46 #include "gnunet-service-cadet-new_peer.h"
47 #include "gnunet-service-cadet-new_paths.h"
48
49 #define LOG(level,...) GNUNET_log_from (level,"cadet-chn",__VA_ARGS__)
50
51 /**
52  * How long do we initially wait before retransmitting?
53  */
54 #define CADET_INITIAL_RETRANSMIT_TIME GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 250)
55
56 /**
57  * How long do we wait before dropping state about incoming
58  * connection to closed port?
59  */
60 #define TIMEOUT_CLOSED_PORT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30)
61
62 /**
63  * How long do we wait at least before retransmitting ever?
64  */
65 #define MIN_RTT_DELAY GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 75)
66
67 /**
68  * Maximum message ID into the future we accept for out-of-order messages.
69  * If the message is more than this into the future, we drop it.  This is
70  * important both to detect values that are actually in the past, as well
71  * as to limit adversarially triggerable memory consumption.
72  *
73  * Note that right now we have "max_pending_messages = 4" hard-coded in
74  * the logic below, so a value of 4 would suffice here. But we plan to
75  * allow larger windows in the future...
76  */
77 #define MAX_OUT_OF_ORDER_DISTANCE 1024
78
79
80 /**
81  * All the states a connection can be in.
82  */
83 enum CadetChannelState
84 {
85   /**
86    * Uninitialized status, should never appear in operation.
87    */
88   CADET_CHANNEL_NEW,
89
90   /**
91    * Connection create message sent, waiting for ACK.
92    */
93   CADET_CHANNEL_OPEN_SENT,
94
95   /**
96    * Connection confirmed, ready to carry traffic.
97    */
98   CADET_CHANNEL_READY
99 };
100
101
102 /**
103  * Info needed to retry a message in case it gets lost.
104  * Note that we DO use this structure also for unreliable
105  * messages.
106  */
107 struct CadetReliableMessage
108 {
109   /**
110    * Double linked list, FIFO style
111    */
112   struct CadetReliableMessage *next;
113
114   /**
115    * Double linked list, FIFO style
116    */
117   struct CadetReliableMessage *prev;
118
119   /**
120    * Which channel is this message in?
121    */
122   struct CadetChannel *ch;
123
124   /**
125    * Entry in the tunnels queue for this message, NULL if it has left
126    * the tunnel.  Used to cancel transmission in case we receive an
127    * ACK in time.
128    */
129   struct CadetTunnelQueueEntry *qe;
130
131   /**
132    * Data message we are trying to send.
133    */
134   struct GNUNET_CADET_ChannelAppDataMessage *data_message;
135
136   /**
137    * How soon should we retry if we fail to get an ACK?
138    * Messages in the queue are sorted by this value.
139    */
140   struct GNUNET_TIME_Absolute next_retry;
141
142   /**
143    * How long do we wait for an ACK after transmission?
144    * Use for the back-off calculation.
145    */
146   struct GNUNET_TIME_Relative retry_delay;
147
148   /**
149    * Time when we first successfully transmitted the message
150    * (that is, set @e num_transmissions to 1).
151    */
152   struct GNUNET_TIME_Absolute first_transmission_time;
153
154   /**
155    * Identifier of the connection that this message took when it
156    * was first transmitted.  Only useful if @e num_transmissions is 1.
157    */
158   struct GNUNET_CADET_ConnectionTunnelIdentifier connection_taken;
159
160   /**
161    * How often was this message transmitted?  #GNUNET_SYSERR if there
162    * was an error transmitting the message, #GNUNET_NO if it was not
163    * yet transmitted ever, otherwise the number of (re) transmissions.
164    */
165   int num_transmissions;
166
167 };
168
169
170 /**
171  * List of received out-of-order data messages.
172  */
173 struct CadetOutOfOrderMessage
174 {
175   /**
176    * Double linked list, FIFO style
177    */
178   struct CadetOutOfOrderMessage *next;
179
180   /**
181    * Double linked list, FIFO style
182    */
183   struct CadetOutOfOrderMessage *prev;
184
185   /**
186    * ID of the message (messages up to this point needed
187    * before we give this one to the client).
188    */
189   struct ChannelMessageIdentifier mid;
190
191   /**
192    * The envelope with the payload of the out-of-order message
193    */
194   struct GNUNET_MQ_Envelope *env;
195
196 };
197
198
199 /**
200  * Client endpoint of a `struct CadetChannel`.  A channel may be a
201  * loopback channel, in which case it has two of these endpoints.
202  * Note that flow control also is required in both directions.
203  */
204 struct CadetChannelClient
205 {
206   /**
207    * Client handle.  Not by itself sufficient to designate
208    * the client endpoint, as the same client handle may
209    * be used for both the owner and the destination, and
210    * we thus also need the channel ID to identify the client.
211    */
212   struct CadetClient *c;
213
214   /**
215    * Head of DLL of messages received out of order or while client was unready.
216    */
217   struct CadetOutOfOrderMessage *head_recv;
218
219   /**
220    * Tail DLL of messages received out of order or while client was unready.
221    */
222   struct CadetOutOfOrderMessage *tail_recv;
223
224   /**
225    * Local tunnel number for this client.
226    * (if owner >= #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI,
227    *  otherwise < #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI)
228    */
229   struct GNUNET_CADET_ClientChannelNumber ccn;
230
231   /**
232    * Number of entries currently in @a head_recv DLL.
233    */
234   unsigned int num_recv;
235
236   /**
237    * Can we send data to the client?
238    */
239   int client_ready;
240
241 };
242
243
244 /**
245  * Struct containing all information regarding a channel to a remote client.
246  */
247 struct CadetChannel
248 {
249   /**
250    * Tunnel this channel is in.
251    */
252   struct CadetTunnel *t;
253
254   /**
255    * Client owner of the tunnel, if any.
256    * (Used if this channel represends the initiating end of the tunnel.)
257    */
258   struct CadetChannelClient *owner;
259
260   /**
261    * Client destination of the tunnel, if any.
262    * (Used if this channel represents the listening end of the tunnel.)
263    */
264   struct CadetChannelClient *dest;
265
266   /**
267    * Last entry in the tunnel's queue relating to control messages
268    * (#GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN or
269    * #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK).  Used to cancel
270    * transmission in case we receive updated information.
271    */
272   struct CadetTunnelQueueEntry *last_control_qe;
273
274   /**
275    * Head of DLL of messages sent and not yet ACK'd.
276    */
277   struct CadetReliableMessage *head_sent;
278
279   /**
280    * Tail of DLL of messages sent and not yet ACK'd.
281    */
282   struct CadetReliableMessage *tail_sent;
283
284   /**
285    * Task to resend/poll in case no ACK is received.
286    */
287   struct GNUNET_SCHEDULER_Task *retry_control_task;
288
289   /**
290    * Task to resend/poll in case no ACK is received.
291    */
292   struct GNUNET_SCHEDULER_Task *retry_data_task;
293
294   /**
295    * Last time the channel was used
296    */
297   struct GNUNET_TIME_Absolute timestamp;
298
299   /**
300    * Destination port of the channel.
301    */
302   struct GNUNET_HashCode port;
303
304   /**
305    * Counter for exponential backoff.
306    */
307   struct GNUNET_TIME_Relative retry_time;
308
309   /**
310    * How long does it usually take to get an ACK.
311    */
312   struct GNUNET_TIME_Relative expected_delay;
313
314   /**
315    * Bitfield of already-received messages past @e mid_recv.
316    */
317   uint64_t mid_futures;
318
319   /**
320    * Next MID expected for incoming traffic.
321    */
322   struct ChannelMessageIdentifier mid_recv;
323
324   /**
325    * Next MID to use for outgoing traffic.
326    */
327   struct ChannelMessageIdentifier mid_send;
328
329   /**
330    * Total (reliable) messages pending ACK for this channel.
331    */
332   unsigned int pending_messages;
333
334   /**
335    * Maximum (reliable) messages pending ACK for this channel
336    * before we throttle the client.
337    */
338   unsigned int max_pending_messages;
339
340   /**
341    * Number identifying this channel in its tunnel.
342    */
343   struct GNUNET_CADET_ChannelTunnelNumber ctn;
344
345   /**
346    * Channel state.
347    */
348   enum CadetChannelState state;
349
350   /**
351    * Count how many ACKs we skipped, used to prevent long
352    * sequences of ACK skipping.
353    */
354   unsigned int skip_ack_series;
355
356   /**
357    * Is the tunnel bufferless (minimum latency)?
358    */
359   int nobuffer;
360
361   /**
362    * Is the tunnel reliable?
363    */
364   int reliable;
365
366   /**
367    * Is the tunnel out-of-order?
368    */
369   int out_of_order;
370
371   /**
372    * Is this channel a loopback channel, where the destination is us again?
373    */
374   int is_loopback;
375
376   /**
377    * Flag to signal the destruction of the channel.  If this is set to
378    * #GNUNET_YES the channel will be destroyed once the queue is
379    * empty.
380    */
381   int destroy;
382
383 };
384
385
386 /**
387  * Get the static string for identification of the channel.
388  *
389  * @param ch Channel.
390  *
391  * @return Static string with the channel IDs.
392  */
393 const char *
394 GCCH_2s (const struct CadetChannel *ch)
395 {
396   static char buf[128];
397
398   GNUNET_snprintf (buf,
399                    sizeof (buf),
400                    "Channel %s:%s ctn:%X(%X/%X)",
401                    (GNUNET_YES == ch->is_loopback)
402                    ? "loopback"
403                    : GNUNET_i2s (GCP_get_id (GCT_get_destination (ch->t))),
404                    GNUNET_h2s (&ch->port),
405                    ch->ctn,
406                    (NULL == ch->owner) ? 0 : ntohl (ch->owner->ccn.channel_of_client),
407                    (NULL == ch->dest) ? 0 : ntohl (ch->dest->ccn.channel_of_client));
408   return buf;
409 }
410
411
412 /**
413  * Get the channel's public ID.
414  *
415  * @param ch Channel.
416  *
417  * @return ID used to identify the channel with the remote peer.
418  */
419 struct GNUNET_CADET_ChannelTunnelNumber
420 GCCH_get_id (const struct CadetChannel *ch)
421 {
422   return ch->ctn;
423 }
424
425
426 /**
427  * Release memory associated with @a ccc
428  *
429  * @param ccc data structure to clean up
430  */
431 static void
432 free_channel_client (struct CadetChannelClient *ccc)
433 {
434   struct CadetOutOfOrderMessage *com;
435
436   while (NULL != (com = ccc->head_recv))
437   {
438     GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
439                                  ccc->tail_recv,
440                                  com);
441     ccc->num_recv--;
442     GNUNET_MQ_discard (com->env);
443     GNUNET_free (com);
444   }
445   GNUNET_free (ccc);
446 }
447
448
449 /**
450  * Destroy the given channel.
451  *
452  * @param ch channel to destroy
453  */
454 static void
455 channel_destroy (struct CadetChannel *ch)
456 {
457   struct CadetReliableMessage *crm;
458
459   while (NULL != (crm = ch->head_sent))
460   {
461     GNUNET_assert (ch == crm->ch);
462     if (NULL != crm->qe)
463     {
464       GCT_send_cancel (crm->qe);
465       crm->qe = NULL;
466     }
467     GNUNET_CONTAINER_DLL_remove (ch->head_sent,
468                                  ch->tail_sent,
469                                  crm);
470     GNUNET_free (crm->data_message);
471     GNUNET_free (crm);
472   }
473   if (NULL != ch->owner)
474   {
475     free_channel_client (ch->owner);
476     ch->owner = NULL;
477   }
478   if (NULL != ch->dest)
479   {
480     free_channel_client (ch->dest);
481     ch->dest = NULL;
482   }
483   if (NULL != ch->last_control_qe)
484   {
485     GCT_send_cancel (ch->last_control_qe);
486     ch->last_control_qe = NULL;
487   }
488   if (NULL != ch->retry_data_task)
489   {
490     GNUNET_SCHEDULER_cancel (ch->retry_data_task);
491     ch->retry_data_task = NULL;
492   }
493   if (NULL != ch->retry_control_task)
494   {
495     GNUNET_SCHEDULER_cancel (ch->retry_control_task);
496     ch->retry_control_task = NULL;
497   }
498   if (GNUNET_NO == ch->is_loopback)
499   {
500     GCT_remove_channel (ch->t,
501                         ch,
502                         ch->ctn);
503     ch->t = NULL;
504   }
505   GNUNET_free (ch);
506 }
507
508
509 /**
510  * Send a channel create message.
511  *
512  * @param cls Channel for which to send.
513  */
514 static void
515 send_channel_open (void *cls);
516
517
518 /**
519  * Function called once the tunnel confirms that we sent the
520  * create message.  Delays for a bit until we retry.
521  *
522  * @param cls our `struct CadetChannel`.
523  * @param cid identifier of the connection within the tunnel, NULL
524  *            if transmission failed
525  */
526 static void
527 channel_open_sent_cb (void *cls,
528                       const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid)
529 {
530   struct CadetChannel *ch = cls;
531
532   GNUNET_assert (NULL != ch->last_control_qe);
533   ch->last_control_qe = NULL;
534   ch->retry_time = GNUNET_TIME_STD_BACKOFF (ch->retry_time);
535   LOG (GNUNET_ERROR_TYPE_DEBUG,
536        "Sent CADET_CHANNEL_OPEN on %s, retrying in %s\n",
537        GCCH_2s (ch),
538        GNUNET_STRINGS_relative_time_to_string (ch->retry_time,
539                                                GNUNET_YES));
540   ch->retry_control_task
541     = GNUNET_SCHEDULER_add_delayed (ch->retry_time,
542                                     &send_channel_open,
543                                     ch);
544 }
545
546
547 /**
548  * Send a channel open message.
549  *
550  * @param cls Channel for which to send.
551  */
552 static void
553 send_channel_open (void *cls)
554 {
555   struct CadetChannel *ch = cls;
556   struct GNUNET_CADET_ChannelOpenMessage msgcc;
557   uint32_t options;
558
559   ch->retry_control_task = NULL;
560   LOG (GNUNET_ERROR_TYPE_DEBUG,
561        "Sending CHANNEL_OPEN message for %s\n",
562        GCCH_2s (ch));
563   options = 0;
564   if (ch->nobuffer)
565     options |= GNUNET_CADET_OPTION_NOBUFFER;
566   if (ch->reliable)
567     options |= GNUNET_CADET_OPTION_RELIABLE;
568   if (ch->out_of_order)
569     options |= GNUNET_CADET_OPTION_OUT_OF_ORDER;
570   msgcc.header.size = htons (sizeof (msgcc));
571   msgcc.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN);
572   msgcc.opt = htonl (options);
573   msgcc.port = ch->port;
574   msgcc.ctn = ch->ctn;
575   ch->state = CADET_CHANNEL_OPEN_SENT;
576   ch->last_control_qe = GCT_send (ch->t,
577                                   &msgcc.header,
578                                   &channel_open_sent_cb,
579                                   ch);
580   GNUNET_assert (NULL == ch->retry_control_task);
581 }
582
583
584 /**
585  * Function called once and only once after a channel was bound
586  * to its tunnel via #GCT_add_channel() is ready for transmission.
587  * Note that this is only the case for channels that this peer
588  * initiates, as for incoming channels we assume that they are
589  * ready for transmission immediately upon receiving the open
590  * message.  Used to bootstrap the #GCT_send() process.
591  *
592  * @param ch the channel for which the tunnel is now ready
593  */
594 void
595 GCCH_tunnel_up (struct CadetChannel *ch)
596 {
597   GNUNET_assert (NULL == ch->retry_control_task);
598   LOG (GNUNET_ERROR_TYPE_DEBUG,
599        "Tunnel up, sending CHANNEL_OPEN on %s now\n",
600        GCCH_2s (ch));
601   ch->retry_control_task
602     = GNUNET_SCHEDULER_add_now (&send_channel_open,
603                                 ch);
604 }
605
606
607 /**
608  * Create a new channel.
609  *
610  * @param owner local client owning the channel
611  * @param ccn local number of this channel at the @a owner
612  * @param destination peer to which we should build the channel
613  * @param port desired port at @a destination
614  * @param options options for the channel
615  * @return handle to the new channel
616  */
617 struct CadetChannel *
618 GCCH_channel_local_new (struct CadetClient *owner,
619                         struct GNUNET_CADET_ClientChannelNumber ccn,
620                         struct CadetPeer *destination,
621                         const struct GNUNET_HashCode *port,
622                         uint32_t options)
623 {
624   struct CadetChannel *ch;
625   struct CadetChannelClient *ccco;
626
627   ccco = GNUNET_new (struct CadetChannelClient);
628   ccco->c = owner;
629   ccco->ccn = ccn;
630   ccco->client_ready = GNUNET_YES;
631
632   ch = GNUNET_new (struct CadetChannel);
633   ch->mid_recv.mid = htonl (1); /* The OPEN_ACK counts as message 0! */
634   ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
635   ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
636   ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
637   ch->max_pending_messages = (ch->nobuffer) ? 1 : 4; /* FIXME: 4!? Do not hardcode! */
638   ch->owner = ccco;
639   ch->port = *port;
640   if (0 == memcmp (&my_full_id,
641                    GCP_get_id (destination),
642                    sizeof (struct GNUNET_PeerIdentity)))
643   {
644     struct CadetClient *c;
645
646     ch->is_loopback = GNUNET_YES;
647     c = GNUNET_CONTAINER_multihashmap_get (open_ports,
648                                            port);
649     if (NULL == c)
650     {
651       /* port closed, wait for it to possibly open */
652       (void) GNUNET_CONTAINER_multihashmap_put (loose_channels,
653                                                 port,
654                                                 ch,
655                                                 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
656       LOG (GNUNET_ERROR_TYPE_DEBUG,
657            "Created loose incoming loopback channel to port %s\n",
658            GNUNET_h2s (&ch->port));
659     }
660     else
661     {
662       ch->dest = GNUNET_new (struct CadetChannelClient);
663       ch->dest->c = c;
664       ch->dest->client_ready = GNUNET_YES;
665       GCCH_bind (ch,
666                  ch->dest->c);
667     }
668   }
669   else
670   {
671     ch->t = GCP_get_tunnel (destination,
672                             GNUNET_YES);
673     ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME;
674     ch->ctn = GCT_add_channel (ch->t,
675                                ch);
676   }
677   GNUNET_STATISTICS_update (stats,
678                             "# channels",
679                             1,
680                             GNUNET_NO);
681   LOG (GNUNET_ERROR_TYPE_DEBUG,
682        "Created channel to port %s at peer %s for %s using %s\n",
683        GNUNET_h2s (port),
684        GCP_2s (destination),
685        GSC_2s (owner),
686        (GNUNET_YES == ch->is_loopback) ? "loopback" : GCT_2s (ch->t));
687   return ch;
688 }
689
690
691 /**
692  * We had an incoming channel to a port that is closed.
693  * It has not been opened for a while, drop it.
694  *
695  * @param cls the channel to drop
696  */
697 static void
698 timeout_closed_cb (void *cls)
699 {
700   struct CadetChannel *ch = cls;
701
702   ch->retry_control_task = NULL;
703   LOG (GNUNET_ERROR_TYPE_DEBUG,
704        "Closing incoming channel to port %s from peer %s due to timeout\n",
705        GNUNET_h2s (&ch->port),
706        GCP_2s (GCT_get_destination (ch->t)));
707   channel_destroy (ch);
708 }
709
710
711 /**
712  * Create a new channel based on a request coming in over the network.
713  *
714  * @param t tunnel to the remote peer
715  * @param ctn identifier of this channel in the tunnel
716  * @param port desired local port
717  * @param options options for the channel
718  * @return handle to the new channel
719  */
720 struct CadetChannel *
721 GCCH_channel_incoming_new (struct CadetTunnel *t,
722                            struct GNUNET_CADET_ChannelTunnelNumber ctn,
723                            const struct GNUNET_HashCode *port,
724                            uint32_t options)
725 {
726   struct CadetChannel *ch;
727   struct CadetClient *c;
728
729   ch = GNUNET_new (struct CadetChannel);
730   ch->port = *port;
731   ch->t = t;
732   ch->ctn = ctn;
733   ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME;
734   ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
735   ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
736   ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
737   ch->max_pending_messages = (ch->nobuffer) ? 1 : 4; /* FIXME: 4!? Do not hardcode! */
738   GNUNET_STATISTICS_update (stats,
739                             "# channels",
740                             1,
741                             GNUNET_NO);
742
743   c = GNUNET_CONTAINER_multihashmap_get (open_ports,
744                                          port);
745   if (NULL == c)
746   {
747     /* port closed, wait for it to possibly open */
748     (void) GNUNET_CONTAINER_multihashmap_put (loose_channels,
749                                               port,
750                                               ch,
751                                               GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
752     ch->retry_control_task
753       = GNUNET_SCHEDULER_add_delayed (TIMEOUT_CLOSED_PORT,
754                                       &timeout_closed_cb,
755                                       ch);
756     LOG (GNUNET_ERROR_TYPE_DEBUG,
757          "Created loose incoming channel to port %s from peer %s\n",
758          GNUNET_h2s (&ch->port),
759          GCP_2s (GCT_get_destination (ch->t)));
760   }
761   else
762   {
763     GCCH_bind (ch,
764                c);
765   }
766   GNUNET_STATISTICS_update (stats,
767                             "# channels",
768                             1,
769                             GNUNET_NO);
770   return ch;
771 }
772
773
774 /**
775  * Function called once the tunnel confirms that we sent the
776  * ACK message.  Just remembers it was sent, we do not expect
777  * ACKs for ACKs ;-).
778  *
779  * @param cls our `struct CadetChannel`.
780  * @param cid identifier of the connection within the tunnel, NULL
781  *            if transmission failed
782  */
783 static void
784 send_ack_cb (void *cls,
785              const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid)
786 {
787   struct CadetChannel *ch = cls;
788
789   GNUNET_assert (NULL != ch->last_control_qe);
790   ch->last_control_qe = NULL;
791 }
792
793
794 /**
795  * Compute and send the current #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK to the other peer.
796  *
797  * @param ch channel to send the #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK for
798  */
799 static void
800 send_channel_data_ack (struct CadetChannel *ch)
801 {
802   struct GNUNET_CADET_ChannelDataAckMessage msg;
803
804   if (GNUNET_NO == ch->reliable)
805     return; /* no ACKs */
806   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK);
807   msg.header.size = htons (sizeof (msg));
808   msg.ctn = ch->ctn;
809   msg.mid.mid = htonl (ntohl (ch->mid_recv.mid));
810   msg.futures = GNUNET_htonll (ch->mid_futures);
811   if (NULL != ch->last_control_qe)
812     GCT_send_cancel (ch->last_control_qe);
813   LOG (GNUNET_ERROR_TYPE_DEBUG,
814        "Sending DATA_ACK %u:%llX via %s\n",
815        (unsigned int) ntohl (msg.mid.mid),
816        (unsigned long long) ch->mid_futures,
817        GCCH_2s (ch));
818   ch->last_control_qe = GCT_send (ch->t,
819                                   &msg.header,
820                                   &send_ack_cb,
821                                   ch);
822 }
823
824
825 /**
826  * Send our initial #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK to the client confirming that the
827  * connection is up.
828  *
829  * @param cls the `struct CadetChannel`
830  */
831 static void
832 send_open_ack (void *cls)
833 {
834   struct CadetChannel *ch = cls;
835   struct GNUNET_CADET_ChannelManageMessage msg;
836
837   LOG (GNUNET_ERROR_TYPE_DEBUG,
838        "Sending CHANNEL_OPEN_ACK on %s\n",
839        GCCH_2s (ch));
840   ch->retry_control_task = NULL;
841   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK);
842   msg.header.size = htons (sizeof (msg));
843   msg.reserved = htonl (0);
844   msg.ctn = ch->ctn;
845   if (NULL != ch->last_control_qe)
846     GCT_send_cancel (ch->last_control_qe);
847   ch->last_control_qe = GCT_send (ch->t,
848                                   &msg.header,
849                                   &send_ack_cb,
850                                   ch);
851 }
852
853
854 /**
855  * We got a #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN message again for
856  * this channel.  If the binding was successful, (re)transmit the
857  * #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK.
858  *
859  * @param ch channel that got the duplicate open
860  * @param cti identifier of the connection that delivered the message
861  */
862 void
863 GCCH_handle_duplicate_open (struct CadetChannel *ch,
864                             const struct GNUNET_CADET_ConnectionTunnelIdentifier *cti)
865 {
866   if (NULL == ch->dest)
867   {
868     LOG (GNUNET_ERROR_TYPE_DEBUG,
869          "Ignoring duplicate channel OPEN on %s: port is closed\n",
870          GCCH_2s (ch));
871     return;
872   }
873   if (NULL != ch->retry_control_task)
874   {
875     LOG (GNUNET_ERROR_TYPE_DEBUG,
876          "Ignoring duplicate channel OPEN on %s: control message is pending\n",
877          GCCH_2s (ch));
878     return;
879   }
880   LOG (GNUNET_ERROR_TYPE_DEBUG,
881        "Retransmitting OPEN_ACK on %s\n",
882        GCCH_2s (ch));
883   ch->retry_control_task
884     = GNUNET_SCHEDULER_add_now (&send_open_ack,
885                                 ch);
886 }
887
888
889 /**
890  * Send a #GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK to the client to solicit more messages.
891  *
892  * @param ch channel the ack is for
893  * @param to_owner #GNUNET_YES to send to owner,
894  *                 #GNUNET_NO to send to dest
895  */
896 static void
897 send_ack_to_client (struct CadetChannel *ch,
898                     int to_owner)
899 {
900   struct GNUNET_MQ_Envelope *env;
901   struct GNUNET_CADET_LocalAck *ack;
902   struct CadetChannelClient *ccc;
903
904   ccc = (GNUNET_YES == to_owner) ? ch->owner : ch->dest;
905   if (NULL == ccc)
906   {
907     /* This can happen if we are just getting ACKs after
908        our local client already disconnected. */
909     GNUNET_assert (GNUNET_YES == ch->destroy);
910     return;
911   }
912   env = GNUNET_MQ_msg (ack,
913                        GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK);
914   ack->ccn = ccc->ccn;
915   LOG (GNUNET_ERROR_TYPE_DEBUG,
916        "Sending CADET_LOCAL_ACK to %s (%s) at ccn %X (%u/%u pending)\n",
917        GSC_2s (ccc->c),
918        (GNUNET_YES == to_owner) ? "owner" : "dest",
919        ntohl (ack->ccn.channel_of_client),
920        ch->pending_messages,
921        ch->max_pending_messages);
922   GSC_send_to_client (ccc->c,
923                       env);
924 }
925
926
927 /**
928  * A client is bound to the port that we have a channel
929  * open to.  Send the acknowledgement for the connection
930  * request and establish the link with the client.
931  *
932  * @param ch open incoming channel
933  * @param c client listening on the respective port
934  */
935 void
936 GCCH_bind (struct CadetChannel *ch,
937            struct CadetClient *c)
938 {
939   uint32_t options;
940   struct CadetChannelClient *cccd;
941
942   LOG (GNUNET_ERROR_TYPE_DEBUG,
943        "Binding %s from %s to port %s of %s\n",
944        GCCH_2s (ch),
945        GCT_2s (ch->t),
946        GNUNET_h2s (&ch->port),
947        GSC_2s (c));
948   if (NULL != ch->retry_control_task)
949   {
950     /* there might be a timeout task here */
951     GNUNET_SCHEDULER_cancel (ch->retry_control_task);
952     ch->retry_control_task = NULL;
953   }
954   options = 0;
955   if (ch->nobuffer)
956     options |= GNUNET_CADET_OPTION_NOBUFFER;
957   if (ch->reliable)
958     options |= GNUNET_CADET_OPTION_RELIABLE;
959   if (ch->out_of_order)
960     options |= GNUNET_CADET_OPTION_OUT_OF_ORDER;
961   cccd = GNUNET_new (struct CadetChannelClient);
962   ch->dest = cccd;
963   cccd->c = c;
964   cccd->client_ready = GNUNET_YES;
965   cccd->ccn = GSC_bind (c,
966                         ch,
967                         (GNUNET_YES == ch->is_loopback)
968                         ? GCP_get (&my_full_id,
969                                    GNUNET_YES)
970                         : GCT_get_destination (ch->t),
971                         &ch->port,
972                         options);
973   GNUNET_assert (ntohl (cccd->ccn.channel_of_client) <
974                  GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
975   ch->mid_recv.mid = htonl (1); /* The OPEN counts as message 0! */
976   if (GNUNET_YES == ch->is_loopback)
977   {
978     ch->state = CADET_CHANNEL_OPEN_SENT;
979     GCCH_handle_channel_open_ack (ch,
980                                   NULL);
981   }
982   else
983   {
984     /* notify other peer that we accepted the connection */
985     ch->retry_control_task
986       = GNUNET_SCHEDULER_add_now (&send_open_ack,
987                                   ch);
988   }
989   /* give client it's initial supply of ACKs */
990   GNUNET_assert (ntohl (cccd->ccn.channel_of_client) <
991                  GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
992   for (unsigned int i=0;i<ch->max_pending_messages;i++)
993     send_ack_to_client (ch,
994                         GNUNET_NO);
995 }
996
997
998 /**
999  * Destroy locally created channel.  Called by the local client, so no
1000  * need to tell the client.
1001  *
1002  * @param ch channel to destroy
1003  * @param c client that caused the destruction
1004  * @param ccn client number of the client @a c
1005  */
1006 void
1007 GCCH_channel_local_destroy (struct CadetChannel *ch,
1008                             struct CadetClient *c,
1009                             struct GNUNET_CADET_ClientChannelNumber ccn)
1010 {
1011   LOG (GNUNET_ERROR_TYPE_DEBUG,
1012        "%s asks for destruction of %s\n",
1013        GSC_2s (c),
1014        GCCH_2s (ch));
1015   GNUNET_assert (NULL != c);
1016   if ( (NULL != ch->owner) &&
1017        (c == ch->owner->c) &&
1018        (ccn.channel_of_client == ch->owner->ccn.channel_of_client) )
1019   {
1020     free_channel_client (ch->owner);
1021     ch->owner = NULL;
1022   }
1023   else if ( (NULL != ch->dest) &&
1024             (c == ch->dest->c) &&
1025             (ccn.channel_of_client == ch->dest->ccn.channel_of_client) )
1026   {
1027     free_channel_client (ch->dest);
1028     ch->dest = NULL;
1029   }
1030   else
1031   {
1032     GNUNET_assert (0);
1033   }
1034
1035   if (GNUNET_YES == ch->destroy)
1036   {
1037     /* other end already destroyed, with the local client gone, no need
1038        to finish transmissions, just destroy immediately. */
1039     channel_destroy (ch);
1040     return;
1041   }
1042   if ( (NULL != ch->head_sent) ||
1043        (NULL != ch->owner) ||
1044        (NULL != ch->dest) )
1045   {
1046     /* Wait for other end to destroy us as well,
1047        and otherwise allow send queue to be transmitted first */
1048     ch->destroy = GNUNET_YES;
1049     return;
1050   }
1051   /* If the we ever sent the CHANNEL_CREATE, we need to send a destroy message. */
1052   if (CADET_CHANNEL_NEW != ch->state)
1053     GCT_send_channel_destroy (ch->t,
1054                               ch->ctn);
1055   /* Nothing left to do, just finish destruction */
1056   channel_destroy (ch);
1057 }
1058
1059
1060 /**
1061  * We got an acknowledgement for the creation of the channel
1062  * (the port is open on the other side). Begin transmissions.
1063  *
1064  * @param ch channel to destroy
1065  * @param cti identifier of the connection that delivered the message
1066  */
1067 void
1068 GCCH_handle_channel_open_ack (struct CadetChannel *ch,
1069                               const struct GNUNET_CADET_ConnectionTunnelIdentifier *cti)
1070 {
1071   switch (ch->state)
1072   {
1073   case CADET_CHANNEL_NEW:
1074     /* this should be impossible */
1075     GNUNET_break (0);
1076     break;
1077   case CADET_CHANNEL_OPEN_SENT:
1078     if (NULL == ch->owner)
1079     {
1080       /* We're not the owner, wrong direction! */
1081       GNUNET_break_op (0);
1082       return;
1083     }
1084     LOG (GNUNET_ERROR_TYPE_DEBUG,
1085          "Received CHANNEL_OPEN_ACK for waiting %s, entering READY state\n",
1086          GCCH_2s (ch));
1087     if (NULL != ch->retry_control_task) /* can be NULL if ch->is_loopback */
1088     {
1089       GNUNET_SCHEDULER_cancel (ch->retry_control_task);
1090       ch->retry_control_task = NULL;
1091     }
1092     ch->state = CADET_CHANNEL_READY;
1093     /* On first connect, send client as many ACKs as we allow messages
1094        to be buffered! */
1095     for (unsigned int i=0;i<ch->max_pending_messages;i++)
1096       send_ack_to_client (ch,
1097                           GNUNET_YES);
1098     break;
1099   case CADET_CHANNEL_READY:
1100     /* duplicate ACK, maybe we retried the CREATE. Ignore. */
1101     LOG (GNUNET_ERROR_TYPE_DEBUG,
1102          "Received duplicate channel OPEN_ACK for %s\n",
1103          GCCH_2s (ch));
1104     GNUNET_STATISTICS_update (stats,
1105                               "# duplicate CREATE_ACKs",
1106                               1,
1107                               GNUNET_NO);
1108     break;
1109   }
1110 }
1111
1112
1113 /**
1114  * Test if element @a e1 comes before element @a e2.
1115  *
1116  * @param cls closure, to a flag where we indicate duplicate packets
1117  * @param m1 a message of to sort
1118  * @param m2 another message to sort
1119  * @return #GNUNET_YES if @e1 < @e2, otherwise #GNUNET_NO
1120  */
1121 static int
1122 is_before (void *cls,
1123            struct CadetOutOfOrderMessage *m1,
1124            struct CadetOutOfOrderMessage *m2)
1125 {
1126   int *duplicate = cls;
1127   uint32_t v1 = ntohl (m1->mid.mid);
1128   uint32_t v2 = ntohl (m2->mid.mid);
1129   uint32_t delta;
1130
1131   delta = v2 - v1;
1132   if (0 == delta)
1133     *duplicate = GNUNET_YES;
1134   if (delta > (uint32_t) INT_MAX)
1135   {
1136     /* in overflow range, we can safely assume we wrapped around */
1137     return GNUNET_NO;
1138   }
1139   else
1140   {
1141     /* result is small, thus v2 > v1, thus m1 < m2 */
1142     return GNUNET_YES;
1143   }
1144 }
1145
1146
1147 /**
1148  * We got payload data for a channel.  Pass it on to the client
1149  * and send an ACK to the other end (once flow control allows it!)
1150  *
1151  * @param ch channel that got data
1152  * @param cti identifier of the connection that delivered the message
1153  * @param msg message that was received
1154  */
1155 void
1156 GCCH_handle_channel_plaintext_data (struct CadetChannel *ch,
1157                                     const struct GNUNET_CADET_ConnectionTunnelIdentifier *cti,
1158                                     const struct GNUNET_CADET_ChannelAppDataMessage *msg)
1159 {
1160   struct GNUNET_MQ_Envelope *env;
1161   struct GNUNET_CADET_LocalData *ld;
1162   struct CadetChannelClient *ccc;
1163   size_t payload_size;
1164   struct CadetOutOfOrderMessage *com;
1165   int duplicate;
1166   uint32_t mid_min;
1167   uint32_t mid_max;
1168   uint32_t mid_msg;
1169   uint32_t delta;
1170
1171   GNUNET_assert (GNUNET_NO == ch->is_loopback);
1172   if ( (GNUNET_YES == ch->destroy) &&
1173        (NULL == ch->owner) &&
1174        (NULL == ch->dest) )
1175   {
1176     /* This client is gone, but we still have messages to send to
1177        the other end (which is why @a ch is not yet dead).  However,
1178        we cannot pass messages to our client anymore. */
1179     LOG (GNUNET_ERROR_TYPE_DEBUG,
1180          "Dropping incoming payload on %s as this end is already closed\n",
1181          GCCH_2s (ch));
1182     /* send back DESTROY notification to stop further retransmissions! */
1183     GCT_send_channel_destroy (ch->t,
1184                               ch->ctn);
1185     return;
1186   }
1187   payload_size = ntohs (msg->header.size) - sizeof (*msg);
1188   env = GNUNET_MQ_msg_extra (ld,
1189                              payload_size,
1190                              GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
1191   ld->ccn = (NULL == ch->dest) ? ch->owner->ccn : ch->dest->ccn;
1192   GNUNET_memcpy (&ld[1],
1193                  &msg[1],
1194                  payload_size);
1195   ccc = (NULL != ch->owner) ? ch->owner : ch->dest;
1196   if ( (GNUNET_YES == ccc->client_ready) &&
1197        ( (GNUNET_YES == ch->out_of_order) ||
1198          (msg->mid.mid == ch->mid_recv.mid) ) )
1199   {
1200     LOG (GNUNET_ERROR_TYPE_DEBUG,
1201          "Giving %u bytes of payload with MID %u from %s to client %s\n",
1202          (unsigned int) payload_size,
1203          ntohl (msg->mid.mid),
1204          GCCH_2s (ch),
1205          GSC_2s (ccc->c));
1206     ccc->client_ready = GNUNET_NO;
1207     GSC_send_to_client (ccc->c,
1208                         env);
1209     ch->mid_recv.mid = htonl (1 + ntohl (ch->mid_recv.mid));
1210     ch->mid_futures >>= 1;
1211     send_channel_data_ack (ch);
1212     return;
1213   }
1214
1215   if (GNUNET_YES == ch->reliable)
1216   {
1217     /* check if message ought to be dropped because it is ancient/too distant/duplicate */
1218     mid_min = ntohl (ch->mid_recv.mid);
1219     mid_max = mid_min + ch->max_pending_messages;
1220     mid_msg = ntohl (msg->mid.mid);
1221     if ( ( (uint32_t) (mid_msg - mid_min) > ch->max_pending_messages) ||
1222          ( (uint32_t) (mid_max - mid_msg) > ch->max_pending_messages) )
1223     {
1224       LOG (GNUNET_ERROR_TYPE_DEBUG,
1225            "%s at %u drops ancient or far-future message %u\n",
1226            GCCH_2s (ch),
1227            (unsigned int) mid_min,
1228            ntohl (msg->mid.mid));
1229
1230       GNUNET_STATISTICS_update (stats,
1231                                 "# duplicate DATA (ancient or future)",
1232                                 1,
1233                                 GNUNET_NO);
1234       GNUNET_MQ_discard (env);
1235       send_channel_data_ack (ch);
1236       return;
1237     }
1238     /* mark bit for future ACKs */
1239     delta = mid_msg - mid_min - 1; /* overflow/underflow are OK here */
1240     if (delta < 64)
1241     {
1242       if (0 != (ch->mid_futures & (1LLU << delta)))
1243       {
1244         /* Duplicate within the queue, drop also */
1245         LOG (GNUNET_ERROR_TYPE_DEBUG,
1246              "Duplicate payload of %u bytes on %s (mid %u) dropped\n",
1247              (unsigned int) payload_size,
1248              GCCH_2s (ch),
1249              ntohl (msg->mid.mid));
1250         GNUNET_STATISTICS_update (stats,
1251                                   "# duplicate DATA",
1252                                   1,
1253                                   GNUNET_NO);
1254         GNUNET_MQ_discard (env);
1255         send_channel_data_ack (ch);
1256         return;
1257       }
1258       ch->mid_futures |= (1LLU << delta);
1259       LOG (GNUNET_ERROR_TYPE_DEBUG,
1260            "Marked bit %llX for mid %u (base: %u); now: %llX\n",
1261            (1LLU << delta),
1262            mid_msg,
1263            mid_min,
1264            ch->mid_futures);
1265     }
1266   }
1267   else /* ! ch->reliable */
1268   {
1269     /* Channel is unreliable, so we do not ACK. But we also cannot
1270        allow buffering everything, so check if we have space... */
1271     if (ccc->num_recv >= ch->max_pending_messages)
1272     {
1273       struct CadetOutOfOrderMessage *drop;
1274
1275       /* Yep, need to drop. Drop the oldest message in
1276          the buffer. */
1277       LOG (GNUNET_ERROR_TYPE_DEBUG,
1278            "Queue full due slow client on %s, dropping oldest message\n",
1279            GCCH_2s (ch));
1280       GNUNET_STATISTICS_update (stats,
1281                                 "# messages dropped due to slow client",
1282                                 1,
1283                                 GNUNET_NO);
1284       drop = ccc->head_recv;
1285       GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
1286                                    ccc->tail_recv,
1287                                    drop);
1288       ccc->num_recv--;
1289       GNUNET_MQ_discard (drop->env);
1290       GNUNET_free (drop);
1291     }
1292   }
1293
1294   /* Insert message into sorted out-of-order queue */
1295   com = GNUNET_new (struct CadetOutOfOrderMessage);
1296   com->mid = msg->mid;
1297   com->env = env;
1298   duplicate = GNUNET_NO;
1299   GNUNET_CONTAINER_DLL_insert_sorted (struct CadetOutOfOrderMessage,
1300                                       is_before,
1301                                       &duplicate,
1302                                       ccc->head_recv,
1303                                       ccc->tail_recv,
1304                                       com);
1305   ccc->num_recv++;
1306   if (GNUNET_YES == duplicate)
1307   {
1308     /* Duplicate within the queue, drop also (this is not covered by
1309        the case above if "delta" >= 64, which could be the case if
1310        max_pending_messages is also >= 64 or if our client is unready
1311        and we are seeing retransmissions of the message our client is
1312        blocked on. */
1313     LOG (GNUNET_ERROR_TYPE_DEBUG,
1314          "Duplicate payload of %u bytes on %s (mid %u) dropped\n",
1315          (unsigned int) payload_size,
1316          GCCH_2s (ch),
1317          ntohl (msg->mid.mid));
1318     GNUNET_STATISTICS_update (stats,
1319                               "# duplicate DATA",
1320                               1,
1321                               GNUNET_NO);
1322     GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
1323                                  ccc->tail_recv,
1324                                  com);
1325     ccc->num_recv--;
1326     GNUNET_MQ_discard (com->env);
1327     GNUNET_free (com);
1328     send_channel_data_ack (ch);
1329     return;
1330   }
1331   LOG (GNUNET_ERROR_TYPE_DEBUG,
1332        "Queued %s payload of %u bytes on %s-%X(%p) (mid %u, need %u first)\n",
1333        (GNUNET_YES == ccc->client_ready)
1334        ? "out-of-order"
1335        : "client-not-ready",
1336        (unsigned int) payload_size,
1337        GCCH_2s (ch),
1338        ntohl (ccc->ccn.channel_of_client),
1339        ccc,
1340        ntohl (msg->mid.mid),
1341        ntohl (ch->mid_recv.mid));
1342   /* NOTE: this ACK we _could_ skip, as the packet is out-of-order and
1343      the sender may already be transmitting the previous one.  Needs
1344      experimental evaluation to see if/when this ACK helps or
1345      hurts. (We might even want another option.) */
1346   send_channel_data_ack (ch);
1347 }
1348
1349
1350 /**
1351  * Function called once the tunnel has sent one of our messages.
1352  * If the message is unreliable, simply frees the `crm`. If the
1353  * message was reliable, calculate retransmission time and
1354  * wait for ACK (or retransmit).
1355  *
1356  * @param cls the `struct CadetReliableMessage` that was sent
1357  * @param cid identifier of the connection within the tunnel, NULL
1358  *            if transmission failed
1359  */
1360 static void
1361 data_sent_cb (void *cls,
1362               const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid);
1363
1364
1365 /**
1366  * We need to retry a transmission, the last one took too long to
1367  * be acknowledged.
1368  *
1369  * @param cls the `struct CadetChannel` where we need to retransmit
1370  */
1371 static void
1372 retry_transmission (void *cls)
1373 {
1374   struct CadetChannel *ch = cls;
1375   struct CadetReliableMessage *crm = ch->head_sent;
1376
1377   ch->retry_data_task = NULL;
1378   GNUNET_assert (NULL == crm->qe);
1379   LOG (GNUNET_ERROR_TYPE_DEBUG,
1380        "Retrying transmission on %s of message %u\n",
1381        GCCH_2s (ch),
1382        (unsigned int) ntohl (crm->data_message->mid.mid));
1383   crm->qe = GCT_send (ch->t,
1384                       &crm->data_message->header,
1385                       &data_sent_cb,
1386                       crm);
1387   GNUNET_assert (NULL == ch->retry_data_task);
1388 }
1389
1390
1391 /**
1392  * We got an PLAINTEXT_DATA_ACK for a message in our queue, remove it from
1393  * the queue and tell our client that it can send more.
1394  *
1395  * @param ch the channel that got the PLAINTEXT_DATA_ACK
1396  * @param cti identifier of the connection that delivered the message
1397  * @param crm the message that got acknowledged
1398  */
1399 static void
1400 handle_matching_ack (struct CadetChannel *ch,
1401                      const struct GNUNET_CADET_ConnectionTunnelIdentifier *cti,
1402                      struct CadetReliableMessage *crm)
1403 {
1404   GNUNET_CONTAINER_DLL_remove (ch->head_sent,
1405                                ch->tail_sent,
1406                                crm);
1407   ch->pending_messages--;
1408   GNUNET_assert (ch->pending_messages < ch->max_pending_messages);
1409   LOG (GNUNET_ERROR_TYPE_DEBUG,
1410        "Received DATA_ACK on %s for message %u (%u ACKs pending)\n",
1411        GCCH_2s (ch),
1412        (unsigned int) ntohl (crm->data_message->mid.mid),
1413        ch->pending_messages);
1414   if (NULL != crm->qe)
1415   {
1416     GCT_send_cancel (crm->qe);
1417     crm->qe = NULL;
1418   }
1419   if ( (1 == crm->num_transmissions) &&
1420        (NULL != cti) )
1421   {
1422     GCC_ack_observed (cti);
1423     if (0 == memcmp (cti,
1424                      &crm->connection_taken,
1425                      sizeof (struct GNUNET_CADET_ConnectionTunnelIdentifier)))
1426     {
1427       GCC_latency_observed (cti,
1428                             GNUNET_TIME_absolute_get_duration (crm->first_transmission_time));
1429     }
1430   }
1431   GNUNET_free (crm->data_message);
1432   GNUNET_free (crm);
1433   send_ack_to_client (ch,
1434                       (NULL == ch->owner)
1435                       ? GNUNET_NO
1436                       : GNUNET_YES);
1437 }
1438
1439
1440 /**
1441  * We got an acknowledgement for payload data for a channel.
1442  * Possibly resume transmissions.
1443  *
1444  * @param ch channel that got the ack
1445  * @param cti identifier of the connection that delivered the message
1446  * @param ack details about what was received
1447  */
1448 void
1449 GCCH_handle_channel_plaintext_data_ack (struct CadetChannel *ch,
1450                                         const struct GNUNET_CADET_ConnectionTunnelIdentifier *cti,
1451                                         const struct GNUNET_CADET_ChannelDataAckMessage *ack)
1452 {
1453   struct CadetReliableMessage *crm;
1454   struct CadetReliableMessage *crmn;
1455   int found;
1456   uint32_t mid_base;
1457   uint64_t mid_mask;
1458   unsigned int delta;
1459
1460   GNUNET_break (GNUNET_NO == ch->is_loopback);
1461   if (GNUNET_NO == ch->reliable)
1462   {
1463     /* not expecting ACKs on unreliable channel, odd */
1464     GNUNET_break_op (0);
1465     return;
1466   }
1467   /* mid_base is the MID of the next message that the
1468      other peer expects (i.e. that is missing!), everything
1469      LOWER (but excluding mid_base itself) was received. */
1470   mid_base = ntohl (ack->mid.mid);
1471   mid_mask = GNUNET_htonll (ack->futures);
1472   found = GNUNET_NO;
1473   for (crm = ch->head_sent;
1474         NULL != crm;
1475        crm = crmn)
1476   {
1477     crmn = crm->next;
1478     delta = (unsigned int) (ntohl (crm->data_message->mid.mid) - mid_base);
1479     if (delta >= UINT_MAX - ch->max_pending_messages)
1480     {
1481       /* overflow, means crm was a bit in the past, so this ACK counts for it. */
1482       LOG (GNUNET_ERROR_TYPE_DEBUG,
1483            "Got DATA_ACK with base %u satisfying past message %u on %s\n",
1484            (unsigned int) mid_base,
1485            ntohl (crm->data_message->mid.mid),
1486            GCCH_2s (ch));
1487       handle_matching_ack (ch,
1488                            cti,
1489                            crm);
1490       found = GNUNET_YES;
1491       continue;
1492     }
1493     delta--;
1494     if (delta >= 64)
1495       continue;
1496     LOG (GNUNET_ERROR_TYPE_DEBUG,
1497          "Testing bit %llX for mid %u (base: %u)\n",
1498          (1LLU << delta),
1499          ntohl (crm->data_message->mid.mid),
1500          mid_base);
1501     if (0 != (mid_mask & (1LLU << delta)))
1502     {
1503       LOG (GNUNET_ERROR_TYPE_DEBUG,
1504            "Got DATA_ACK with mask for %u on %s\n",
1505            ntohl (crm->data_message->mid.mid),
1506            GCCH_2s (ch));
1507       handle_matching_ack (ch,
1508                            cti,
1509                            crm);
1510       found = GNUNET_YES;
1511     }
1512   }
1513   if (GNUNET_NO == found)
1514   {
1515     /* ACK for message we already dropped, might have been a
1516        duplicate ACK? Ignore. */
1517     LOG (GNUNET_ERROR_TYPE_DEBUG,
1518          "Duplicate DATA_ACK on %s, ignoring\n",
1519          GCCH_2s (ch));
1520     GNUNET_STATISTICS_update (stats,
1521                               "# duplicate DATA_ACKs",
1522                               1,
1523                               GNUNET_NO);
1524     return;
1525   }
1526   if (NULL != ch->retry_data_task)
1527   {
1528     GNUNET_SCHEDULER_cancel (ch->retry_data_task);
1529     ch->retry_data_task = NULL;
1530   }
1531   if ( (NULL != ch->head_sent) &&
1532        (NULL == ch->head_sent->qe) )
1533     ch->retry_data_task
1534       = GNUNET_SCHEDULER_add_at (ch->head_sent->next_retry,
1535                                  &retry_transmission,
1536                                  ch);
1537 }
1538
1539
1540 /**
1541  * Destroy channel, based on the other peer closing the
1542  * connection.  Also needs to remove this channel from
1543  * the tunnel.
1544  *
1545  * @param ch channel to destroy
1546  * @param cti identifier of the connection that delivered the message,
1547  *            NULL if we are simulating receiving a destroy due to shutdown
1548  */
1549 void
1550 GCCH_handle_remote_destroy (struct CadetChannel *ch,
1551                             const struct GNUNET_CADET_ConnectionTunnelIdentifier *cti)
1552 {
1553   struct CadetChannelClient *ccc;
1554
1555   GNUNET_assert (GNUNET_NO == ch->is_loopback);
1556   LOG (GNUNET_ERROR_TYPE_DEBUG,
1557        "Received remote channel DESTROY for %s\n",
1558        GCCH_2s (ch));
1559   if (GNUNET_YES == ch->destroy)
1560   {
1561     /* Local client already gone, this is instant-death. */
1562     channel_destroy (ch);
1563     return;
1564   }
1565   ccc = (NULL != ch->owner) ? ch->owner : ch->dest;
1566   if (NULL != ccc->head_recv)
1567   {
1568     LOG (GNUNET_ERROR_TYPE_WARNING,
1569          "Lost end of transmission due to remote shutdown on %s\n",
1570          GCCH_2s (ch));
1571     /* FIXME: change API to notify client about truncated transmission! */
1572   }
1573   ch->destroy = GNUNET_YES;
1574   GSC_handle_remote_channel_destroy (ccc->c,
1575                                      ccc->ccn,
1576                                      ch);
1577   channel_destroy (ch);
1578 }
1579
1580
1581 /**
1582  * Test if element @a e1 comes before element @a e2.
1583  *
1584  * @param cls closure, to a flag where we indicate duplicate packets
1585  * @param crm1 an element of to sort
1586  * @param crm2 another element to sort
1587  * @return #GNUNET_YES if @e1 < @e2, otherwise #GNUNET_NO
1588  */
1589 static int
1590 cmp_crm_by_next_retry (void *cls,
1591                        struct CadetReliableMessage *crm1,
1592                        struct CadetReliableMessage *crm2)
1593 {
1594   if (crm1->next_retry.abs_value_us <
1595       crm2->next_retry.abs_value_us)
1596     return GNUNET_YES;
1597   return GNUNET_NO;
1598 }
1599
1600
1601 /**
1602  * Function called once the tunnel has sent one of our messages.
1603  * If the message is unreliable, simply frees the `crm`. If the
1604  * message was reliable, calculate retransmission time and
1605  * wait for ACK (or retransmit).
1606  *
1607  * @param cls the `struct CadetReliableMessage` that was sent
1608  * @param cid identifier of the connection within the tunnel, NULL
1609  *            if transmission failed
1610  */
1611 static void
1612 data_sent_cb (void *cls,
1613               const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid)
1614 {
1615   struct CadetReliableMessage *crm = cls;
1616   struct CadetChannel *ch = crm->ch;
1617
1618   GNUNET_assert (GNUNET_NO == ch->is_loopback);
1619   GNUNET_assert (NULL != crm->qe);
1620   crm->qe = NULL;
1621   GNUNET_CONTAINER_DLL_remove (ch->head_sent,
1622                                ch->tail_sent,
1623                                crm);
1624   if (GNUNET_NO == ch->reliable)
1625   {
1626     GNUNET_free (crm->data_message);
1627     GNUNET_free (crm);
1628     ch->pending_messages--;
1629     send_ack_to_client (ch,
1630                         (NULL == ch->owner)
1631                         ? GNUNET_NO
1632                         : GNUNET_YES);
1633     return;
1634   }
1635   if (NULL == cid)
1636   {
1637     /* There was an error sending. */
1638     crm->num_transmissions = GNUNET_SYSERR;
1639   }
1640   else if (GNUNET_SYSERR != crm->num_transmissions)
1641   {
1642     /* Increment transmission counter, and possibly store @a cid
1643        if this was the first transmission. */
1644     crm->num_transmissions++;
1645     if (1 == crm->num_transmissions)
1646     {
1647       crm->first_transmission_time = GNUNET_TIME_absolute_get ();
1648       crm->connection_taken = *cid;
1649       GCC_ack_expected (cid);
1650     }
1651   }
1652   if (0 == crm->retry_delay.rel_value_us)
1653     crm->retry_delay = ch->expected_delay;
1654   else
1655     crm->retry_delay = GNUNET_TIME_STD_BACKOFF (crm->retry_delay);
1656   crm->retry_delay = GNUNET_TIME_relative_max (crm->retry_delay,
1657                                                MIN_RTT_DELAY);
1658   crm->next_retry = GNUNET_TIME_relative_to_absolute (crm->retry_delay);
1659
1660   GNUNET_CONTAINER_DLL_insert_sorted (struct CadetReliableMessage,
1661                                       cmp_crm_by_next_retry,
1662                                       NULL,
1663                                       ch->head_sent,
1664                                       ch->tail_sent,
1665                                       crm);
1666   LOG (GNUNET_ERROR_TYPE_DEBUG,
1667        "Message %u sent, next transmission on %s in %s\n",
1668        (unsigned int) ntohl (crm->data_message->mid.mid),
1669        GCCH_2s (ch),
1670        GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (ch->head_sent->next_retry),
1671                                                GNUNET_YES));
1672   if (NULL == ch->head_sent->qe)
1673   {
1674     if (NULL != ch->retry_data_task)
1675       GNUNET_SCHEDULER_cancel (ch->retry_data_task);
1676     ch->retry_data_task
1677       = GNUNET_SCHEDULER_add_at (ch->head_sent->next_retry,
1678                                  &retry_transmission,
1679                                  ch);
1680   }
1681 }
1682
1683
1684 /**
1685  * Handle data given by a client.
1686  *
1687  * Check whether the client is allowed to send in this tunnel, save if
1688  * channel is reliable and send an ACK to the client if there is still
1689  * buffer space in the tunnel.
1690  *
1691  * @param ch Channel.
1692  * @param sender_ccn ccn of the sender
1693  * @param buf payload to transmit.
1694  * @param buf_len number of bytes in @a buf
1695  * @return #GNUNET_OK if everything goes well,
1696  *         #GNUNET_SYSERR in case of an error.
1697  */
1698 int
1699 GCCH_handle_local_data (struct CadetChannel *ch,
1700                         struct GNUNET_CADET_ClientChannelNumber sender_ccn,
1701                         const char *buf,
1702                         size_t buf_len)
1703 {
1704   struct CadetReliableMessage *crm;
1705
1706   if (ch->pending_messages > ch->max_pending_messages)
1707   {
1708     GNUNET_break (0);
1709     return GNUNET_SYSERR;
1710   }
1711   ch->pending_messages++;
1712
1713   if (GNUNET_YES == ch->is_loopback)
1714   {
1715     struct CadetChannelClient *receiver;
1716     struct GNUNET_MQ_Envelope *env;
1717     struct GNUNET_CADET_LocalData *ld;
1718     int to_owner;
1719
1720     env = GNUNET_MQ_msg_extra (ld,
1721                                buf_len,
1722                                GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
1723     if (sender_ccn.channel_of_client ==
1724         ch->owner->ccn.channel_of_client)
1725     {
1726       receiver = ch->dest;
1727       to_owner = GNUNET_NO;
1728     }
1729     else
1730     {
1731       GNUNET_assert (sender_ccn.channel_of_client ==
1732                      ch->dest->ccn.channel_of_client);
1733       receiver = ch->owner;
1734       to_owner = GNUNET_YES;
1735     }
1736     ld->ccn = receiver->ccn;
1737     GNUNET_memcpy (&ld[1],
1738                    buf,
1739                    buf_len);
1740     if (GNUNET_YES == receiver->client_ready)
1741     {
1742       GSC_send_to_client (receiver->c,
1743                           env);
1744       send_ack_to_client (ch,
1745                           to_owner);
1746     }
1747     else
1748     {
1749       struct CadetOutOfOrderMessage *oom;
1750
1751       oom = GNUNET_new (struct CadetOutOfOrderMessage);
1752       oom->env = env;
1753       GNUNET_CONTAINER_DLL_insert_tail (receiver->head_recv,
1754                                         receiver->tail_recv,
1755                                         oom);
1756       receiver->num_recv++;
1757     }
1758     return GNUNET_OK;
1759   }
1760
1761   /* Everything is correct, send the message. */
1762   crm = GNUNET_malloc (sizeof (*crm));
1763   crm->ch = ch;
1764   crm->data_message = GNUNET_malloc (sizeof (struct GNUNET_CADET_ChannelAppDataMessage)
1765                                      + buf_len);
1766   crm->data_message->header.size = htons (sizeof (struct GNUNET_CADET_ChannelAppDataMessage) + buf_len);
1767   crm->data_message->header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA);
1768   ch->mid_send.mid = htonl (ntohl (ch->mid_send.mid) + 1);
1769   crm->data_message->mid = ch->mid_send;
1770   crm->data_message->ctn = ch->ctn;
1771   GNUNET_memcpy (&crm->data_message[1],
1772                  buf,
1773                  buf_len);
1774   GNUNET_CONTAINER_DLL_insert_tail (ch->head_sent,
1775                                     ch->tail_sent,
1776                                     crm);
1777   LOG (GNUNET_ERROR_TYPE_DEBUG,
1778        "Sending message %u from local client to %s with %u bytes\n",
1779        ntohl (crm->data_message->mid.mid),
1780        GCCH_2s (ch),
1781        buf_len);
1782   if (NULL != ch->retry_data_task)
1783   {
1784     GNUNET_SCHEDULER_cancel (ch->retry_data_task);
1785     ch->retry_data_task = NULL;
1786   }
1787   crm->qe = GCT_send (ch->t,
1788                       &crm->data_message->header,
1789                       &data_sent_cb,
1790                       crm);
1791   GNUNET_assert (NULL == ch->retry_data_task);
1792   return GNUNET_OK;
1793 }
1794
1795
1796 /**
1797  * Handle ACK from client on local channel.  Means the client is ready
1798  * for more data, see if we have any for it.
1799  *
1800  * @param ch channel to destroy
1801  * @param client_ccn ccn of the client sending the ack
1802  */
1803 void
1804 GCCH_handle_local_ack (struct CadetChannel *ch,
1805                        struct GNUNET_CADET_ClientChannelNumber client_ccn)
1806 {
1807   struct CadetChannelClient *ccc;
1808   struct CadetOutOfOrderMessage *com;
1809
1810   if ( (NULL != ch->owner) &&
1811        (ch->owner->ccn.channel_of_client == client_ccn.channel_of_client) )
1812     ccc = ch->owner;
1813   else if ( (NULL != ch->dest) &&
1814             (ch->dest->ccn.channel_of_client == client_ccn.channel_of_client) )
1815     ccc = ch->dest;
1816   else
1817     GNUNET_assert (0);
1818   ccc->client_ready = GNUNET_YES;
1819   com = ccc->head_recv;
1820   if (NULL == com)
1821   {
1822     LOG (GNUNET_ERROR_TYPE_DEBUG,
1823          "Got LOCAL_ACK, %s-%X ready to receive more data, but none pending on %s-%X(%p)!\n",
1824          GSC_2s (ccc->c),
1825          ntohl (client_ccn.channel_of_client),
1826          GCCH_2s (ch),
1827          ntohl (ccc->ccn.channel_of_client),
1828          ccc);
1829     return; /* none pending */
1830   }
1831   if (GNUNET_YES == ch->is_loopback)
1832   {
1833     int to_owner;
1834
1835     /* Messages are always in-order, just send */
1836     GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
1837                                  ccc->tail_recv,
1838                                  com);
1839     ccc->num_recv--;
1840     GSC_send_to_client (ccc->c,
1841                         com->env);
1842     /* Notify sender that we can receive more */
1843     if (ccc->ccn.channel_of_client ==
1844         ch->owner->ccn.channel_of_client)
1845     {
1846       to_owner = GNUNET_NO;
1847     }
1848     else
1849     {
1850       GNUNET_assert (ccc->ccn.channel_of_client ==
1851                      ch->dest->ccn.channel_of_client);
1852       to_owner = GNUNET_YES;
1853     }
1854     send_ack_to_client (ch,
1855                         to_owner);
1856     GNUNET_free (com);
1857     return;
1858   }
1859
1860   if ( (com->mid.mid != ch->mid_recv.mid) &&
1861        (GNUNET_NO == ch->out_of_order) &&
1862        (GNUNET_YES == ch->reliable) )
1863   {
1864     LOG (GNUNET_ERROR_TYPE_DEBUG,
1865          "Got LOCAL_ACK, %s-%X ready to receive more data (but next one is out-of-order %u vs. %u)!\n",
1866          GSC_2s (ccc->c),
1867          ntohl (ccc->ccn.channel_of_client),
1868          ntohl (com->mid.mid),
1869          ntohl (ch->mid_recv.mid));
1870     return; /* missing next one in-order */
1871   }
1872
1873   LOG (GNUNET_ERROR_TYPE_DEBUG,
1874        "Got LOCAL_ACK, giving payload message %u to %s-%X on %s\n",
1875        ntohl (com->mid.mid),
1876        GSC_2s (ccc->c),
1877        ntohl (ccc->ccn.channel_of_client),
1878        GCCH_2s (ch));
1879
1880   /* all good, pass next message to client */
1881   GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
1882                                ccc->tail_recv,
1883                                com);
1884   ccc->num_recv--;
1885   /* FIXME: if unreliable, this is not aggressive
1886      enough, as it would be OK to have lost some! */
1887
1888   ch->mid_recv.mid = htonl (1 + ntohl (com->mid.mid));
1889   ch->mid_futures >>= 1; /* equivalent to division by 2 */
1890   ccc->client_ready = GNUNET_NO;
1891   GSC_send_to_client (ccc->c,
1892                       com->env);
1893   GNUNET_free (com);
1894   send_channel_data_ack (ch);
1895   if (NULL != ccc->head_recv)
1896     return;
1897   if (GNUNET_NO == ch->destroy)
1898     return;
1899   GCT_send_channel_destroy (ch->t,
1900                             ch->ctn);
1901   channel_destroy (ch);
1902 }
1903
1904
1905 #define LOG2(level, ...) GNUNET_log_from_nocheck(level,"cadet-chn",__VA_ARGS__)
1906
1907
1908 /**
1909  * Log channel info.
1910  *
1911  * @param ch Channel.
1912  * @param level Debug level to use.
1913  */
1914 void
1915 GCCH_debug (struct CadetChannel *ch,
1916             enum GNUNET_ErrorType level)
1917 {
1918   int do_log;
1919
1920   do_log = GNUNET_get_log_call_status (level & (~GNUNET_ERROR_TYPE_BULK),
1921                                        "cadet-chn",
1922                                        __FILE__, __FUNCTION__, __LINE__);
1923   if (0 == do_log)
1924     return;
1925
1926   if (NULL == ch)
1927   {
1928     LOG2 (level, "CHN *** DEBUG NULL CHANNEL ***\n");
1929     return;
1930   }
1931   LOG2 (level,
1932         "CHN %s:%X (%p)\n",
1933         GCT_2s (ch->t),
1934         ch->ctn,
1935         ch);
1936   if (NULL != ch->owner)
1937   {
1938     LOG2 (level,
1939           "CHN origin %s ready %s local-id: %u\n",
1940           GSC_2s (ch->owner->c),
1941           ch->owner->client_ready ? "YES" : "NO",
1942           ntohl (ch->owner->ccn.channel_of_client));
1943   }
1944   if (NULL != ch->dest)
1945   {
1946     LOG2 (level,
1947           "CHN destination %s ready %s local-id: %u\n",
1948           GSC_2s (ch->dest->c),
1949           ch->dest->client_ready ? "YES" : "NO",
1950           ntohl (ch->dest->ccn.channel_of_client));
1951   }
1952   LOG2 (level,
1953         "CHN  Message IDs recv: %d (%LLX), send: %d\n",
1954         ntohl (ch->mid_recv.mid),
1955         (unsigned long long) ch->mid_futures,
1956         ntohl (ch->mid_send.mid));
1957 }
1958
1959
1960
1961 /* end of gnunet-service-cadet-new_channel.c */