prevent unordered message delivery
[oweals/gnunet.git] / src / cadet / gnunet-service-cadet_channel.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2001-2017 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19 */
20 /**
21  * @file cadet/gnunet-service-cadet_channel.c
22  * @brief logical links between CADET clients
23  * @author Bartlomiej Polot
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - Congestion/flow control:
28  *   + estimate max bandwidth using bursts and use to for CONGESTION CONTROL!
29  *     (and figure out how/where to use this!)
30  *   + figure out flow control without ACKs (unreliable traffic!)
31  * - revisit handling of 'unbuffered' traffic!
32  *   (need to push down through tunnel into connection selection)
33  * - revisit handling of 'buffered' traffic: 4 is a rather small buffer; maybe
34  *   reserve more bits in 'options' to allow for buffer size control?
35  */
36 #include "platform.h"
37 #include "cadet.h"
38 #include "gnunet_statistics_service.h"
39 #include "gnunet-service-cadet_channel.h"
40 #include "gnunet-service-cadet_connection.h"
41 #include "gnunet-service-cadet_tunnels.h"
42 #include "gnunet-service-cadet_paths.h"
43
44 #define LOG(level,...) GNUNET_log_from (level,"cadet-chn",__VA_ARGS__)
45
46 /**
47  * How long do we initially wait before retransmitting?
48  */
49 #define CADET_INITIAL_RETRANSMIT_TIME GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 250)
50
51 /**
52  * How long do we wait before dropping state about incoming
53  * connection to closed port?
54  */
55 #define TIMEOUT_CLOSED_PORT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30)
56
57 /**
58  * How long do we wait at least before retransmitting ever?
59  */
60 #define MIN_RTT_DELAY GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 75)
61
62 /**
63  * Maximum message ID into the future we accept for out-of-order messages.
64  * If the message is more than this into the future, we drop it.  This is
65  * important both to detect values that are actually in the past, as well
66  * as to limit adversarially triggerable memory consumption.
67  *
68  * Note that right now we have "max_pending_messages = 4" hard-coded in
69  * the logic below, so a value of 4 would suffice here. But we plan to
70  * allow larger windows in the future...
71  */
72 #define MAX_OUT_OF_ORDER_DISTANCE 1024
73
74
75 /**
76  * All the states a channel can be in.
77  */
78 enum CadetChannelState
79 {
80   /**
81    * Uninitialized status, should never appear in operation.
82    */
83   CADET_CHANNEL_NEW,
84
85   /**
86    * Channel is to a port that is not open, we're waiting for the
87    * port to be opened.
88    */
89   CADET_CHANNEL_LOOSE,
90
91   /**
92    * CHANNEL_OPEN message sent, waiting for CHANNEL_OPEN_ACK.
93    */
94   CADET_CHANNEL_OPEN_SENT,
95
96   /**
97    * Connection confirmed, ready to carry traffic.
98    */
99   CADET_CHANNEL_READY
100 };
101
102
103 /**
104  * Info needed to retry a message in case it gets lost.
105  * Note that we DO use this structure also for unreliable
106  * messages.
107  */
108 struct CadetReliableMessage
109 {
110   /**
111    * Double linked list, FIFO style
112    */
113   struct CadetReliableMessage *next;
114
115   /**
116    * Double linked list, FIFO style
117    */
118   struct CadetReliableMessage *prev;
119
120   /**
121    * Which channel is this message in?
122    */
123   struct CadetChannel *ch;
124
125   /**
126    * Entry in the tunnels queue for this message, NULL if it has left
127    * the tunnel.  Used to cancel transmission in case we receive an
128    * ACK in time.
129    */
130   struct CadetTunnelQueueEntry *qe;
131
132   /**
133    * Data message we are trying to send.
134    */
135   struct GNUNET_CADET_ChannelAppDataMessage *data_message;
136
137   /**
138    * How soon should we retry if we fail to get an ACK?
139    * Messages in the queue are sorted by this value.
140    */
141   struct GNUNET_TIME_Absolute next_retry;
142
143   /**
144    * How long do we wait for an ACK after transmission?
145    * Use for the back-off calculation.
146    */
147   struct GNUNET_TIME_Relative retry_delay;
148
149   /**
150    * Time when we first successfully transmitted the message
151    * (that is, set @e num_transmissions to 1).
152    */
153   struct GNUNET_TIME_Absolute first_transmission_time;
154
155   /**
156    * Identifier of the connection that this message took when it
157    * was first transmitted.  Only useful if @e num_transmissions is 1.
158    */
159   struct GNUNET_CADET_ConnectionTunnelIdentifier connection_taken;
160
161   /**
162    * How often was this message transmitted?  #GNUNET_SYSERR if there
163    * was an error transmitting the message, #GNUNET_NO if it was not
164    * yet transmitted ever, otherwise the number of (re) transmissions.
165    */
166   int num_transmissions;
167
168 };
169
170
171 /**
172  * List of received out-of-order data messages.
173  */
174 struct CadetOutOfOrderMessage
175 {
176   /**
177    * Double linked list, FIFO style
178    */
179   struct CadetOutOfOrderMessage *next;
180
181   /**
182    * Double linked list, FIFO style
183    */
184   struct CadetOutOfOrderMessage *prev;
185
186   /**
187    * ID of the message (messages up to this point needed
188    * before we give this one to the client).
189    */
190   struct ChannelMessageIdentifier mid;
191
192   /**
193    * The envelope with the payload of the out-of-order message
194    */
195   struct GNUNET_MQ_Envelope *env;
196
197 };
198
199
200 /**
201  * Client endpoint of a `struct CadetChannel`.  A channel may be a
202  * loopback channel, in which case it has two of these endpoints.
203  * Note that flow control also is required in both directions.
204  */
205 struct CadetChannelClient
206 {
207   /**
208    * Client handle.  Not by itself sufficient to designate
209    * the client endpoint, as the same client handle may
210    * be used for both the owner and the destination, and
211    * we thus also need the channel ID to identify the client.
212    */
213   struct CadetClient *c;
214
215   /**
216    * Head of DLL of messages received out of order or while client was unready.
217    */
218   struct CadetOutOfOrderMessage *head_recv;
219
220   /**
221    * Tail DLL of messages received out of order or while client was unready.
222    */
223   struct CadetOutOfOrderMessage *tail_recv;
224
225   /**
226    * Local tunnel number for this client.
227    * (if owner >= #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI,
228    *  otherwise < #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI)
229    */
230   struct GNUNET_CADET_ClientChannelNumber ccn;
231
232   /**
233    * Number of entries currently in @a head_recv DLL.
234    */
235   unsigned int num_recv;
236
237   /**
238    * Can we send data to the client?
239    */
240   int client_ready;
241
242 };
243
244
245 /**
246  * Struct containing all information regarding a channel to a remote client.
247  */
248 struct CadetChannel
249 {
250   /**
251    * Tunnel this channel is in.
252    */
253   struct CadetTunnel *t;
254
255   /**
256    * Client owner of the tunnel, if any.
257    * (Used if this channel represends the initiating end of the tunnel.)
258    */
259   struct CadetChannelClient *owner;
260
261   /**
262    * Client destination of the tunnel, if any.
263    * (Used if this channel represents the listening end of the tunnel.)
264    */
265   struct CadetChannelClient *dest;
266
267   /**
268    * Last entry in the tunnel's queue relating to control messages
269    * (#GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN or
270    * #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK).  Used to cancel
271    * transmission in case we receive updated information.
272    */
273   struct CadetTunnelQueueEntry *last_control_qe;
274
275   /**
276    * Head of DLL of messages sent and not yet ACK'd.
277    */
278   struct CadetReliableMessage *head_sent;
279
280   /**
281    * Tail of DLL of messages sent and not yet ACK'd.
282    */
283   struct CadetReliableMessage *tail_sent;
284
285   /**
286    * Task to resend/poll in case no ACK is received.
287    */
288   struct GNUNET_SCHEDULER_Task *retry_control_task;
289
290   /**
291    * Task to resend/poll in case no ACK is received.
292    */
293   struct GNUNET_SCHEDULER_Task *retry_data_task;
294
295   /**
296    * Last time the channel was used
297    */
298   struct GNUNET_TIME_Absolute timestamp;
299
300   /**
301    * Destination port of the channel.
302    */
303   struct GNUNET_HashCode port;
304
305   /**
306    * Hash'ed port of the channel with initiator and destination PID.
307    */
308   struct GNUNET_HashCode h_port;
309
310   /**
311    * Counter for exponential backoff.
312    */
313   struct GNUNET_TIME_Relative retry_time;
314
315   /**
316    * Bitfield of already-received messages past @e mid_recv.
317    */
318   uint64_t mid_futures;
319
320   /**
321    * Next MID expected for incoming traffic.
322    */
323   struct ChannelMessageIdentifier mid_recv;
324
325   /**
326    * Next MID to use for outgoing traffic.
327    */
328   struct ChannelMessageIdentifier mid_send;
329
330   /**
331    * Total (reliable) messages pending ACK for this channel.
332    */
333   unsigned int pending_messages;
334
335   /**
336    * Maximum (reliable) messages pending ACK for this channel
337    * before we throttle the client.
338    */
339   unsigned int max_pending_messages;
340
341   /**
342    * Number identifying this channel in its tunnel.
343    */
344   struct GNUNET_CADET_ChannelTunnelNumber ctn;
345
346   /**
347    * Channel state.
348    */
349   enum CadetChannelState state;
350
351   /**
352    * Count how many ACKs we skipped, used to prevent long
353    * sequences of ACK skipping.
354    */
355   unsigned int skip_ack_series;
356
357   /**
358    * Is the tunnel bufferless (minimum latency)?
359    */
360   int nobuffer;
361
362   /**
363    * Is the tunnel reliable?
364    */
365   int reliable;
366
367   /**
368    * Is the tunnel out-of-order?
369    */
370   int out_of_order;
371
372   /**
373    * Is this channel a loopback channel, where the destination is us again?
374    */
375   int is_loopback;
376
377   /**
378    * Flag to signal the destruction of the channel.  If this is set to
379    * #GNUNET_YES the channel will be destroyed once the queue is
380    * empty.
381    */
382   int destroy;
383
384 };
385
386
387 /**
388  * Get the static string for identification of the channel.
389  *
390  * @param ch Channel.
391  *
392  * @return Static string with the channel IDs.
393  */
394 const char *
395 GCCH_2s (const struct CadetChannel *ch)
396 {
397   static char buf[128];
398
399   GNUNET_snprintf (buf,
400                    sizeof (buf),
401                    "Channel %s:%s ctn:%X(%X/%X)",
402                    (GNUNET_YES == ch->is_loopback)
403                    ? "loopback"
404                    : GNUNET_i2s (GCP_get_id (GCT_get_destination (ch->t))),
405                    GNUNET_h2s (&ch->port),
406                    ch->ctn,
407                    (NULL == ch->owner) ? 0 : ntohl (ch->owner->ccn.channel_of_client),
408                    (NULL == ch->dest) ? 0 : ntohl (ch->dest->ccn.channel_of_client));
409   return buf;
410 }
411
412
413 /**
414  * Hash the @a port and @a initiator and @a listener to
415  * calculate the "challenge" @a h_port we send to the other
416  * peer on #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN.
417  *
418  * @param[out] h_port set to the hash of @a port, @a initiator and @a listener
419  * @param port cadet port, as seen by CADET clients
420  * @param listener peer that is listining on @a port
421  */
422 void
423 GCCH_hash_port (struct GNUNET_HashCode *h_port,
424                 const struct GNUNET_HashCode *port,
425                 const struct GNUNET_PeerIdentity *listener)
426 {
427   struct GNUNET_HashContext *hc;
428
429   hc = GNUNET_CRYPTO_hash_context_start ();
430   GNUNET_CRYPTO_hash_context_read (hc,
431                                    port,
432                                    sizeof (*port));
433   GNUNET_CRYPTO_hash_context_read (hc,
434                                    listener,
435                                    sizeof (*listener));
436   GNUNET_CRYPTO_hash_context_finish (hc,
437                                      h_port);
438   LOG (GNUNET_ERROR_TYPE_DEBUG,
439        "Calculated port hash %s\n",
440        GNUNET_h2s (h_port));
441 }
442
443
444 /**
445  * Get the channel's public ID.
446  *
447  * @param ch Channel.
448  *
449  * @return ID used to identify the channel with the remote peer.
450  */
451 struct GNUNET_CADET_ChannelTunnelNumber
452 GCCH_get_id (const struct CadetChannel *ch)
453 {
454   return ch->ctn;
455 }
456
457
458 /**
459  * Release memory associated with @a ccc
460  *
461  * @param ccc data structure to clean up
462  */
463 static void
464 free_channel_client (struct CadetChannelClient *ccc)
465 {
466   struct CadetOutOfOrderMessage *com;
467
468   while (NULL != (com = ccc->head_recv))
469   {
470     GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
471                                  ccc->tail_recv,
472                                  com);
473     ccc->num_recv--;
474     GNUNET_MQ_discard (com->env);
475     GNUNET_free (com);
476   }
477   GNUNET_free (ccc);
478 }
479
480
481 /**
482  * Destroy the given channel.
483  *
484  * @param ch channel to destroy
485  */
486 static void
487 channel_destroy (struct CadetChannel *ch)
488 {
489   struct CadetReliableMessage *crm;
490
491   while (NULL != (crm = ch->head_sent))
492   {
493     GNUNET_assert (ch == crm->ch);
494     if (NULL != crm->qe)
495     {
496       GCT_send_cancel (crm->qe);
497       crm->qe = NULL;
498     }
499     GNUNET_CONTAINER_DLL_remove (ch->head_sent,
500                                  ch->tail_sent,
501                                  crm);
502     GNUNET_free (crm->data_message);
503     GNUNET_free (crm);
504   }
505   if (CADET_CHANNEL_LOOSE == ch->state)
506   {
507     GSC_drop_loose_channel (&ch->h_port,
508                             ch);
509   }
510   if (NULL != ch->owner)
511   {
512     free_channel_client (ch->owner);
513     ch->owner = NULL;
514   }
515   if (NULL != ch->dest)
516   {
517     free_channel_client (ch->dest);
518     ch->dest = NULL;
519   }
520   if (NULL != ch->last_control_qe)
521   {
522     GCT_send_cancel (ch->last_control_qe);
523     ch->last_control_qe = NULL;
524   }
525   if (NULL != ch->retry_data_task)
526   {
527     GNUNET_SCHEDULER_cancel (ch->retry_data_task);
528     ch->retry_data_task = NULL;
529   }
530   if (NULL != ch->retry_control_task)
531   {
532     GNUNET_SCHEDULER_cancel (ch->retry_control_task);
533     ch->retry_control_task = NULL;
534   }
535   if (GNUNET_NO == ch->is_loopback)
536   {
537     GCT_remove_channel (ch->t,
538                         ch,
539                         ch->ctn);
540     ch->t = NULL;
541   }
542   GNUNET_free (ch);
543 }
544
545
546 /**
547  * Send a channel create message.
548  *
549  * @param cls Channel for which to send.
550  */
551 static void
552 send_channel_open (void *cls);
553
554
555 /**
556  * Function called once the tunnel confirms that we sent the
557  * create message.  Delays for a bit until we retry.
558  *
559  * @param cls our `struct CadetChannel`.
560  * @param cid identifier of the connection within the tunnel, NULL
561  *            if transmission failed
562  */
563 static void
564 channel_open_sent_cb (void *cls,
565                       const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid)
566 {
567   struct CadetChannel *ch = cls;
568
569   GNUNET_assert (NULL != ch->last_control_qe);
570   ch->last_control_qe = NULL;
571   ch->retry_time = GNUNET_TIME_STD_BACKOFF (ch->retry_time);
572   LOG (GNUNET_ERROR_TYPE_DEBUG,
573        "Sent CADET_CHANNEL_OPEN on %s, retrying in %s\n",
574        GCCH_2s (ch),
575        GNUNET_STRINGS_relative_time_to_string (ch->retry_time,
576                                                GNUNET_YES));
577   ch->retry_control_task
578     = GNUNET_SCHEDULER_add_delayed (ch->retry_time,
579                                     &send_channel_open,
580                                     ch);
581 }
582
583
584 /**
585  * Send a channel open message.
586  *
587  * @param cls Channel for which to send.
588  */
589 static void
590 send_channel_open (void *cls)
591 {
592   struct CadetChannel *ch = cls;
593   struct GNUNET_CADET_ChannelOpenMessage msgcc;
594   uint32_t options;
595
596   ch->retry_control_task = NULL;
597   LOG (GNUNET_ERROR_TYPE_DEBUG,
598        "Sending CHANNEL_OPEN message for %s\n",
599        GCCH_2s (ch));
600   options = 0;
601   if (ch->nobuffer)
602     options |= GNUNET_CADET_OPTION_NOBUFFER;
603   if (ch->reliable)
604     options |= GNUNET_CADET_OPTION_RELIABLE;
605   if (ch->out_of_order)
606     options |= GNUNET_CADET_OPTION_OUT_OF_ORDER;
607   msgcc.header.size = htons (sizeof (msgcc));
608   msgcc.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN);
609   msgcc.opt = htonl (options);
610   msgcc.h_port = ch->h_port;
611   msgcc.ctn = ch->ctn;
612   ch->state = CADET_CHANNEL_OPEN_SENT;
613   if (NULL != ch->last_control_qe)
614     GCT_send_cancel (ch->last_control_qe);
615   ch->last_control_qe = GCT_send (ch->t,
616                                   &msgcc.header,
617                                   &channel_open_sent_cb,
618                                   ch);
619   GNUNET_assert (NULL == ch->retry_control_task);
620 }
621
622
623 /**
624  * Function called once and only once after a channel was bound
625  * to its tunnel via #GCT_add_channel() is ready for transmission.
626  * Note that this is only the case for channels that this peer
627  * initiates, as for incoming channels we assume that they are
628  * ready for transmission immediately upon receiving the open
629  * message.  Used to bootstrap the #GCT_send() process.
630  *
631  * @param ch the channel for which the tunnel is now ready
632  */
633 void
634 GCCH_tunnel_up (struct CadetChannel *ch)
635 {
636   GNUNET_assert (NULL == ch->retry_control_task);
637   LOG (GNUNET_ERROR_TYPE_DEBUG,
638        "Tunnel up, sending CHANNEL_OPEN on %s now\n",
639        GCCH_2s (ch));
640   ch->retry_control_task
641     = GNUNET_SCHEDULER_add_now (&send_channel_open,
642                                 ch);
643 }
644
645
646 /**
647  * Create a new channel.
648  *
649  * @param owner local client owning the channel
650  * @param ccn local number of this channel at the @a owner
651  * @param destination peer to which we should build the channel
652  * @param port desired port at @a destination
653  * @param options options for the channel
654  * @return handle to the new channel
655  */
656 struct CadetChannel *
657 GCCH_channel_local_new (struct CadetClient *owner,
658                         struct GNUNET_CADET_ClientChannelNumber ccn,
659                         struct CadetPeer *destination,
660                         const struct GNUNET_HashCode *port,
661                         uint32_t options)
662 {
663   struct CadetChannel *ch;
664   struct CadetChannelClient *ccco;
665
666   ccco = GNUNET_new (struct CadetChannelClient);
667   ccco->c = owner;
668   ccco->ccn = ccn;
669   ccco->client_ready = GNUNET_YES;
670
671   ch = GNUNET_new (struct CadetChannel);
672   ch->mid_recv.mid = htonl (1); /* The OPEN_ACK counts as message 0! */
673   ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
674   ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
675   ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
676   ch->max_pending_messages = (ch->nobuffer) ? 1 : 4; /* FIXME: 4!? Do not hardcode! */
677   ch->owner = ccco;
678   ch->port = *port;
679   GCCH_hash_port (&ch->h_port,
680                   port,
681                   GCP_get_id (destination));
682   if (0 == memcmp (&my_full_id,
683                    GCP_get_id (destination),
684                    sizeof (struct GNUNET_PeerIdentity)))
685   {
686     struct OpenPort *op;
687
688     ch->is_loopback = GNUNET_YES;
689     op = GNUNET_CONTAINER_multihashmap_get (open_ports,
690                                             &ch->h_port);
691     if (NULL == op)
692     {
693       /* port closed, wait for it to possibly open */
694       ch->state = CADET_CHANNEL_LOOSE;
695       (void) GNUNET_CONTAINER_multihashmap_put (loose_channels,
696                                                 &ch->h_port,
697                                                 ch,
698                                                 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
699       LOG (GNUNET_ERROR_TYPE_DEBUG,
700            "Created loose incoming loopback channel to port %s\n",
701            GNUNET_h2s (&ch->port));
702     }
703     else
704     {
705       GCCH_bind (ch,
706                  op->c,
707                  &op->port);
708     }
709   }
710   else
711   {
712     ch->t = GCP_get_tunnel (destination,
713                             GNUNET_YES);
714     ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME;
715     ch->ctn = GCT_add_channel (ch->t,
716                                ch);
717   }
718   GNUNET_STATISTICS_update (stats,
719                             "# channels",
720                             1,
721                             GNUNET_NO);
722   LOG (GNUNET_ERROR_TYPE_DEBUG,
723        "Created channel to port %s at peer %s for %s using %s\n",
724        GNUNET_h2s (port),
725        GCP_2s (destination),
726        GSC_2s (owner),
727        (GNUNET_YES == ch->is_loopback) ? "loopback" : GCT_2s (ch->t));
728   return ch;
729 }
730
731
732 /**
733  * We had an incoming channel to a port that is closed.
734  * It has not been opened for a while, drop it.
735  *
736  * @param cls the channel to drop
737  */
738 static void
739 timeout_closed_cb (void *cls)
740 {
741   struct CadetChannel *ch = cls;
742
743   ch->retry_control_task = NULL;
744   LOG (GNUNET_ERROR_TYPE_DEBUG,
745        "Closing incoming channel to port %s from peer %s due to timeout\n",
746        GNUNET_h2s (&ch->port),
747        GCP_2s (GCT_get_destination (ch->t)));
748   channel_destroy (ch);
749 }
750
751
752 /**
753  * Create a new channel based on a request coming in over the network.
754  *
755  * @param t tunnel to the remote peer
756  * @param ctn identifier of this channel in the tunnel
757  * @param h_port desired hash of local port
758  * @param options options for the channel
759  * @return handle to the new channel
760  */
761 struct CadetChannel *
762 GCCH_channel_incoming_new (struct CadetTunnel *t,
763                            struct GNUNET_CADET_ChannelTunnelNumber ctn,
764                            const struct GNUNET_HashCode *h_port,
765                            uint32_t options)
766 {
767   struct CadetChannel *ch;
768   struct OpenPort *op;
769
770   ch = GNUNET_new (struct CadetChannel);
771   ch->h_port = *h_port;
772   ch->t = t;
773   ch->ctn = ctn;
774   ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME;
775   ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
776   ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
777   ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
778   ch->max_pending_messages = (ch->nobuffer) ? 1 : 4; /* FIXME: 4!? Do not hardcode! */
779   GNUNET_STATISTICS_update (stats,
780                             "# channels",
781                             1,
782                             GNUNET_NO);
783
784   op = GNUNET_CONTAINER_multihashmap_get (open_ports,
785                                           h_port);
786   if (NULL == op)
787   {
788     /* port closed, wait for it to possibly open */
789     ch->state = CADET_CHANNEL_LOOSE;
790     (void) GNUNET_CONTAINER_multihashmap_put (loose_channels,
791                                               &ch->h_port,
792                                               ch,
793                                               GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
794     GNUNET_assert (NULL == ch->retry_control_task);
795     ch->retry_control_task
796       = GNUNET_SCHEDULER_add_delayed (TIMEOUT_CLOSED_PORT,
797                                       &timeout_closed_cb,
798                                       ch);
799     LOG (GNUNET_ERROR_TYPE_DEBUG,
800          "Created loose incoming channel to port %s from peer %s\n",
801          GNUNET_h2s (&ch->port),
802          GCP_2s (GCT_get_destination (ch->t)));
803   }
804   else
805   {
806     GCCH_bind (ch,
807                op->c,
808                &op->port);
809   }
810   GNUNET_STATISTICS_update (stats,
811                             "# channels",
812                             1,
813                             GNUNET_NO);
814   return ch;
815 }
816
817
818 /**
819  * Function called once the tunnel confirms that we sent the
820  * ACK message.  Just remembers it was sent, we do not expect
821  * ACKs for ACKs ;-).
822  *
823  * @param cls our `struct CadetChannel`.
824  * @param cid identifier of the connection within the tunnel, NULL
825  *            if transmission failed
826  */
827 static void
828 send_ack_cb (void *cls,
829              const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid)
830 {
831   struct CadetChannel *ch = cls;
832
833   GNUNET_assert (NULL != ch->last_control_qe);
834   ch->last_control_qe = NULL;
835 }
836
837
838 /**
839  * Compute and send the current #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK to the other peer.
840  *
841  * @param ch channel to send the #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK for
842  */
843 static void
844 send_channel_data_ack (struct CadetChannel *ch)
845 {
846   struct GNUNET_CADET_ChannelDataAckMessage msg;
847
848   if (GNUNET_NO == ch->reliable)
849     return; /* no ACKs */
850   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK);
851   msg.header.size = htons (sizeof (msg));
852   msg.ctn = ch->ctn;
853   msg.mid.mid = htonl (ntohl (ch->mid_recv.mid));
854   msg.futures = GNUNET_htonll (ch->mid_futures);
855   LOG (GNUNET_ERROR_TYPE_DEBUG,
856        "Sending DATA_ACK %u:%llX via %s\n",
857        (unsigned int) ntohl (msg.mid.mid),
858        (unsigned long long) ch->mid_futures,
859        GCCH_2s (ch));
860   if (NULL != ch->last_control_qe)
861     GCT_send_cancel (ch->last_control_qe);
862   ch->last_control_qe = GCT_send (ch->t,
863                                   &msg.header,
864                                   &send_ack_cb,
865                                   ch);
866 }
867
868
869 /**
870  * Send our initial #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK to the client confirming that the
871  * connection is up.
872  *
873  * @param cls the `struct CadetChannel`
874  */
875 static void
876 send_open_ack (void *cls)
877 {
878   struct CadetChannel *ch = cls;
879   struct GNUNET_CADET_ChannelOpenAckMessage msg;
880
881   ch->retry_control_task = NULL;
882   LOG (GNUNET_ERROR_TYPE_DEBUG,
883        "Sending CHANNEL_OPEN_ACK on %s\n",
884        GCCH_2s (ch));
885   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK);
886   msg.header.size = htons (sizeof (msg));
887   msg.reserved = htonl (0);
888   msg.ctn = ch->ctn;
889   msg.port = ch->port;
890   if (NULL != ch->last_control_qe)
891     GCT_send_cancel (ch->last_control_qe);
892   ch->last_control_qe = GCT_send (ch->t,
893                                   &msg.header,
894                                   &send_ack_cb,
895                                   ch);
896 }
897
898
899 /**
900  * We got a #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN message again for
901  * this channel.  If the binding was successful, (re)transmit the
902  * #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK.
903  *
904  * @param ch channel that got the duplicate open
905  * @param cti identifier of the connection that delivered the message
906  */
907 void
908 GCCH_handle_duplicate_open (struct CadetChannel *ch,
909                             const struct GNUNET_CADET_ConnectionTunnelIdentifier *cti)
910 {
911   if (NULL == ch->dest)
912   {
913     LOG (GNUNET_ERROR_TYPE_DEBUG,
914          "Ignoring duplicate CHANNEL_OPEN on %s: port is closed\n",
915          GCCH_2s (ch));
916     return;
917   }
918   if (NULL != ch->retry_control_task)
919   {
920     LOG (GNUNET_ERROR_TYPE_DEBUG,
921          "Ignoring duplicate CHANNEL_OPEN on %s: control message is pending\n",
922          GCCH_2s (ch));
923     return;
924   }
925   LOG (GNUNET_ERROR_TYPE_DEBUG,
926        "Retransmitting CHANNEL_OPEN_ACK on %s\n",
927        GCCH_2s (ch));
928   ch->retry_control_task
929     = GNUNET_SCHEDULER_add_now (&send_open_ack,
930                                 ch);
931 }
932
933
934 /**
935  * Send a #GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK to the client to solicit more messages.
936  *
937  * @param ch channel the ack is for
938  * @param to_owner #GNUNET_YES to send to owner,
939  *                 #GNUNET_NO to send to dest
940  */
941 static void
942 send_ack_to_client (struct CadetChannel *ch,
943                     int to_owner)
944 {
945   struct GNUNET_MQ_Envelope *env;
946   struct GNUNET_CADET_LocalAck *ack;
947   struct CadetChannelClient *ccc;
948
949   ccc = (GNUNET_YES == to_owner) ? ch->owner : ch->dest;
950   if (NULL == ccc)
951   {
952     /* This can happen if we are just getting ACKs after
953        our local client already disconnected. */
954     GNUNET_assert (GNUNET_YES == ch->destroy);
955     return;
956   }
957   env = GNUNET_MQ_msg (ack,
958                        GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK);
959   ack->ccn = ccc->ccn;
960   LOG (GNUNET_ERROR_TYPE_DEBUG,
961        "Sending CADET_LOCAL_ACK to %s (%s) at ccn %X (%u/%u pending)\n",
962        GSC_2s (ccc->c),
963        (GNUNET_YES == to_owner) ? "owner" : "dest",
964        ntohl (ack->ccn.channel_of_client),
965        ch->pending_messages,
966        ch->max_pending_messages);
967   GSC_send_to_client (ccc->c,
968                       env);
969 }
970
971
972 /**
973  * A client is bound to the port that we have a channel
974  * open to.  Send the acknowledgement for the connection
975  * request and establish the link with the client.
976  *
977  * @param ch open incoming channel
978  * @param c client listening on the respective @a port
979  * @param port the port @a is listening on
980  */
981 void
982 GCCH_bind (struct CadetChannel *ch,
983            struct CadetClient *c,
984            const struct GNUNET_HashCode *port)
985 {
986   uint32_t options;
987   struct CadetChannelClient *cccd;
988
989   LOG (GNUNET_ERROR_TYPE_DEBUG,
990        "Binding %s from %s to port %s of %s\n",
991        GCCH_2s (ch),
992        GCT_2s (ch->t),
993        GNUNET_h2s (&ch->port),
994        GSC_2s (c));
995   if (NULL != ch->retry_control_task)
996   {
997     /* there might be a timeout task here */
998     GNUNET_SCHEDULER_cancel (ch->retry_control_task);
999     ch->retry_control_task = NULL;
1000   }
1001   options = 0;
1002   if (ch->nobuffer)
1003     options |= GNUNET_CADET_OPTION_NOBUFFER;
1004   if (ch->reliable)
1005     options |= GNUNET_CADET_OPTION_RELIABLE;
1006   if (ch->out_of_order)
1007     options |= GNUNET_CADET_OPTION_OUT_OF_ORDER;
1008   cccd = GNUNET_new (struct CadetChannelClient);
1009   GNUNET_assert (NULL == ch->dest);
1010   ch->dest = cccd;
1011   ch->port = *port;
1012   cccd->c = c;
1013   cccd->client_ready = GNUNET_YES;
1014   cccd->ccn = GSC_bind (c,
1015                         ch,
1016                         (GNUNET_YES == ch->is_loopback)
1017                         ? GCP_get (&my_full_id,
1018                                    GNUNET_YES)
1019                         : GCT_get_destination (ch->t),
1020                         port,
1021                         options);
1022   GNUNET_assert (ntohl (cccd->ccn.channel_of_client) <
1023                  GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
1024   ch->mid_recv.mid = htonl (1); /* The OPEN counts as message 0! */
1025   if (GNUNET_YES == ch->is_loopback)
1026   {
1027     ch->state = CADET_CHANNEL_OPEN_SENT;
1028     GCCH_handle_channel_open_ack (ch,
1029                                   NULL,
1030                                   port);
1031   }
1032   else
1033   {
1034     /* notify other peer that we accepted the connection */
1035     ch->state = CADET_CHANNEL_READY;
1036     ch->retry_control_task
1037       = GNUNET_SCHEDULER_add_now (&send_open_ack,
1038                                   ch);
1039   }
1040   /* give client it's initial supply of ACKs */
1041   GNUNET_assert (ntohl (cccd->ccn.channel_of_client) <
1042                  GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
1043   for (unsigned int i=0;i<ch->max_pending_messages;i++)
1044     send_ack_to_client (ch,
1045                         GNUNET_NO);
1046 }
1047
1048
1049 /**
1050  * One of our clients has disconnected, tell the other one that we
1051  * are finished. Done asynchronously to avoid concurrent modification
1052  * issues if this is the same client.
1053  *
1054  * @param cls the `struct CadetChannel` where one of the ends is now dead
1055  */
1056 static void
1057 signal_remote_destroy_cb (void *cls)
1058 {
1059   struct CadetChannel *ch = cls;
1060   struct CadetChannelClient *ccc;
1061
1062   /* Find which end is left... */
1063   ch->retry_control_task = NULL;
1064   ccc = (NULL != ch->owner) ? ch->owner : ch->dest;
1065   GSC_handle_remote_channel_destroy (ccc->c,
1066                                      ccc->ccn,
1067                                      ch);
1068   channel_destroy (ch);
1069 }
1070
1071
1072 /**
1073  * Destroy locally created channel.  Called by the local client, so no
1074  * need to tell the client.
1075  *
1076  * @param ch channel to destroy
1077  * @param c client that caused the destruction
1078  * @param ccn client number of the client @a c
1079  */
1080 void
1081 GCCH_channel_local_destroy (struct CadetChannel *ch,
1082                             struct CadetClient *c,
1083                             struct GNUNET_CADET_ClientChannelNumber ccn)
1084 {
1085   LOG (GNUNET_ERROR_TYPE_DEBUG,
1086        "%s asks for destruction of %s\n",
1087        GSC_2s (c),
1088        GCCH_2s (ch));
1089   GNUNET_assert (NULL != c);
1090   if ( (NULL != ch->owner) &&
1091        (c == ch->owner->c) &&
1092        (ccn.channel_of_client == ch->owner->ccn.channel_of_client) )
1093   {
1094     free_channel_client (ch->owner);
1095     ch->owner = NULL;
1096   }
1097   else if ( (NULL != ch->dest) &&
1098             (c == ch->dest->c) &&
1099             (ccn.channel_of_client == ch->dest->ccn.channel_of_client) )
1100   {
1101     free_channel_client (ch->dest);
1102     ch->dest = NULL;
1103   }
1104   else
1105   {
1106     GNUNET_assert (0);
1107   }
1108
1109   if (GNUNET_YES == ch->destroy)
1110   {
1111     /* other end already destroyed, with the local client gone, no need
1112        to finish transmissions, just destroy immediately. */
1113     channel_destroy (ch);
1114     return;
1115   }
1116   if ( (NULL != ch->head_sent) &&
1117        ( (NULL != ch->owner) ||
1118          (NULL != ch->dest) ) )
1119   {
1120     /* Wait for other end to destroy us as well,
1121        and otherwise allow send queue to be transmitted first */
1122     ch->destroy = GNUNET_YES;
1123     return;
1124   }
1125   if ( (GNUNET_YES == ch->is_loopback) &&
1126        ( (NULL != ch->owner) ||
1127          (NULL != ch->dest) ) )
1128   {
1129     if (NULL != ch->retry_control_task)
1130       GNUNET_SCHEDULER_cancel (ch->retry_control_task);
1131     ch->retry_control_task
1132       = GNUNET_SCHEDULER_add_now (&signal_remote_destroy_cb,
1133                                   ch);
1134     return;
1135   }
1136   if (GNUNET_NO == ch->is_loopback)
1137   {
1138     /* If the we ever sent the CHANNEL_CREATE, we need to send a destroy message. */
1139     switch (ch->state)
1140     {
1141     case CADET_CHANNEL_NEW:
1142       /* We gave up on a channel that we created as a client to a remote
1143          target, but that never went anywhere. Nothing to do here. */
1144       break;
1145     case CADET_CHANNEL_LOOSE:
1146       break;
1147     default:
1148       GCT_send_channel_destroy (ch->t,
1149                                 ch->ctn);
1150     }
1151   }
1152   /* Nothing left to do, just finish destruction */
1153   channel_destroy (ch);
1154 }
1155
1156
1157 /**
1158  * We got an acknowledgement for the creation of the channel
1159  * (the port is open on the other side).  Verify that the
1160  * other end really has the right port, and begin transmissions.
1161  *
1162  * @param ch channel to destroy
1163  * @param cti identifier of the connection that delivered the message
1164  * @param port port number (needed to verify receiver knows the port)
1165  */
1166 void
1167 GCCH_handle_channel_open_ack (struct CadetChannel *ch,
1168                               const struct GNUNET_CADET_ConnectionTunnelIdentifier *cti,
1169                               const struct GNUNET_HashCode *port)
1170 {
1171   switch (ch->state)
1172   {
1173   case CADET_CHANNEL_NEW:
1174     /* this should be impossible */
1175     GNUNET_break (0);
1176     break;
1177   case CADET_CHANNEL_LOOSE:
1178     /* This makes no sense. */
1179     GNUNET_break_op (0);
1180     break;
1181   case CADET_CHANNEL_OPEN_SENT:
1182     if (NULL == ch->owner)
1183     {
1184       /* We're not the owner, wrong direction! */
1185       GNUNET_break_op (0);
1186       return;
1187     }
1188     if (0 != memcmp (&ch->port,
1189                      port,
1190                      sizeof (struct GNUNET_HashCode)))
1191     {
1192       /* Other peer failed to provide the right port,
1193          refuse connection. */
1194       GNUNET_break_op (0);
1195       return;
1196     }
1197     LOG (GNUNET_ERROR_TYPE_DEBUG,
1198          "Received CHANNEL_OPEN_ACK for waiting %s, entering READY state\n",
1199          GCCH_2s (ch));
1200     if (NULL != ch->retry_control_task) /* can be NULL if ch->is_loopback */
1201     {
1202       GNUNET_SCHEDULER_cancel (ch->retry_control_task);
1203       ch->retry_control_task = NULL;
1204     }
1205     ch->state = CADET_CHANNEL_READY;
1206     /* On first connect, send client as many ACKs as we allow messages
1207        to be buffered! */
1208     for (unsigned int i=0;i<ch->max_pending_messages;i++)
1209       send_ack_to_client (ch,
1210                           GNUNET_YES);
1211     break;
1212   case CADET_CHANNEL_READY:
1213     /* duplicate ACK, maybe we retried the CREATE. Ignore. */
1214     LOG (GNUNET_ERROR_TYPE_DEBUG,
1215          "Received duplicate channel OPEN_ACK for %s\n",
1216          GCCH_2s (ch));
1217     GNUNET_STATISTICS_update (stats,
1218                               "# duplicate CREATE_ACKs",
1219                               1,
1220                               GNUNET_NO);
1221     break;
1222   }
1223 }
1224
1225
1226 /**
1227  * Test if element @a e1 comes before element @a e2.
1228  *
1229  * @param cls closure, to a flag where we indicate duplicate packets
1230  * @param m1 a message of to sort
1231  * @param m2 another message to sort
1232  * @return #GNUNET_YES if @e1 < @e2, otherwise #GNUNET_NO
1233  */
1234 static int
1235 is_before (void *cls,
1236            struct CadetOutOfOrderMessage *m1,
1237            struct CadetOutOfOrderMessage *m2)
1238 {
1239   int *duplicate = cls;
1240   uint32_t v1 = ntohl (m1->mid.mid);
1241   uint32_t v2 = ntohl (m2->mid.mid);
1242   uint32_t delta;
1243
1244   delta = v2 - v1;
1245   if (0 == delta)
1246     *duplicate = GNUNET_YES;
1247   if (delta > (uint32_t) INT_MAX)
1248   {
1249     /* in overflow range, we can safely assume we wrapped around */
1250     return GNUNET_NO;
1251   }
1252   else
1253   {
1254     /* result is small, thus v2 > v1, thus m1 < m2 */
1255     return GNUNET_YES;
1256   }
1257 }
1258
1259
1260 /**
1261  * We got payload data for a channel.  Pass it on to the client
1262  * and send an ACK to the other end (once flow control allows it!)
1263  *
1264  * @param ch channel that got data
1265  * @param cti identifier of the connection that delivered the message
1266  * @param msg message that was received
1267  */
1268 void
1269 GCCH_handle_channel_plaintext_data (struct CadetChannel *ch,
1270                                     const struct GNUNET_CADET_ConnectionTunnelIdentifier *cti,
1271                                     const struct GNUNET_CADET_ChannelAppDataMessage *msg)
1272 {
1273   struct GNUNET_MQ_Envelope *env;
1274   struct GNUNET_CADET_LocalData *ld;
1275   struct CadetChannelClient *ccc;
1276   size_t payload_size;
1277   struct CadetOutOfOrderMessage *com;
1278   int duplicate;
1279   uint32_t mid_min;
1280   uint32_t mid_max;
1281   uint32_t mid_msg;
1282   uint32_t delta;
1283
1284   GNUNET_assert (GNUNET_NO == ch->is_loopback);
1285   if ( (NULL == ch->owner) &&
1286        (NULL == ch->dest) )
1287   {
1288     /* This client is gone, but we still have messages to send to
1289        the other end (which is why @a ch is not yet dead).  However,
1290        we cannot pass messages to our client anymore. */
1291     LOG (GNUNET_ERROR_TYPE_DEBUG,
1292          "Dropping incoming payload on %s as this end is already closed\n",
1293          GCCH_2s (ch));
1294     /* send back DESTROY notification to stop further retransmissions! */
1295     if (GNUNET_YES == ch->destroy)
1296       GCT_send_channel_destroy (ch->t,
1297                                 ch->ctn);
1298     return;
1299   }
1300   payload_size = ntohs (msg->header.size) - sizeof (*msg);
1301   env = GNUNET_MQ_msg_extra (ld,
1302                              payload_size,
1303                              GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
1304   ld->ccn = (NULL == ch->dest) ? ch->owner->ccn : ch->dest->ccn;
1305   GNUNET_memcpy (&ld[1],
1306                  &msg[1],
1307                  payload_size);
1308   ccc = (NULL != ch->owner) ? ch->owner : ch->dest;
1309   if (GNUNET_YES == ccc->client_ready)
1310   {
1311     /*
1312      * We ad-hoc send the message if
1313      * - The channel is out-of-order
1314      * - The channel is reliable and MID matches next expected MID
1315      * - The channel is unreliable and MID is before lowest seen MID
1316      */
1317     if ( (GNUNET_YES == ch->out_of_order) ||
1318          ((msg->mid.mid == ch->mid_recv.mid) &&
1319           (GNUNET_YES == ch->reliable)) ||
1320          ((GNUNET_NO == ch->reliable) &&
1321           (ntohl (msg->mid.mid) >= ntohl (ch->mid_recv.mid)) &&
1322           ((NULL == ccc->head_recv) ||
1323            (ntohl (msg->mid.mid) < ntohl (ccc->head_recv->mid.mid)))) )
1324     {
1325       LOG (GNUNET_ERROR_TYPE_DEBUG,
1326            "Giving %u bytes of payload with MID %u from %s to client %s\n",
1327            (unsigned int) payload_size,
1328            ntohl (msg->mid.mid),
1329            GCCH_2s (ch),
1330            GSC_2s (ccc->c));
1331       ccc->client_ready = GNUNET_NO;
1332       GSC_send_to_client (ccc->c,
1333                           env);
1334       if (GNUNET_NO == ch->out_of_order)
1335         ch->mid_recv.mid = htonl (1 + ntohl (msg->mid.mid));
1336       else
1337         ch->mid_recv.mid = htonl (1 + ntohl (ch->mid_recv.mid));
1338       ch->mid_futures >>= 1;
1339       if ( (GNUNET_YES == ch->out_of_order) &&
1340            (GNUNET_NO == ch->reliable) )
1341       {
1342         /* possibly shift by more if we skipped messages */
1343         uint64_t delta = htonl (msg->mid.mid) - 1 - ntohl (ch->mid_recv.mid);
1344         
1345         if (delta > 63)
1346           ch->mid_futures = 0;
1347         else
1348           ch->mid_futures >>= delta;
1349         ch->mid_recv.mid = htonl (1 + ntohl (msg->mid.mid));
1350       }
1351       send_channel_data_ack (ch);
1352       return;
1353     }
1354   }
1355
1356   if (GNUNET_YES == ch->reliable)
1357   {
1358     /* check if message ought to be dropped because it is ancient/too distant/duplicate */
1359     mid_min = ntohl (ch->mid_recv.mid);
1360     mid_max = mid_min + ch->max_pending_messages;
1361     mid_msg = ntohl (msg->mid.mid);
1362     if ( ( (uint32_t) (mid_msg - mid_min) > ch->max_pending_messages) ||
1363          ( (uint32_t) (mid_max - mid_msg) > ch->max_pending_messages) )
1364     {
1365       LOG (GNUNET_ERROR_TYPE_DEBUG,
1366            "%s at %u drops ancient or far-future message %u\n",
1367            GCCH_2s (ch),
1368            (unsigned int) mid_min,
1369            ntohl (msg->mid.mid));
1370
1371       GNUNET_STATISTICS_update (stats,
1372                                 "# duplicate DATA (ancient or future)",
1373                                 1,
1374                                 GNUNET_NO);
1375       GNUNET_MQ_discard (env);
1376       send_channel_data_ack (ch);
1377       return;
1378     }
1379     /* mark bit for future ACKs */
1380     delta = mid_msg - mid_min - 1; /* overflow/underflow are OK here */
1381     if (delta < 64)
1382     {
1383       if (0 != (ch->mid_futures & (1LLU << delta)))
1384       {
1385         /* Duplicate within the queue, drop also */
1386         LOG (GNUNET_ERROR_TYPE_DEBUG,
1387              "Duplicate payload of %u bytes on %s (mid %u) dropped\n",
1388              (unsigned int) payload_size,
1389              GCCH_2s (ch),
1390              ntohl (msg->mid.mid));
1391         GNUNET_STATISTICS_update (stats,
1392                                   "# duplicate DATA",
1393                                   1,
1394                                   GNUNET_NO);
1395         GNUNET_MQ_discard (env);
1396         send_channel_data_ack (ch);
1397         return;
1398       }
1399       ch->mid_futures |= (1LLU << delta);
1400       LOG (GNUNET_ERROR_TYPE_DEBUG,
1401            "Marked bit %llX for mid %u (base: %u); now: %llX\n",
1402            (1LLU << delta),
1403            mid_msg,
1404            mid_min,
1405            ch->mid_futures);
1406     }
1407   }
1408   else /* ! ch->reliable */
1409   {
1410     struct CadetOutOfOrderMessage *next_msg;
1411
1412     /**
1413      * We always send if possible in this case.
1414      * It is guaranteed that the queued MID < received MID
1415      **/
1416     if ((NULL != ccc->head_recv) &&
1417         (GNUNET_YES == ccc->client_ready))
1418     {
1419       next_msg = ccc->head_recv;
1420       LOG (GNUNET_ERROR_TYPE_DEBUG,
1421            "Giving queued MID %u from %s to client %s\n",
1422            ntohl (next_msg->mid.mid),
1423            GCCH_2s (ch),
1424            GSC_2s (ccc->c));
1425       ccc->client_ready = GNUNET_NO;
1426       GSC_send_to_client (ccc->c,
1427                           next_msg->env);
1428       ch->mid_recv.mid = htonl (1 + ntohl (ch->mid_recv.mid));
1429       ch->mid_futures >>= 1;
1430       send_channel_data_ack (ch);
1431       GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
1432                                    ccc->tail_recv,
1433                                    next_msg);
1434       ccc->num_recv--;
1435       /* Do not process duplicate MID */
1436       if (msg->mid.mid == next_msg->mid.mid) /* Duplicate */
1437       {
1438         /* Duplicate within the queue, drop */
1439         LOG (GNUNET_ERROR_TYPE_DEBUG,
1440              "Message on %s (mid %u) dropped, duplicate\n",
1441              GCCH_2s (ch),
1442              ntohl (msg->mid.mid));
1443         GNUNET_free (next_msg);
1444         GNUNET_MQ_discard (env);
1445         return;
1446       }
1447       GNUNET_free (next_msg);
1448     }
1449
1450     if (ntohl (msg->mid.mid) < ntohl (ch->mid_recv.mid)) /* Old */
1451     {
1452       /* Duplicate within the queue, drop */
1453       LOG (GNUNET_ERROR_TYPE_DEBUG,
1454            "Message on %s (mid %u) dropped, old.\n",
1455            GCCH_2s (ch),
1456            ntohl (msg->mid.mid));
1457       GNUNET_MQ_discard (env);
1458       return;
1459     }
1460
1461     /* Channel is unreliable, so we do not ACK. But we also cannot
1462        allow buffering everything, so check if we have space... */
1463     if (ccc->num_recv >= ch->max_pending_messages)
1464     {
1465       struct CadetOutOfOrderMessage *drop;
1466
1467       /* Yep, need to drop. Drop the oldest message in
1468          the buffer. */
1469       LOG (GNUNET_ERROR_TYPE_DEBUG,
1470            "Queue full due slow client on %s, dropping oldest message\n",
1471            GCCH_2s (ch));
1472       GNUNET_STATISTICS_update (stats,
1473                                 "# messages dropped due to slow client",
1474                                 1,
1475                                 GNUNET_NO);
1476       drop = ccc->head_recv;
1477       GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
1478                                    ccc->tail_recv,
1479                                    drop);
1480       ccc->num_recv--;
1481       GNUNET_MQ_discard (drop->env);
1482       GNUNET_free (drop);
1483     }
1484   }
1485
1486   /* Insert message into sorted out-of-order queue */
1487   com = GNUNET_new (struct CadetOutOfOrderMessage);
1488   com->mid = msg->mid;
1489   com->env = env;
1490   duplicate = GNUNET_NO;
1491   GNUNET_CONTAINER_DLL_insert_sorted (struct CadetOutOfOrderMessage,
1492                                       is_before,
1493                                       &duplicate,
1494                                       ccc->head_recv,
1495                                       ccc->tail_recv,
1496                                       com);
1497   ccc->num_recv++;
1498   if (GNUNET_YES == duplicate)
1499   {
1500     /* Duplicate within the queue, drop also (this is not covered by
1501        the case above if "delta" >= 64, which could be the case if
1502        max_pending_messages is also >= 64 or if our client is unready
1503        and we are seeing retransmissions of the message our client is
1504        blocked on. */
1505     LOG (GNUNET_ERROR_TYPE_DEBUG,
1506          "Duplicate payload of %u bytes on %s (mid %u) dropped\n",
1507          (unsigned int) payload_size,
1508          GCCH_2s (ch),
1509          ntohl (msg->mid.mid));
1510     GNUNET_STATISTICS_update (stats,
1511                               "# duplicate DATA",
1512                               1,
1513                               GNUNET_NO);
1514     GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
1515                                  ccc->tail_recv,
1516                                  com);
1517     ccc->num_recv--;
1518     GNUNET_MQ_discard (com->env);
1519     GNUNET_free (com);
1520     send_channel_data_ack (ch);
1521     return;
1522   }
1523   LOG (GNUNET_ERROR_TYPE_DEBUG,
1524        "Queued %s payload of %u bytes on %s-%X(%p) (mid %u, need %u first)\n",
1525        (GNUNET_YES == ccc->client_ready)
1526        ? "out-of-order"
1527        : "client-not-ready",
1528        (unsigned int) payload_size,
1529        GCCH_2s (ch),
1530        ntohl (ccc->ccn.channel_of_client),
1531        ccc,
1532        ntohl (msg->mid.mid),
1533        ntohl (ch->mid_recv.mid));
1534   /* NOTE: this ACK we _could_ skip, as the packet is out-of-order and
1535      the sender may already be transmitting the previous one.  Needs
1536      experimental evaluation to see if/when this ACK helps or
1537      hurts. (We might even want another option.) */
1538   send_channel_data_ack (ch);
1539 }
1540
1541
1542 /**
1543  * Function called once the tunnel has sent one of our messages.
1544  * If the message is unreliable, simply frees the `crm`. If the
1545  * message was reliable, calculate retransmission time and
1546  * wait for ACK (or retransmit).
1547  *
1548  * @param cls the `struct CadetReliableMessage` that was sent
1549  * @param cid identifier of the connection within the tunnel, NULL
1550  *            if transmission failed
1551  */
1552 static void
1553 data_sent_cb (void *cls,
1554               const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid);
1555
1556
1557 /**
1558  * We need to retry a transmission, the last one took too long to
1559  * be acknowledged.
1560  *
1561  * @param cls the `struct CadetChannel` where we need to retransmit
1562  */
1563 static void
1564 retry_transmission (void *cls)
1565 {
1566   struct CadetChannel *ch = cls;
1567   struct CadetReliableMessage *crm = ch->head_sent;
1568
1569   ch->retry_data_task = NULL;
1570   GNUNET_assert (NULL == crm->qe);
1571   LOG (GNUNET_ERROR_TYPE_DEBUG,
1572        "Retrying transmission on %s of message %u\n",
1573        GCCH_2s (ch),
1574        (unsigned int) ntohl (crm->data_message->mid.mid));
1575   crm->qe = GCT_send (ch->t,
1576                       &crm->data_message->header,
1577                       &data_sent_cb,
1578                       crm);
1579   GNUNET_assert (NULL == ch->retry_data_task);
1580 }
1581
1582
1583 /**
1584  * We got an PLAINTEXT_DATA_ACK for a message in our queue, remove it from
1585  * the queue and tell our client that it can send more.
1586  *
1587  * @param ch the channel that got the PLAINTEXT_DATA_ACK
1588  * @param cti identifier of the connection that delivered the message
1589  * @param crm the message that got acknowledged
1590  */
1591 static void
1592 handle_matching_ack (struct CadetChannel *ch,
1593                      const struct GNUNET_CADET_ConnectionTunnelIdentifier *cti,
1594                      struct CadetReliableMessage *crm)
1595 {
1596   GNUNET_CONTAINER_DLL_remove (ch->head_sent,
1597                                ch->tail_sent,
1598                                crm);
1599   ch->pending_messages--;
1600   GNUNET_assert (ch->pending_messages < ch->max_pending_messages);
1601   LOG (GNUNET_ERROR_TYPE_DEBUG,
1602        "Received DATA_ACK on %s for message %u (%u ACKs pending)\n",
1603        GCCH_2s (ch),
1604        (unsigned int) ntohl (crm->data_message->mid.mid),
1605        ch->pending_messages);
1606   if (NULL != crm->qe)
1607   {
1608     GCT_send_cancel (crm->qe);
1609     crm->qe = NULL;
1610   }
1611   if ( (1 == crm->num_transmissions) &&
1612        (NULL != cti) )
1613   {
1614     GCC_ack_observed (cti);
1615     if (0 == memcmp (cti,
1616                      &crm->connection_taken,
1617                      sizeof (struct GNUNET_CADET_ConnectionTunnelIdentifier)))
1618     {
1619       GCC_latency_observed (cti,
1620                             GNUNET_TIME_absolute_get_duration (crm->first_transmission_time));
1621     }
1622   }
1623   GNUNET_free (crm->data_message);
1624   GNUNET_free (crm);
1625   send_ack_to_client (ch,
1626                       (NULL == ch->owner)
1627                       ? GNUNET_NO
1628                       : GNUNET_YES);
1629 }
1630
1631
1632 /**
1633  * We got an acknowledgement for payload data for a channel.
1634  * Possibly resume transmissions.
1635  *
1636  * @param ch channel that got the ack
1637  * @param cti identifier of the connection that delivered the message
1638  * @param ack details about what was received
1639  */
1640 void
1641 GCCH_handle_channel_plaintext_data_ack (struct CadetChannel *ch,
1642                                         const struct GNUNET_CADET_ConnectionTunnelIdentifier *cti,
1643                                         const struct GNUNET_CADET_ChannelDataAckMessage *ack)
1644 {
1645   struct CadetReliableMessage *crm;
1646   struct CadetReliableMessage *crmn;
1647   int found;
1648   uint32_t mid_base;
1649   uint64_t mid_mask;
1650   unsigned int delta;
1651
1652   GNUNET_break (GNUNET_NO == ch->is_loopback);
1653   if (GNUNET_NO == ch->reliable)
1654   {
1655     /* not expecting ACKs on unreliable channel, odd */
1656     GNUNET_break_op (0);
1657     return;
1658   }
1659   /* mid_base is the MID of the next message that the
1660      other peer expects (i.e. that is missing!), everything
1661      LOWER (but excluding mid_base itself) was received. */
1662   mid_base = ntohl (ack->mid.mid);
1663   mid_mask = GNUNET_htonll (ack->futures);
1664   found = GNUNET_NO;
1665   for (crm = ch->head_sent;
1666        NULL != crm;
1667        crm = crmn)
1668   {
1669     crmn = crm->next;
1670     delta = (unsigned int) (ntohl (crm->data_message->mid.mid) - mid_base);
1671     if (delta >= UINT_MAX - ch->max_pending_messages)
1672     {
1673       /* overflow, means crm was a bit in the past, so this ACK counts for it. */
1674       LOG (GNUNET_ERROR_TYPE_DEBUG,
1675            "Got DATA_ACK with base %u satisfying past message %u on %s\n",
1676            (unsigned int) mid_base,
1677            ntohl (crm->data_message->mid.mid),
1678            GCCH_2s (ch));
1679       handle_matching_ack (ch,
1680                            cti,
1681                            crm);
1682       found = GNUNET_YES;
1683       continue;
1684     }
1685     delta--;
1686     if (delta >= 64)
1687       continue;
1688     LOG (GNUNET_ERROR_TYPE_DEBUG,
1689          "Testing bit %llX for mid %u (base: %u)\n",
1690          (1LLU << delta),
1691          ntohl (crm->data_message->mid.mid),
1692          mid_base);
1693     if (0 != (mid_mask & (1LLU << delta)))
1694     {
1695       LOG (GNUNET_ERROR_TYPE_DEBUG,
1696            "Got DATA_ACK with mask for %u on %s\n",
1697            ntohl (crm->data_message->mid.mid),
1698            GCCH_2s (ch));
1699       handle_matching_ack (ch,
1700                            cti,
1701                            crm);
1702       found = GNUNET_YES;
1703     }
1704   }
1705   if (GNUNET_NO == found)
1706   {
1707     /* ACK for message we already dropped, might have been a
1708        duplicate ACK? Ignore. */
1709     LOG (GNUNET_ERROR_TYPE_DEBUG,
1710          "Duplicate DATA_ACK on %s, ignoring\n",
1711          GCCH_2s (ch));
1712     GNUNET_STATISTICS_update (stats,
1713                               "# duplicate DATA_ACKs",
1714                               1,
1715                               GNUNET_NO);
1716     return;
1717   }
1718   if (NULL != ch->retry_data_task)
1719   {
1720     GNUNET_SCHEDULER_cancel (ch->retry_data_task);
1721     ch->retry_data_task = NULL;
1722   }
1723   if ( (NULL != ch->head_sent) &&
1724        (NULL == ch->head_sent->qe) )
1725     ch->retry_data_task
1726       = GNUNET_SCHEDULER_add_at (ch->head_sent->next_retry,
1727                                  &retry_transmission,
1728                                  ch);
1729 }
1730
1731
1732 /**
1733  * Destroy channel, based on the other peer closing the
1734  * connection.  Also needs to remove this channel from
1735  * the tunnel.
1736  *
1737  * @param ch channel to destroy
1738  * @param cti identifier of the connection that delivered the message,
1739  *            NULL if we are simulating receiving a destroy due to shutdown
1740  */
1741 void
1742 GCCH_handle_remote_destroy (struct CadetChannel *ch,
1743                             const struct GNUNET_CADET_ConnectionTunnelIdentifier *cti)
1744 {
1745   struct CadetChannelClient *ccc;
1746
1747   GNUNET_assert (GNUNET_NO == ch->is_loopback);
1748   LOG (GNUNET_ERROR_TYPE_DEBUG,
1749        "Received remote channel DESTROY for %s\n",
1750        GCCH_2s (ch));
1751   if (GNUNET_YES == ch->destroy)
1752   {
1753     /* Local client already gone, this is instant-death. */
1754     channel_destroy (ch);
1755     return;
1756   }
1757   ccc = (NULL != ch->owner) ? ch->owner : ch->dest;
1758   if ( (NULL != ccc) &&
1759        (NULL != ccc->head_recv) )
1760   {
1761     LOG (GNUNET_ERROR_TYPE_WARNING,
1762          "Lost end of transmission due to remote shutdown on %s\n",
1763          GCCH_2s (ch));
1764     /* FIXME: change API to notify client about truncated transmission! */
1765   }
1766   ch->destroy = GNUNET_YES;
1767   if (NULL != ccc)
1768     GSC_handle_remote_channel_destroy (ccc->c,
1769                                        ccc->ccn,
1770                                        ch);
1771   channel_destroy (ch);
1772 }
1773
1774
1775 /**
1776  * Test if element @a e1 comes before element @a e2.
1777  *
1778  * @param cls closure, to a flag where we indicate duplicate packets
1779  * @param crm1 an element of to sort
1780  * @param crm2 another element to sort
1781  * @return #GNUNET_YES if @e1 < @e2, otherwise #GNUNET_NO
1782  */
1783 static int
1784 cmp_crm_by_next_retry (void *cls,
1785                        struct CadetReliableMessage *crm1,
1786                        struct CadetReliableMessage *crm2)
1787 {
1788   if (crm1->next_retry.abs_value_us <
1789       crm2->next_retry.abs_value_us)
1790     return GNUNET_YES;
1791   return GNUNET_NO;
1792 }
1793
1794
1795 /**
1796  * Function called once the tunnel has sent one of our messages.
1797  * If the message is unreliable, simply frees the `crm`. If the
1798  * message was reliable, calculate retransmission time and
1799  * wait for ACK (or retransmit).
1800  *
1801  * @param cls the `struct CadetReliableMessage` that was sent
1802  * @param cid identifier of the connection within the tunnel, NULL
1803  *            if transmission failed
1804  */
1805 static void
1806 data_sent_cb (void *cls,
1807               const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid)
1808 {
1809   struct CadetReliableMessage *crm = cls;
1810   struct CadetChannel *ch = crm->ch;
1811
1812   GNUNET_assert (GNUNET_NO == ch->is_loopback);
1813   GNUNET_assert (NULL != crm->qe);
1814   crm->qe = NULL;
1815   GNUNET_CONTAINER_DLL_remove (ch->head_sent,
1816                                ch->tail_sent,
1817                                crm);
1818   if (GNUNET_NO == ch->reliable)
1819   {
1820     GNUNET_free (crm->data_message);
1821     GNUNET_free (crm);
1822     ch->pending_messages--;
1823     send_ack_to_client (ch,
1824                         (NULL == ch->owner)
1825                         ? GNUNET_NO
1826                         : GNUNET_YES);
1827     return;
1828   }
1829   if (NULL == cid)
1830   {
1831     /* There was an error sending. */
1832     crm->num_transmissions = GNUNET_SYSERR;
1833   }
1834   else if (GNUNET_SYSERR != crm->num_transmissions)
1835   {
1836     /* Increment transmission counter, and possibly store @a cid
1837        if this was the first transmission. */
1838     crm->num_transmissions++;
1839     if (1 == crm->num_transmissions)
1840     {
1841       crm->first_transmission_time = GNUNET_TIME_absolute_get ();
1842       crm->connection_taken = *cid;
1843       GCC_ack_expected (cid);
1844     }
1845   }
1846   if ( (0 == crm->retry_delay.rel_value_us) &&
1847        (NULL != cid) )
1848   {
1849     struct CadetConnection *cc = GCC_lookup (cid);
1850
1851     if (NULL != cc)
1852       crm->retry_delay = GCC_get_metrics (cc)->aged_latency;
1853     else
1854       crm->retry_delay = ch->retry_time;
1855   }
1856   crm->retry_delay = GNUNET_TIME_STD_BACKOFF (crm->retry_delay);
1857   crm->retry_delay = GNUNET_TIME_relative_max (crm->retry_delay,
1858                                                MIN_RTT_DELAY);
1859   crm->next_retry = GNUNET_TIME_relative_to_absolute (crm->retry_delay);
1860
1861   GNUNET_CONTAINER_DLL_insert_sorted (struct CadetReliableMessage,
1862                                       cmp_crm_by_next_retry,
1863                                       NULL,
1864                                       ch->head_sent,
1865                                       ch->tail_sent,
1866                                       crm);
1867   LOG (GNUNET_ERROR_TYPE_DEBUG,
1868        "Message %u sent, next transmission on %s in %s\n",
1869        (unsigned int) ntohl (crm->data_message->mid.mid),
1870        GCCH_2s (ch),
1871        GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (ch->head_sent->next_retry),
1872                                                GNUNET_YES));
1873   if (NULL == ch->head_sent->qe)
1874   {
1875     if (NULL != ch->retry_data_task)
1876       GNUNET_SCHEDULER_cancel (ch->retry_data_task);
1877     ch->retry_data_task
1878       = GNUNET_SCHEDULER_add_at (ch->head_sent->next_retry,
1879                                  &retry_transmission,
1880                                  ch);
1881   }
1882 }
1883
1884
1885 /**
1886  * Handle data given by a client.
1887  *
1888  * Check whether the client is allowed to send in this tunnel, save if
1889  * channel is reliable and send an ACK to the client if there is still
1890  * buffer space in the tunnel.
1891  *
1892  * @param ch Channel.
1893  * @param sender_ccn ccn of the sender
1894  * @param buf payload to transmit.
1895  * @param buf_len number of bytes in @a buf
1896  * @return #GNUNET_OK if everything goes well,
1897  *         #GNUNET_SYSERR in case of an error.
1898  */
1899 int
1900 GCCH_handle_local_data (struct CadetChannel *ch,
1901                         struct GNUNET_CADET_ClientChannelNumber sender_ccn,
1902                         const char *buf,
1903                         size_t buf_len)
1904 {
1905   struct CadetReliableMessage *crm;
1906
1907   if (ch->pending_messages >= ch->max_pending_messages)
1908   {
1909     GNUNET_break (0); /* Fails: #5370 */
1910     return GNUNET_SYSERR;
1911   }
1912   if (GNUNET_YES == ch->destroy)
1913   {
1914     /* we are going down, drop messages */
1915     return GNUNET_OK;
1916   }
1917   ch->pending_messages++;
1918
1919   if (GNUNET_YES == ch->is_loopback)
1920   {
1921     struct CadetChannelClient *receiver;
1922     struct GNUNET_MQ_Envelope *env;
1923     struct GNUNET_CADET_LocalData *ld;
1924     int ack_to_owner;
1925
1926     env = GNUNET_MQ_msg_extra (ld,
1927                                buf_len,
1928                                GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
1929     if ( (NULL != ch->owner) &&
1930          (sender_ccn.channel_of_client ==
1931           ch->owner->ccn.channel_of_client) )
1932     {
1933       receiver = ch->dest;
1934       ack_to_owner = GNUNET_YES;
1935     }
1936     else if ( (NULL != ch->dest) &&
1937               (sender_ccn.channel_of_client ==
1938                ch->dest->ccn.channel_of_client) )
1939     {
1940       receiver = ch->owner;
1941       ack_to_owner = GNUNET_NO;
1942     }
1943     else
1944     {
1945       GNUNET_break (0);
1946       return GNUNET_SYSERR;
1947     }
1948     GNUNET_assert (NULL != receiver);
1949     ld->ccn = receiver->ccn;
1950     GNUNET_memcpy (&ld[1],
1951                    buf,
1952                    buf_len);
1953     if (GNUNET_YES == receiver->client_ready)
1954     {
1955       ch->pending_messages--;
1956       GSC_send_to_client (receiver->c,
1957                           env);
1958       send_ack_to_client (ch,
1959                           ack_to_owner);
1960     }
1961     else
1962     {
1963       struct CadetOutOfOrderMessage *oom;
1964
1965       oom = GNUNET_new (struct CadetOutOfOrderMessage);
1966       oom->env = env;
1967       GNUNET_CONTAINER_DLL_insert_tail (receiver->head_recv,
1968                                         receiver->tail_recv,
1969                                         oom);
1970       receiver->num_recv++;
1971     }
1972     return GNUNET_OK;
1973   }
1974
1975   /* Everything is correct, send the message. */
1976   crm = GNUNET_malloc (sizeof (*crm));
1977   crm->ch = ch;
1978   crm->data_message = GNUNET_malloc (sizeof (struct GNUNET_CADET_ChannelAppDataMessage)
1979                                      + buf_len);
1980   crm->data_message->header.size = htons (sizeof (struct GNUNET_CADET_ChannelAppDataMessage) + buf_len);
1981   crm->data_message->header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA);
1982   ch->mid_send.mid = htonl (ntohl (ch->mid_send.mid) + 1);
1983   crm->data_message->mid = ch->mid_send;
1984   crm->data_message->ctn = ch->ctn;
1985   GNUNET_memcpy (&crm->data_message[1],
1986                  buf,
1987                  buf_len);
1988   GNUNET_CONTAINER_DLL_insert_tail (ch->head_sent,
1989                                     ch->tail_sent,
1990                                     crm);
1991   LOG (GNUNET_ERROR_TYPE_DEBUG,
1992        "Sending message %u from local client to %s with %u bytes\n",
1993        ntohl (crm->data_message->mid.mid),
1994        GCCH_2s (ch),
1995        buf_len);
1996   if (NULL != ch->retry_data_task)
1997   {
1998     GNUNET_SCHEDULER_cancel (ch->retry_data_task);
1999     ch->retry_data_task = NULL;
2000   }
2001   crm->qe = GCT_send (ch->t,
2002                       &crm->data_message->header,
2003                       &data_sent_cb,
2004                       crm);
2005   GNUNET_assert (NULL == ch->retry_data_task);
2006   return GNUNET_OK;
2007 }
2008
2009
2010 /**
2011  * Handle ACK from client on local channel.  Means the client is ready
2012  * for more data, see if we have any for it.
2013  *
2014  * @param ch channel to destroy
2015  * @param client_ccn ccn of the client sending the ack
2016  */
2017 void
2018 GCCH_handle_local_ack (struct CadetChannel *ch,
2019                        struct GNUNET_CADET_ClientChannelNumber client_ccn)
2020 {
2021   struct CadetChannelClient *ccc;
2022   struct CadetOutOfOrderMessage *com;
2023
2024   if ( (NULL != ch->owner) &&
2025        (ch->owner->ccn.channel_of_client == client_ccn.channel_of_client) )
2026     ccc = ch->owner;
2027   else if ( (NULL != ch->dest) &&
2028             (ch->dest->ccn.channel_of_client == client_ccn.channel_of_client) )
2029     ccc = ch->dest;
2030   else
2031     GNUNET_assert (0);
2032   ccc->client_ready = GNUNET_YES;
2033   com = ccc->head_recv;
2034   if (NULL == com)
2035   {
2036     LOG (GNUNET_ERROR_TYPE_DEBUG,
2037          "Got LOCAL_ACK, %s-%X ready to receive more data, but none pending on %s-%X(%p)!\n",
2038          GSC_2s (ccc->c),
2039          ntohl (client_ccn.channel_of_client),
2040          GCCH_2s (ch),
2041          ntohl (ccc->ccn.channel_of_client),
2042          ccc);
2043     return; /* none pending */
2044   }
2045   if (GNUNET_YES == ch->is_loopback)
2046   {
2047     int to_owner;
2048
2049     /* Messages are always in-order, just send */
2050     GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
2051                                  ccc->tail_recv,
2052                                  com);
2053     ccc->num_recv--;
2054     GSC_send_to_client (ccc->c,
2055                         com->env);
2056     /* Notify sender that we can receive more */
2057     if ( (NULL != ch->owner) &&
2058          (ccc->ccn.channel_of_client ==
2059           ch->owner->ccn.channel_of_client) )
2060     {
2061       to_owner = GNUNET_NO;
2062     }
2063     else
2064     {
2065       GNUNET_assert ( (NULL != ch->dest) &&
2066                       (ccc->ccn.channel_of_client ==
2067                        ch->dest->ccn.channel_of_client) );
2068       to_owner = GNUNET_YES;
2069     }
2070     send_ack_to_client (ch,
2071                         to_owner);
2072     GNUNET_free (com);
2073     return;
2074   }
2075
2076   if ( (com->mid.mid != ch->mid_recv.mid) &&
2077        (GNUNET_NO == ch->out_of_order) &&
2078        (GNUNET_YES == ch->reliable) )
2079   {
2080     LOG (GNUNET_ERROR_TYPE_DEBUG,
2081          "Got LOCAL_ACK, %s-%X ready to receive more data (but next one is out-of-order %u vs. %u)!\n",
2082          GSC_2s (ccc->c),
2083          ntohl (ccc->ccn.channel_of_client),
2084          ntohl (com->mid.mid),
2085          ntohl (ch->mid_recv.mid));
2086     return; /* missing next one in-order */
2087   }
2088
2089   LOG (GNUNET_ERROR_TYPE_DEBUG,
2090        "Got LOCAL_ACK, giving payload message %u to %s-%X on %s\n",
2091        ntohl (com->mid.mid),
2092        GSC_2s (ccc->c),
2093        ntohl (ccc->ccn.channel_of_client),
2094        GCCH_2s (ch));
2095
2096   /* all good, pass next message to client */
2097   GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
2098                                ccc->tail_recv,
2099                                com);
2100   ccc->num_recv--;
2101   /* FIXME: if unreliable, this is not aggressive
2102      enough, as it would be OK to have lost some! */
2103
2104   ch->mid_recv.mid = htonl (1 + ntohl (com->mid.mid));
2105   ch->mid_futures >>= 1; /* equivalent to division by 2 */
2106   ccc->client_ready = GNUNET_NO;
2107   GSC_send_to_client (ccc->c,
2108                       com->env);
2109   GNUNET_free (com);
2110   send_channel_data_ack (ch);
2111   if (NULL != ccc->head_recv)
2112     return;
2113   if (GNUNET_NO == ch->destroy)
2114     return;
2115   GCT_send_channel_destroy (ch->t,
2116                             ch->ctn);
2117   channel_destroy (ch);
2118 }
2119
2120
2121 #define LOG2(level, ...) GNUNET_log_from_nocheck(level,"cadet-chn",__VA_ARGS__)
2122
2123
2124 /**
2125  * Log channel info.
2126  *
2127  * @param ch Channel.
2128  * @param level Debug level to use.
2129  */
2130 void
2131 GCCH_debug (struct CadetChannel *ch,
2132             enum GNUNET_ErrorType level)
2133 {
2134 #if !defined(GNUNET_CULL_LOGGING)
2135   int do_log;
2136
2137   do_log = GNUNET_get_log_call_status (level & (~GNUNET_ERROR_TYPE_BULK),
2138                                        "cadet-chn",
2139                                        __FILE__, __FUNCTION__, __LINE__);
2140   if (0 == do_log)
2141     return;
2142
2143   if (NULL == ch)
2144   {
2145     LOG2 (level, "CHN *** DEBUG NULL CHANNEL ***\n");
2146     return;
2147   }
2148   LOG2 (level,
2149         "CHN %s:%X (%p)\n",
2150         GCT_2s (ch->t),
2151         ch->ctn,
2152         ch);
2153   if (NULL != ch->owner)
2154   {
2155     LOG2 (level,
2156           "CHN origin %s ready %s local-id: %u\n",
2157           GSC_2s (ch->owner->c),
2158           ch->owner->client_ready ? "YES" : "NO",
2159           ntohl (ch->owner->ccn.channel_of_client));
2160   }
2161   if (NULL != ch->dest)
2162   {
2163     LOG2 (level,
2164           "CHN destination %s ready %s local-id: %u\n",
2165           GSC_2s (ch->dest->c),
2166           ch->dest->client_ready ? "YES" : "NO",
2167           ntohl (ch->dest->ccn.channel_of_client));
2168   }
2169   LOG2 (level,
2170         "CHN  Message IDs recv: %d (%LLX), send: %d\n",
2171         ntohl (ch->mid_recv.mid),
2172         (unsigned long long) ch->mid_futures,
2173         ntohl (ch->mid_send.mid));
2174 #endif
2175 }
2176
2177
2178
2179 /* end of gnunet-service-cadet-new_channel.c */