handle duplicate DATA packets
[oweals/gnunet.git] / src / cadet / gnunet-service-cadet-new_channel.c
1
2 /*
3      This file is part of GNUnet.
4      Copyright (C) 2001-2017 GNUnet e.V.
5
6      GNUnet is free software; you can redistribute it and/or modify
7      it under the terms of the GNU General Public License as published
8      by the Free Software Foundation; either version 3, or (at your
9      option) any later version.
10
11      GNUnet is distributed in the hope that it will be useful, but
12      WITHOUT ANY WARRANTY; without even the implied warranty of
13      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14      General Public License for more details.
15
16      You should have received a copy of the GNU General Public License
17      along with GNUnet; see the file COPYING.  If not, write to the
18      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
19      Boston, MA 02110-1301, USA.
20 */
21 /**
22  * @file cadet/gnunet-service-cadet-new_channel.c
23  * @brief logical links between CADET clients
24  * @author Bartlomiej Polot
25  * @author Christian Grothoff
26  *
27  * TODO:
28  * - FIXME: send ACKs back to loopback clients!
29  *
30  * - introduce shutdown so we can have half-closed channels, modify
31  *   destroy to include MID to have FIN-ACK equivalents, etc.
32  * - estimate max bandwidth using bursts and use to for CONGESTION CONTROL!
33  * - check that '0xFFULL' really is sufficient for flow control!
34  * - revisit handling of 'unreliable' traffic!
35  * - revisit handling of 'out-of-order' option, especially in combination with/without 'reliable'.
36  * - figure out flow control without ACKs (unreliable traffic!)
37  */
38 #include "platform.h"
39 #include "gnunet_util_lib.h"
40 #include "cadet.h"
41 #include "gnunet_statistics_service.h"
42 #include "gnunet-service-cadet-new.h"
43 #include "gnunet-service-cadet-new_channel.h"
44 #include "gnunet-service-cadet-new_connection.h"
45 #include "gnunet-service-cadet-new_tunnels.h"
46 #include "gnunet-service-cadet-new_peer.h"
47 #include "gnunet-service-cadet-new_paths.h"
48
49 #define LOG(level,...) GNUNET_log_from (level,"cadet-chn",__VA_ARGS__)
50
51 /**
52  * How long do we initially wait before retransmitting?
53  */
54 #define CADET_INITIAL_RETRANSMIT_TIME GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 250)
55
56 /**
57  * How long do we wait before dropping state about incoming
58  * connection to closed port?
59  */
60 #define TIMEOUT_CLOSED_PORT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30)
61
62
63 /**
64  * All the states a connection can be in.
65  */
66 enum CadetChannelState
67 {
68   /**
69    * Uninitialized status, should never appear in operation.
70    */
71   CADET_CHANNEL_NEW,
72
73   /**
74    * Connection create message sent, waiting for ACK.
75    */
76   CADET_CHANNEL_OPEN_SENT,
77
78   /**
79    * Connection confirmed, ready to carry traffic.
80    */
81   CADET_CHANNEL_READY
82 };
83
84
85 /**
86  * Info needed to retry a message in case it gets lost.
87  * Note that we DO use this structure also for unreliable
88  * messages.
89  */
90 struct CadetReliableMessage
91 {
92   /**
93    * Double linked list, FIFO style
94    */
95   struct CadetReliableMessage *next;
96
97   /**
98    * Double linked list, FIFO style
99    */
100   struct CadetReliableMessage *prev;
101
102   /**
103    * Which channel is this message in?
104    */
105   struct CadetChannel *ch;
106
107   /**
108    * Entry in the tunnels queue for this message, NULL if it has left
109    * the tunnel.  Used to cancel transmission in case we receive an
110    * ACK in time.
111    */
112   struct CadetTunnelQueueEntry *qe;
113
114   /**
115    * How soon should we retry if we fail to get an ACK?
116    * Messages in the queue are sorted by this value.
117    */
118   struct GNUNET_TIME_Absolute next_retry;
119
120   /**
121    * How long do we wait for an ACK after transmission?
122    * Use for the back-off calculation.
123    */
124   struct GNUNET_TIME_Relative retry_delay;
125
126   /**
127    * Data message we are trying to send.
128    */
129   struct GNUNET_CADET_ChannelAppDataMessage *data_message;
130
131 };
132
133
134 /**
135  * List of received out-of-order data messages.
136  */
137 struct CadetOutOfOrderMessage
138 {
139   /**
140    * Double linked list, FIFO style
141    */
142   struct CadetOutOfOrderMessage *next;
143
144   /**
145    * Double linked list, FIFO style
146    */
147   struct CadetOutOfOrderMessage *prev;
148
149   /**
150    * ID of the message (messages up to this point needed
151    * before we give this one to the client).
152    */
153   struct ChannelMessageIdentifier mid;
154
155   /**
156    * The envelope with the payload of the out-of-order message
157    */
158   struct GNUNET_MQ_Envelope *env;
159
160 };
161
162
163 /**
164  * Client endpoint of a `struct CadetChannel`.  A channel may be a
165  * loopback channel, in which case it has two of these endpoints.
166  * Note that flow control also is required in both directions.
167  */
168 struct CadetChannelClient
169 {
170   /**
171    * Client handle.  Not by itself sufficient to designate
172    * the client endpoint, as the same client handle may
173    * be used for both the owner and the destination, and
174    * we thus also need the channel ID to identify the client.
175    */
176   struct CadetClient *c;
177
178   /**
179    * Head of DLL of messages received out of order or while client was unready.
180    */
181   struct CadetOutOfOrderMessage *head_recv;
182
183   /**
184    * Tail DLL of messages received out of order or while client was unready.
185    */
186   struct CadetOutOfOrderMessage *tail_recv;
187
188   /**
189    * Local tunnel number for this client.
190    * (if owner >= #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI,
191    *  otherwise < #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI)
192    */
193   struct GNUNET_CADET_ClientChannelNumber ccn;
194
195   /**
196    * Can we send data to the client?
197    */
198   int client_ready;
199
200 };
201
202
203 /**
204  * Struct containing all information regarding a channel to a remote client.
205  */
206 struct CadetChannel
207 {
208   /**
209    * Tunnel this channel is in.
210    */
211   struct CadetTunnel *t;
212
213   /**
214    * Client owner of the tunnel, if any.
215    * (Used if this channel represends the initiating end of the tunnel.)
216    */
217   struct CadetChannelClient *owner;
218
219   /**
220    * Client destination of the tunnel, if any.
221    * (Used if this channel represents the listening end of the tunnel.)
222    */
223   struct CadetChannelClient *dest;
224
225   /**
226    * Last entry in the tunnel's queue relating to control messages
227    * (#GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN or
228    * #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK).  Used to cancel
229    * transmission in case we receive updated information.
230    */
231   struct CadetTunnelQueueEntry *last_control_qe;
232
233   /**
234    * Head of DLL of messages sent and not yet ACK'd.
235    */
236   struct CadetReliableMessage *head_sent;
237
238   /**
239    * Tail of DLL of messages sent and not yet ACK'd.
240    */
241   struct CadetReliableMessage *tail_sent;
242
243   /**
244    * Task to resend/poll in case no ACK is received.
245    */
246   struct GNUNET_SCHEDULER_Task *retry_control_task;
247
248   /**
249    * Task to resend/poll in case no ACK is received.
250    */
251   struct GNUNET_SCHEDULER_Task *retry_data_task;
252
253   /**
254    * Last time the channel was used
255    */
256   struct GNUNET_TIME_Absolute timestamp;
257
258   /**
259    * Destination port of the channel.
260    */
261   struct GNUNET_HashCode port;
262
263   /**
264    * Counter for exponential backoff.
265    */
266   struct GNUNET_TIME_Relative retry_time;
267
268   /**
269    * How long does it usually take to get an ACK.
270    */
271   struct GNUNET_TIME_Relative expected_delay;
272
273   /**
274    * Bitfield of already-received messages past @e mid_recv.
275    */
276   uint64_t mid_futures;
277
278   /**
279    * Next MID expected for incoming traffic.
280    */
281   struct ChannelMessageIdentifier mid_recv;
282
283   /**
284    * Next MID to use for outgoing traffic.
285    */
286   struct ChannelMessageIdentifier mid_send;
287
288   /**
289    * Total (reliable) messages pending ACK for this channel.
290    */
291   unsigned int pending_messages;
292
293   /**
294    * Maximum (reliable) messages pending ACK for this channel
295    * before we throttle the client.
296    */
297   unsigned int max_pending_messages;
298
299   /**
300    * Number identifying this channel in its tunnel.
301    */
302   struct GNUNET_CADET_ChannelTunnelNumber ctn;
303
304   /**
305    * Channel state.
306    */
307   enum CadetChannelState state;
308
309   /**
310    * Is the tunnel bufferless (minimum latency)?
311    */
312   int nobuffer;
313
314   /**
315    * Is the tunnel reliable?
316    */
317   int reliable;
318
319   /**
320    * Is the tunnel out-of-order?
321    */
322   int out_of_order;
323
324   /**
325    * Is this channel a loopback channel, where the destination is us again?
326    */
327   int is_loopback;
328
329   /**
330    * Flag to signal the destruction of the channel.  If this is set to
331    * #GNUNET_YES the channel will be destroyed once the queue is
332    * empty.
333    */
334   int destroy;
335
336 };
337
338
339 /**
340  * Get the static string for identification of the channel.
341  *
342  * @param ch Channel.
343  *
344  * @return Static string with the channel IDs.
345  */
346 const char *
347 GCCH_2s (const struct CadetChannel *ch)
348 {
349   static char buf[128];
350
351   GNUNET_snprintf (buf,
352                    sizeof (buf),
353                    "Channel %s:%s ctn:%X(%X/%X)",
354                    (GNUNET_YES == ch->is_loopback)
355                    ? "loopback"
356                    : GNUNET_i2s (GCP_get_id (GCT_get_destination (ch->t))),
357                    GNUNET_h2s (&ch->port),
358                    ch->ctn,
359                    (NULL == ch->owner) ? 0 : ntohl (ch->owner->ccn.channel_of_client),
360                    (NULL == ch->dest) ? 0 : ntohl (ch->dest->ccn.channel_of_client));
361   return buf;
362 }
363
364
365 /**
366  * Get the channel's public ID.
367  *
368  * @param ch Channel.
369  *
370  * @return ID used to identify the channel with the remote peer.
371  */
372 struct GNUNET_CADET_ChannelTunnelNumber
373 GCCH_get_id (const struct CadetChannel *ch)
374 {
375   return ch->ctn;
376 }
377
378
379 /**
380  * Release memory associated with @a ccc
381  *
382  * @param ccc data structure to clean up
383  */
384 static void
385 free_channel_client (struct CadetChannelClient *ccc)
386 {
387   struct CadetOutOfOrderMessage *com;
388
389   while (NULL != (com = ccc->head_recv))
390   {
391     GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
392                                  ccc->tail_recv,
393                                  com);
394     GNUNET_MQ_discard (com->env);
395     GNUNET_free (com);
396   }
397   GNUNET_free (ccc);
398 }
399
400
401 /**
402  * Destroy the given channel.
403  *
404  * @param ch channel to destroy
405  */
406 static void
407 channel_destroy (struct CadetChannel *ch)
408 {
409   struct CadetReliableMessage *crm;
410
411   while (NULL != (crm = ch->head_sent))
412   {
413     GNUNET_assert (ch == crm->ch);
414     if (NULL != crm->qe)
415     {
416       GCT_send_cancel (crm->qe);
417       crm->qe = NULL;
418     }
419     GNUNET_CONTAINER_DLL_remove (ch->head_sent,
420                                  ch->tail_sent,
421                                  crm);
422     GNUNET_free (crm->data_message);
423     GNUNET_free (crm);
424   }
425   if (NULL != ch->owner)
426   {
427     free_channel_client (ch->owner);
428     ch->owner = NULL;
429   }
430   if (NULL != ch->dest)
431   {
432     free_channel_client (ch->dest);
433     ch->dest = NULL;
434   }
435   if (NULL != ch->last_control_qe)
436   {
437     GCT_send_cancel (ch->last_control_qe);
438     ch->last_control_qe = NULL;
439   }
440   if (NULL != ch->retry_data_task)
441   {
442     GNUNET_SCHEDULER_cancel (ch->retry_data_task);
443     ch->retry_data_task = NULL;
444   }
445   if (NULL != ch->retry_control_task)
446   {
447     GNUNET_SCHEDULER_cancel (ch->retry_control_task);
448     ch->retry_control_task = NULL;
449   }
450   if (GNUNET_NO == ch->is_loopback)
451   {
452     GCT_remove_channel (ch->t,
453                         ch,
454                         ch->ctn);
455     ch->t = NULL;
456   }
457   GNUNET_free (ch);
458 }
459
460
461 /**
462  * Send a channel create message.
463  *
464  * @param cls Channel for which to send.
465  */
466 static void
467 send_channel_open (void *cls);
468
469
470 /**
471  * Function called once the tunnel confirms that we sent the
472  * create message.  Delays for a bit until we retry.
473  *
474  * @param cls our `struct CadetChannel`.
475  */
476 static void
477 channel_open_sent_cb (void *cls)
478 {
479   struct CadetChannel *ch = cls;
480
481   GNUNET_assert (NULL != ch->last_control_qe);
482   ch->last_control_qe = NULL;
483   ch->retry_time = GNUNET_TIME_STD_BACKOFF (ch->retry_time);
484   LOG (GNUNET_ERROR_TYPE_DEBUG,
485        "Sent CADET_CHANNEL_OPEN on %s, retrying in %s\n",
486        GCCH_2s (ch),
487        GNUNET_STRINGS_relative_time_to_string (ch->retry_time,
488                                                GNUNET_YES));
489   ch->retry_control_task
490     = GNUNET_SCHEDULER_add_delayed (ch->retry_time,
491                                     &send_channel_open,
492                                     ch);
493 }
494
495
496 /**
497  * Send a channel open message.
498  *
499  * @param cls Channel for which to send.
500  */
501 static void
502 send_channel_open (void *cls)
503 {
504   struct CadetChannel *ch = cls;
505   struct GNUNET_CADET_ChannelOpenMessage msgcc;
506   uint32_t options;
507
508   ch->retry_control_task = NULL;
509   LOG (GNUNET_ERROR_TYPE_DEBUG,
510        "Sending CHANNEL_OPEN message for %s\n",
511        GCCH_2s (ch));
512   options = 0;
513   if (ch->nobuffer)
514     options |= GNUNET_CADET_OPTION_NOBUFFER;
515   if (ch->reliable)
516     options |= GNUNET_CADET_OPTION_RELIABLE;
517   if (ch->out_of_order)
518     options |= GNUNET_CADET_OPTION_OUT_OF_ORDER;
519   msgcc.header.size = htons (sizeof (msgcc));
520   msgcc.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN);
521   msgcc.opt = htonl (options);
522   msgcc.port = ch->port;
523   msgcc.ctn = ch->ctn;
524   ch->state = CADET_CHANNEL_OPEN_SENT;
525   ch->last_control_qe = GCT_send (ch->t,
526                                   &msgcc.header,
527                                   &channel_open_sent_cb,
528                                   ch);
529 }
530
531
532 /**
533  * Function called once and only once after a channel was bound
534  * to its tunnel via #GCT_add_channel() is ready for transmission.
535  * Note that this is only the case for channels that this peer
536  * initiates, as for incoming channels we assume that they are
537  * ready for transmission immediately upon receiving the open
538  * message.  Used to bootstrap the #GCT_send() process.
539  *
540  * @param ch the channel for which the tunnel is now ready
541  */
542 void
543 GCCH_tunnel_up (struct CadetChannel *ch)
544 {
545   GNUNET_assert (NULL == ch->retry_control_task);
546   LOG (GNUNET_ERROR_TYPE_DEBUG,
547        "Tunnel up, sending CHANNEL_OPEN on %s now\n",
548        GCCH_2s (ch));
549   ch->retry_control_task
550     = GNUNET_SCHEDULER_add_now (&send_channel_open,
551                                 ch);
552 }
553
554
555 /**
556  * Create a new channel.
557  *
558  * @param owner local client owning the channel
559  * @param ccn local number of this channel at the @a owner
560  * @param destination peer to which we should build the channel
561  * @param port desired port at @a destination
562  * @param options options for the channel
563  * @return handle to the new channel
564  */
565 struct CadetChannel *
566 GCCH_channel_local_new (struct CadetClient *owner,
567                         struct GNUNET_CADET_ClientChannelNumber ccn,
568                         struct CadetPeer *destination,
569                         const struct GNUNET_HashCode *port,
570                         uint32_t options)
571 {
572   struct CadetChannel *ch;
573   struct CadetChannelClient *ccco;
574
575   ccco = GNUNET_new (struct CadetChannelClient);
576   ccco->c = owner;
577   ccco->ccn = ccn;
578   ccco->client_ready = GNUNET_YES;
579
580   ch = GNUNET_new (struct CadetChannel);
581   ch->mid_recv.mid = htonl (1); /* The OPEN_ACK counts as message 0! */
582   ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
583   ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
584   ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
585   ch->max_pending_messages = (ch->nobuffer) ? 1 : 4; /* FIXME: 4!? Do not hardcode! */
586   ch->owner = ccco;
587   ch->port = *port;
588   if (0 == memcmp (&my_full_id,
589                    GCP_get_id (destination),
590                    sizeof (struct GNUNET_PeerIdentity)))
591   {
592     struct CadetClient *c;
593
594     ch->is_loopback = GNUNET_YES;
595     c = GNUNET_CONTAINER_multihashmap_get (open_ports,
596                                            port);
597     if (NULL == c)
598     {
599       /* port closed, wait for it to possibly open */
600       (void) GNUNET_CONTAINER_multihashmap_put (loose_channels,
601                                                 port,
602                                                 ch,
603                                                 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
604       LOG (GNUNET_ERROR_TYPE_DEBUG,
605            "Created loose incoming loopback channel to port %s\n",
606            GNUNET_h2s (&ch->port));
607     }
608     else
609     {
610       ch->dest = GNUNET_new (struct CadetChannelClient);
611       ch->dest->c = c;
612       ch->dest->client_ready = GNUNET_YES;
613       GCCH_bind (ch,
614                  ch->dest->c);
615     }
616   }
617   else
618   {
619     ch->t = GCP_get_tunnel (destination,
620                             GNUNET_YES);
621     ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME;
622     ch->ctn = GCT_add_channel (ch->t,
623                                ch);
624   }
625   GNUNET_STATISTICS_update (stats,
626                             "# channels",
627                             1,
628                             GNUNET_NO);
629   LOG (GNUNET_ERROR_TYPE_DEBUG,
630        "Created channel to port %s at peer %s for %s using %s\n",
631        GNUNET_h2s (port),
632        GCP_2s (destination),
633        GSC_2s (owner),
634        (GNUNET_YES == ch->is_loopback) ? "loopback" : GCT_2s (ch->t));
635   return ch;
636 }
637
638
639 /**
640  * We had an incoming channel to a port that is closed.
641  * It has not been opened for a while, drop it.
642  *
643  * @param cls the channel to drop
644  */
645 static void
646 timeout_closed_cb (void *cls)
647 {
648   struct CadetChannel *ch = cls;
649
650   ch->retry_control_task = NULL;
651   LOG (GNUNET_ERROR_TYPE_DEBUG,
652        "Closing incoming channel to port %s from peer %s due to timeout\n",
653        GNUNET_h2s (&ch->port),
654        GCP_2s (GCT_get_destination (ch->t)));
655   channel_destroy (ch);
656 }
657
658
659 /**
660  * Create a new channel based on a request coming in over the network.
661  *
662  * @param t tunnel to the remote peer
663  * @param ctn identifier of this channel in the tunnel
664  * @param port desired local port
665  * @param options options for the channel
666  * @return handle to the new channel
667  */
668 struct CadetChannel *
669 GCCH_channel_incoming_new (struct CadetTunnel *t,
670                            struct GNUNET_CADET_ChannelTunnelNumber ctn,
671                            const struct GNUNET_HashCode *port,
672                            uint32_t options)
673 {
674   struct CadetChannel *ch;
675   struct CadetClient *c;
676
677   ch = GNUNET_new (struct CadetChannel);
678   ch->port = *port;
679   ch->t = t;
680   ch->ctn = ctn;
681   ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME;
682   ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
683   ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
684   ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
685   ch->max_pending_messages = (ch->nobuffer) ? 1 : 4; /* FIXME: 4!? Do not hardcode! */
686   GNUNET_STATISTICS_update (stats,
687                             "# channels",
688                             1,
689                             GNUNET_NO);
690
691   c = GNUNET_CONTAINER_multihashmap_get (open_ports,
692                                          port);
693   if (NULL == c)
694   {
695     /* port closed, wait for it to possibly open */
696     (void) GNUNET_CONTAINER_multihashmap_put (loose_channels,
697                                               port,
698                                               ch,
699                                               GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
700     ch->retry_control_task
701       = GNUNET_SCHEDULER_add_delayed (TIMEOUT_CLOSED_PORT,
702                                       &timeout_closed_cb,
703                                       ch);
704     LOG (GNUNET_ERROR_TYPE_DEBUG,
705          "Created loose incoming channel to port %s from peer %s\n",
706          GNUNET_h2s (&ch->port),
707          GCP_2s (GCT_get_destination (ch->t)));
708   }
709   else
710   {
711     GCCH_bind (ch,
712                c);
713   }
714   GNUNET_STATISTICS_update (stats,
715                             "# channels",
716                             1,
717                             GNUNET_NO);
718   return ch;
719 }
720
721
722 /**
723  * Function called once the tunnel confirms that we sent the
724  * ACK message.  Just remembers it was sent, we do not expect
725  * ACKs for ACKs ;-).
726  *
727  * @param cls our `struct CadetChannel`.
728  */
729 static void
730 send_ack_cb (void *cls)
731 {
732   struct CadetChannel *ch = cls;
733
734   GNUNET_assert (NULL != ch->last_control_qe);
735   ch->last_control_qe = NULL;
736 }
737
738
739 /**
740  * Compute and send the current #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK to the other peer.
741  *
742  * @param ch channel to send the #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK for
743  */
744 static void
745 send_channel_data_ack (struct CadetChannel *ch)
746 {
747   struct GNUNET_CADET_ChannelDataAckMessage msg;
748
749   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK);
750   msg.header.size = htons (sizeof (msg));
751   msg.ctn = ch->ctn;
752   msg.mid.mid = htonl (ntohl (ch->mid_recv.mid) - 1);
753   msg.futures = GNUNET_htonll (ch->mid_futures);
754   if (NULL != ch->last_control_qe)
755     GCT_send_cancel (ch->last_control_qe);
756   ch->last_control_qe = GCT_send (ch->t,
757                                   &msg.header,
758                                   &send_ack_cb,
759                                   ch);
760 }
761
762
763 /**
764  * Send our initial #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK to the client confirming that the
765  * connection is up.
766  *
767  * @param cls the `struct CadetChannel`
768  */
769 static void
770 send_open_ack (void *cls)
771 {
772   struct CadetChannel *ch = cls;
773   struct GNUNET_CADET_ChannelManageMessage msg;
774
775   LOG (GNUNET_ERROR_TYPE_DEBUG,
776        "Sending CHANNEL_OPEN_ACK on %s\n",
777        GCCH_2s (ch));
778   ch->retry_control_task = NULL;
779   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK);
780   msg.header.size = htons (sizeof (msg));
781   msg.reserved = htonl (0);
782   msg.ctn = ch->ctn;
783   if (NULL != ch->last_control_qe)
784     GCT_send_cancel (ch->last_control_qe);
785   ch->last_control_qe = GCT_send (ch->t,
786                                   &msg.header,
787                                   &send_ack_cb,
788                                   ch);
789 }
790
791
792 /**
793  * We got a #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN message again for
794  * this channel.  If the binding was successful, (re)transmit the
795  * #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK.
796  *
797  * @param ch channel that got the duplicate open
798  */
799 void
800 GCCH_handle_duplicate_open (struct CadetChannel *ch)
801 {
802   if (NULL == ch->dest)
803   {
804     LOG (GNUNET_ERROR_TYPE_DEBUG,
805          "Ignoring duplicate channel OPEN on %s: port is closed\n",
806          GCCH_2s (ch));
807     return;
808   }
809   if (NULL != ch->retry_control_task)
810   {
811     LOG (GNUNET_ERROR_TYPE_DEBUG,
812          "Ignoring duplicate channel OPEN on %s: control message is pending\n",
813          GCCH_2s (ch));
814     return;
815   }
816   LOG (GNUNET_ERROR_TYPE_DEBUG,
817        "Retransmitting OPEN_ACK on %s\n",
818        GCCH_2s (ch));
819   ch->retry_control_task
820     = GNUNET_SCHEDULER_add_now (&send_open_ack,
821                                 ch);
822 }
823
824
825 /**
826  * Send a #GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK to the client to solicit more messages.
827  *
828  * @param ch channel the ack is for
829  * @param to_owner #GNUNET_YES to send to owner,
830  *                 #GNUNET_NO to send to dest
831  */
832 static void
833 send_ack_to_client (struct CadetChannel *ch,
834                     int to_owner)
835 {
836   struct GNUNET_MQ_Envelope *env;
837   struct GNUNET_CADET_LocalAck *ack;
838   struct CadetChannelClient *ccc;
839
840   env = GNUNET_MQ_msg (ack,
841                        GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK);
842   ccc = (GNUNET_YES == to_owner) ? ch->owner : ch->dest;
843   ack->ccn = ccc->ccn;
844   LOG (GNUNET_ERROR_TYPE_DEBUG,
845        "Sending CADET_LOCAL_ACK to %s (%s) at ccn %X (%u/%u pending)\n",
846        GSC_2s (ccc->c),
847        (GNUNET_YES == to_owner) ? "owner" : "dest",
848        ntohl (ack->ccn.channel_of_client),
849        ch->pending_messages,
850        ch->max_pending_messages);
851   GSC_send_to_client (ccc->c,
852                       env);
853 }
854
855
856 /**
857  * A client is bound to the port that we have a channel
858  * open to.  Send the acknowledgement for the connection
859  * request and establish the link with the client.
860  *
861  * @param ch open incoming channel
862  * @param c client listening on the respective port
863  */
864 void
865 GCCH_bind (struct CadetChannel *ch,
866            struct CadetClient *c)
867 {
868   uint32_t options;
869   struct CadetChannelClient *cccd;
870
871   LOG (GNUNET_ERROR_TYPE_DEBUG,
872        "Binding %s from %s to port %s of %s\n",
873        GCCH_2s (ch),
874        GCT_2s (ch->t),
875        GNUNET_h2s (&ch->port),
876        GSC_2s (c));
877   if (NULL != ch->retry_control_task)
878   {
879     /* there might be a timeout task here */
880     GNUNET_SCHEDULER_cancel (ch->retry_control_task);
881     ch->retry_control_task = NULL;
882   }
883   options = 0;
884   if (ch->nobuffer)
885     options |= GNUNET_CADET_OPTION_NOBUFFER;
886   if (ch->reliable)
887     options |= GNUNET_CADET_OPTION_RELIABLE;
888   if (ch->out_of_order)
889     options |= GNUNET_CADET_OPTION_OUT_OF_ORDER;
890   cccd = GNUNET_new (struct CadetChannelClient);
891   ch->dest = cccd;
892   cccd->c = c;
893   cccd->client_ready = GNUNET_YES;
894   cccd->ccn = GSC_bind (c,
895                         ch,
896                         (GNUNET_YES == ch->is_loopback)
897                         ? GCP_get (&my_full_id,
898                                    GNUNET_YES)
899                         : GCT_get_destination (ch->t),
900                         &ch->port,
901                         options);
902   GNUNET_assert (ntohl (cccd->ccn.channel_of_client) <
903                  GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
904   ch->mid_recv.mid = htonl (1); /* The OPEN counts as message 0! */
905   if (GNUNET_YES == ch->is_loopback)
906   {
907     ch->state = CADET_CHANNEL_OPEN_SENT;
908     GCCH_handle_channel_open_ack (ch);
909   }
910   else
911   {
912     /* notify other peer that we accepted the connection */
913     ch->retry_control_task
914       = GNUNET_SCHEDULER_add_now (&send_open_ack,
915                                   ch);
916   }
917   /* give client it's initial supply of ACKs */
918   GNUNET_assert (ntohl (cccd->ccn.channel_of_client) <
919                  GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
920   for (unsigned int i=0;i<ch->max_pending_messages;i++)
921     send_ack_to_client (ch,
922                         GNUNET_NO);
923 }
924
925
926 /**
927  * Destroy locally created channel.  Called by the local client, so no
928  * need to tell the client.
929  *
930  * @param ch channel to destroy
931  * @param c client that caused the destruction
932  * @param ccn client number of the client @a c
933  */
934 void
935 GCCH_channel_local_destroy (struct CadetChannel *ch,
936                             struct CadetClient *c,
937                             struct GNUNET_CADET_ClientChannelNumber ccn)
938 {
939   LOG (GNUNET_ERROR_TYPE_DEBUG,
940        "%s asks for destruction of %s\n",
941        GSC_2s (c),
942        GCCH_2s (ch));
943   GNUNET_assert (NULL != c);
944   if ( (NULL != ch->owner) &&
945        (c == ch->owner->c) &&
946        (ccn.channel_of_client == ch->owner->ccn.channel_of_client) )
947   {
948     free_channel_client (ch->owner);
949     ch->owner = NULL;
950   }
951   else if ( (NULL != ch->dest) &&
952             (c == ch->dest->c) &&
953             (ccn.channel_of_client == ch->dest->ccn.channel_of_client) )
954   {
955     free_channel_client (ch->dest);
956     ch->dest = NULL;
957   }
958   else
959   {
960     GNUNET_assert (0);
961   }
962
963   if (GNUNET_YES == ch->destroy)
964   {
965     /* other end already destroyed, with the local client gone, no need
966        to finish transmissions, just destroy immediately. */
967     channel_destroy (ch);
968     return;
969   }
970   if ( (NULL != ch->head_sent) ||
971        (NULL != ch->owner) ||
972        (NULL != ch->dest) )
973   {
974     /* Wait for other end to destroy us as well,
975        and otherwise allow send queue to be transmitted first */
976     ch->destroy = GNUNET_YES;
977     return;
978   }
979   /* If the we ever sent the CHANNEL_CREATE, we need to send a destroy message. */
980   if (CADET_CHANNEL_NEW != ch->state)
981     GCT_send_channel_destroy (ch->t,
982                               ch->ctn);
983   /* Nothing left to do, just finish destruction */
984   channel_destroy (ch);
985 }
986
987
988 /**
989  * We got an acknowledgement for the creation of the channel
990  * (the port is open on the other side). Begin transmissions.
991  *
992  * @param ch channel to destroy
993  */
994 void
995 GCCH_handle_channel_open_ack (struct CadetChannel *ch)
996 {
997   switch (ch->state)
998   {
999   case CADET_CHANNEL_NEW:
1000     /* this should be impossible */
1001     GNUNET_break (0);
1002     break;
1003   case CADET_CHANNEL_OPEN_SENT:
1004     if (NULL == ch->owner)
1005     {
1006       /* We're not the owner, wrong direction! */
1007       GNUNET_break_op (0);
1008       return;
1009     }
1010     LOG (GNUNET_ERROR_TYPE_DEBUG,
1011          "Received CHANNEL_OPEN_ACK for waiting %s, entering READY state\n",
1012          GCCH_2s (ch));
1013     if (NULL != ch->retry_control_task) /* can be NULL if ch->is_loopback */
1014     {
1015       GNUNET_SCHEDULER_cancel (ch->retry_control_task);
1016       ch->retry_control_task = NULL;
1017     }
1018     ch->state = CADET_CHANNEL_READY;
1019     /* On first connect, send client as many ACKs as we allow messages
1020        to be buffered! */
1021     for (unsigned int i=0;i<ch->max_pending_messages;i++)
1022       send_ack_to_client (ch,
1023                           GNUNET_YES);
1024     break;
1025   case CADET_CHANNEL_READY:
1026     /* duplicate ACK, maybe we retried the CREATE. Ignore. */
1027     LOG (GNUNET_ERROR_TYPE_DEBUG,
1028          "Received duplicate channel OPEN_ACK for %s\n",
1029          GCCH_2s (ch));
1030     GNUNET_STATISTICS_update (stats,
1031                               "# duplicate CREATE_ACKs",
1032                               1,
1033                               GNUNET_NO);
1034     break;
1035   }
1036 }
1037
1038
1039 /**
1040  * Test if element @a e1 comes before element @a e2.
1041  *
1042  * @param cls closure, to a flag where we indicate duplicate packets
1043  * @param e1 an element of to sort
1044  * @param e2 another element to sort
1045  * @return #GNUNET_YES if @e1 < @e2, otherwise #GNUNET_NO
1046  */
1047 static int
1048 is_before (void *cls,
1049            struct CadetOutOfOrderMessage *m1,
1050            struct CadetOutOfOrderMessage *m2)
1051 {
1052   int *duplicate = cls;
1053   uint32_t v1 = ntohl (m1->mid.mid);
1054   uint32_t v2 = ntohl (m2->mid.mid);
1055   uint32_t delta;
1056
1057   delta = v2 - v1;
1058   if (0 == delta)
1059     *duplicate = GNUNET_YES;
1060   if (delta > (uint32_t) INT_MAX)
1061   {
1062     /* in overflow range, we can safely assume we wrapped around */
1063     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1064                 "%u > %u => %p > %p\n",
1065                 (unsigned int) v1,
1066                 (unsigned int) v2,
1067                 m1,
1068                 m2);
1069     return GNUNET_NO;
1070   }
1071   else
1072   {
1073     /* result is small, thus v2 > v1, thus e1 < e2 */
1074     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1075                 "%u < %u => %p < %p\n",
1076                 (unsigned int) v1,
1077                 (unsigned int) v2,
1078                 m1,
1079                 m2);
1080     return GNUNET_YES;
1081   }
1082 }
1083
1084
1085 /**
1086  * We got payload data for a channel.  Pass it on to the client
1087  * and send an ACK to the other end (once flow control allows it!)
1088  *
1089  * @param ch channel that got data
1090  * @param msg message that was received
1091  */
1092 void
1093 GCCH_handle_channel_plaintext_data (struct CadetChannel *ch,
1094                                     const struct GNUNET_CADET_ChannelAppDataMessage *msg)
1095 {
1096   struct GNUNET_MQ_Envelope *env;
1097   struct GNUNET_CADET_LocalData *ld;
1098   struct CadetChannelClient *ccc;
1099   size_t payload_size;
1100
1101   GNUNET_assert (GNUNET_NO == ch->is_loopback);
1102   if ( (GNUNET_YES == ch->destroy) &&
1103        (NULL == ch->owner) &&
1104        (NULL == ch->dest) )
1105   {
1106     /* This client is gone, but we still have messages to send to
1107        the other end (which is why @a ch is not yet dead).  However,
1108        we cannot pass messages to our client anymore. */
1109     LOG (GNUNET_ERROR_TYPE_DEBUG,
1110          "Dropping incoming payload on %s as this end is already closed\n",
1111          GCCH_2s (ch));
1112     /* FIXME: send back ACK/NACK/Closed notification
1113        to stop retransmissions! */
1114     return;
1115   }
1116   payload_size = ntohs (msg->header.size) - sizeof (*msg);
1117   env = GNUNET_MQ_msg_extra (ld,
1118                              payload_size,
1119                              GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
1120   ld->ccn = (NULL == ch->dest) ? ch->owner->ccn : ch->dest->ccn;
1121   GNUNET_memcpy (&ld[1],
1122                  &msg[1],
1123                  payload_size);
1124   ccc = (NULL != ch->owner) ? ch->owner : ch->dest;
1125   if ( (GNUNET_YES == ccc->client_ready) &&
1126        ( (GNUNET_YES == ch->out_of_order) ||
1127          (msg->mid.mid == ch->mid_recv.mid) ) )
1128   {
1129     LOG (GNUNET_ERROR_TYPE_DEBUG,
1130          "Giving %u bytes of payload from %s to client %s\n",
1131          (unsigned int) payload_size,
1132          GCCH_2s (ch),
1133          GSC_2s (ccc->c));
1134     ccc->client_ready = GNUNET_NO;
1135     GSC_send_to_client (ccc->c,
1136                         env);
1137     ch->mid_recv.mid = htonl (1 + ntohl (ch->mid_recv.mid));
1138     ch->mid_futures >>= 1;
1139   }
1140   else
1141   {
1142     struct CadetOutOfOrderMessage *com;
1143     int duplicate;
1144
1145     /* FIXME-SECURITY: if the element is WAY too far ahead,
1146        drop it (can't buffer too much!) */
1147
1148     com = GNUNET_new (struct CadetOutOfOrderMessage);
1149     com->mid = msg->mid;
1150     com->env = env;
1151     duplicate = GNUNET_NO;
1152     GNUNET_CONTAINER_DLL_insert_sorted (struct CadetOutOfOrderMessage,
1153                                         is_before,
1154                                         &duplicate,
1155                                         ccc->head_recv,
1156                                         ccc->tail_recv,
1157                                         com);
1158     if (GNUNET_YES == duplicate)
1159     {
1160       LOG (GNUNET_ERROR_TYPE_DEBUG,
1161            "Duplicate payload of %u bytes on %s (mid %u) dropped\n",
1162            (unsigned int) payload_size,
1163            GCCH_2s (ch),
1164            ntohl (msg->mid.mid));
1165       GNUNET_STATISTICS_update (stats,
1166                                 "# duplicate DATA",
1167                                 1,
1168                                 GNUNET_NO);
1169       GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
1170                                    ccc->tail_recv,
1171                                    com);
1172       GNUNET_free (com);
1173       return;
1174     }
1175     LOG (GNUNET_ERROR_TYPE_DEBUG,
1176          "Queued %s payload of %u bytes on %s (mid %u, need %u first)\n",
1177          (GNUNET_YES == ccc->client_ready)
1178          ? "out-of-order"
1179          : "client-not-ready",
1180          (unsigned int) payload_size,
1181          GCCH_2s (ch),
1182          ntohl (msg->mid.mid),
1183          ntohl (ch->mid_recv.mid));
1184   }
1185 }
1186
1187
1188 /**
1189  * We got an acknowledgement for payload data for a channel.
1190  * Possibly resume transmissions.
1191  *
1192  * @param ch channel that got the ack
1193  * @param ack details about what was received
1194  */
1195 void
1196 GCCH_handle_channel_plaintext_data_ack (struct CadetChannel *ch,
1197                                         const struct GNUNET_CADET_ChannelDataAckMessage *ack)
1198 {
1199   struct CadetReliableMessage *crm;
1200
1201   GNUNET_break (GNUNET_NO == ch->is_loopback);
1202   if (GNUNET_NO == ch->reliable)
1203   {
1204     /* not expecting ACKs on unreliable channel, odd */
1205     GNUNET_break_op (0);
1206     return;
1207   }
1208   for (crm = ch->head_sent;
1209         NULL != crm;
1210        crm = crm->next)
1211     if (ack->mid.mid == crm->data_message->mid.mid)
1212       break;
1213   if (NULL == crm)
1214   {
1215     /* ACK for message we already dropped, might have been a
1216        duplicate ACK? Ignore. */
1217     LOG (GNUNET_ERROR_TYPE_DEBUG,
1218          "Duplicate DATA_ACK on %s, ignoring\n",
1219          GCCH_2s (ch));
1220     GNUNET_STATISTICS_update (stats,
1221                               "# duplicate DATA_ACKs",
1222                               1,
1223                               GNUNET_NO);
1224     return;
1225   }
1226   GNUNET_CONTAINER_DLL_remove (ch->head_sent,
1227                                ch->tail_sent,
1228                                crm);
1229   GNUNET_free (crm->data_message);
1230   GNUNET_free (crm);
1231   ch->pending_messages--;
1232   send_ack_to_client (ch,
1233                       (NULL == ch->owner)
1234                       ? GNUNET_NO
1235                       : GNUNET_YES);
1236   GNUNET_assert (ch->pending_messages < ch->max_pending_messages);
1237   LOG (GNUNET_ERROR_TYPE_DEBUG,
1238        "Received DATA_ACK on %s for message %u (%u ACKs pending)\n",
1239        GCCH_2s (ch),
1240        (unsigned int) ntohl (ack->mid.mid),
1241        ch->pending_messages);
1242   send_ack_to_client (ch,
1243                       (NULL == ch->owner)
1244                       ? GNUNET_NO
1245                       : GNUNET_YES);
1246 }
1247
1248
1249 /**
1250  * Destroy channel, based on the other peer closing the
1251  * connection.  Also needs to remove this channel from
1252  * the tunnel.
1253  *
1254  * @param ch channel to destroy
1255  */
1256 void
1257 GCCH_handle_remote_destroy (struct CadetChannel *ch)
1258 {
1259   struct CadetChannelClient *ccc;
1260
1261   GNUNET_assert (GNUNET_NO == ch->is_loopback);
1262   LOG (GNUNET_ERROR_TYPE_DEBUG,
1263        "Received remote channel DESTROY for %s\n",
1264        GCCH_2s (ch));
1265   if (GNUNET_YES == ch->destroy)
1266   {
1267     /* Local client already gone, this is instant-death. */
1268     channel_destroy (ch);
1269     return;
1270   }
1271   ccc = (NULL != ch->owner) ? ch->owner : ch->dest;
1272   if (NULL != ccc->head_recv)
1273   {
1274     LOG (GNUNET_ERROR_TYPE_WARNING,
1275          "Lost end of transmission due to remote shutdown on %s\n",
1276          GCCH_2s (ch));
1277     /* FIXME: change API to notify client about truncated transmission! */
1278   }
1279   ch->destroy = GNUNET_YES;
1280   GSC_handle_remote_channel_destroy (ccc->c,
1281                                      ccc->ccn,
1282                                      ch);
1283   channel_destroy (ch);
1284 }
1285
1286
1287 /**
1288  * Function called once the tunnel has sent one of our messages.
1289  * If the message is unreliable, simply frees the `crm`. If the
1290  * message was reliable, calculate retransmission time and
1291  * wait for ACK (or retransmit).
1292  *
1293  * @param cls the `struct CadetReliableMessage` that was sent
1294  */
1295 static void
1296 data_sent_cb (void *cls);
1297
1298
1299 /**
1300  * We need to retry a transmission, the last one took too long to
1301  * be acknowledged.
1302  *
1303  * @param cls the `struct CadetChannel` where we need to retransmit
1304  */
1305 static void
1306 retry_transmission (void *cls)
1307 {
1308   struct CadetChannel *ch = cls;
1309   struct CadetReliableMessage *crm = ch->head_sent;
1310
1311   ch->retry_data_task = NULL;
1312   GNUNET_assert (NULL == crm->qe);
1313   crm->qe = GCT_send (ch->t,
1314                       &crm->data_message->header,
1315                       &data_sent_cb,
1316                       crm);
1317 }
1318
1319
1320 /**
1321  * Function called once the tunnel has sent one of our messages.
1322  * If the message is unreliable, simply frees the `crm`. If the
1323  * message was reliable, calculate retransmission time and
1324  * wait for ACK (or retransmit).
1325  *
1326  * @param cls the `struct CadetReliableMessage` that was sent
1327  */
1328 static void
1329 data_sent_cb (void *cls)
1330 {
1331   struct CadetReliableMessage *crm = cls;
1332   struct CadetChannel *ch = crm->ch;
1333   struct CadetReliableMessage *off;
1334
1335   GNUNET_assert (GNUNET_NO == ch->is_loopback);
1336   crm->qe = NULL;
1337   GNUNET_CONTAINER_DLL_remove (ch->head_sent,
1338                                ch->tail_sent,
1339                                crm);
1340   if (GNUNET_NO == ch->reliable)
1341   {
1342     GNUNET_free (crm);
1343     ch->pending_messages--;
1344     send_ack_to_client (ch,
1345                         (NULL == ch->owner)
1346                         ? GNUNET_NO
1347                         : GNUNET_YES);
1348     return;
1349   }
1350   if (0 == crm->retry_delay.rel_value_us)
1351     crm->retry_delay = ch->expected_delay;
1352   crm->next_retry = GNUNET_TIME_relative_to_absolute (crm->retry_delay);
1353
1354   /* find position for re-insertion into the DLL */
1355   if ( (NULL == ch->head_sent) ||
1356        (crm->next_retry.abs_value_us < ch->head_sent->next_retry.abs_value_us) )
1357   {
1358     /* insert at HEAD, also (re)schedule retry task! */
1359     GNUNET_CONTAINER_DLL_insert (ch->head_sent,
1360                                  ch->tail_sent,
1361                                  crm);
1362     if (NULL != ch->retry_data_task)
1363       GNUNET_SCHEDULER_cancel (ch->retry_data_task);
1364     ch->retry_data_task
1365       = GNUNET_SCHEDULER_add_delayed (crm->retry_delay,
1366                                       &retry_transmission,
1367                                       ch);
1368     return;
1369   }
1370   for (off = ch->head_sent; NULL != off; off = off->next)
1371     if (crm->next_retry.abs_value_us < off->next_retry.abs_value_us)
1372       break;
1373   if (NULL == off)
1374   {
1375     /* insert at tail */
1376     GNUNET_CONTAINER_DLL_insert_tail (ch->head_sent,
1377                                       ch->tail_sent,
1378                                       crm);
1379   }
1380   else
1381   {
1382     /* insert before off */
1383     GNUNET_CONTAINER_DLL_insert_after (ch->head_sent,
1384                                        ch->tail_sent,
1385                                        off->prev,
1386                                        crm);
1387   }
1388 }
1389
1390
1391 /**
1392  * Handle data given by a client.
1393  *
1394  * Check whether the client is allowed to send in this tunnel, save if
1395  * channel is reliable and send an ACK to the client if there is still
1396  * buffer space in the tunnel.
1397  *
1398  * @param ch Channel.
1399  * @param sender_ccn ccn of the sender
1400  * @param buf payload to transmit.
1401  * @param buf_len number of bytes in @a buf
1402  * @return #GNUNET_OK if everything goes well,
1403  *         #GNUNET_SYSERR in case of an error.
1404  */
1405 int
1406 GCCH_handle_local_data (struct CadetChannel *ch,
1407                         struct GNUNET_CADET_ClientChannelNumber sender_ccn,
1408                         const char *buf,
1409                         size_t buf_len)
1410 {
1411   struct CadetReliableMessage *crm;
1412
1413   if (ch->pending_messages > ch->max_pending_messages)
1414   {
1415     GNUNET_break (0);
1416     return GNUNET_SYSERR;
1417   }
1418   ch->pending_messages++;
1419
1420   if (GNUNET_YES == ch->is_loopback)
1421   {
1422     struct CadetChannelClient *receiver;
1423     struct GNUNET_MQ_Envelope *env;
1424     struct GNUNET_CADET_LocalData *ld;
1425     int to_owner;
1426
1427     env = GNUNET_MQ_msg_extra (ld,
1428                                buf_len,
1429                                GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
1430     if (sender_ccn.channel_of_client ==
1431         ch->owner->ccn.channel_of_client)
1432     {
1433       receiver = ch->dest;
1434       to_owner = GNUNET_NO;
1435     }
1436     else
1437     {
1438       GNUNET_assert (sender_ccn.channel_of_client ==
1439                      ch->dest->ccn.channel_of_client);
1440       receiver = ch->owner;
1441       to_owner = GNUNET_YES;
1442     }
1443     ld->ccn = receiver->ccn;
1444     GNUNET_memcpy (&ld[1],
1445                    buf,
1446                    buf_len);
1447     /* FIXME: this does not provide for flow control! */
1448     GSC_send_to_client (receiver->c,
1449                         env);
1450     send_ack_to_client (ch,
1451                         to_owner);
1452     return GNUNET_OK;
1453   }
1454
1455   /* Everything is correct, send the message. */
1456   crm = GNUNET_malloc (sizeof (*crm));
1457   crm->ch = ch;
1458   crm->data_message = GNUNET_malloc (sizeof (struct GNUNET_CADET_ChannelAppDataMessage)
1459                                      + buf_len);
1460   crm->data_message->header.size = htons (sizeof (struct GNUNET_CADET_ChannelAppDataMessage) + buf_len);
1461   crm->data_message->header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA);
1462   ch->mid_send.mid = htonl (ntohl (ch->mid_send.mid) + 1);
1463   crm->data_message->mid = ch->mid_send;
1464   crm->data_message->ctn = ch->ctn;
1465   GNUNET_memcpy (&crm->data_message[1],
1466                  buf,
1467                  buf_len);
1468   GNUNET_CONTAINER_DLL_insert (ch->head_sent,
1469                                ch->tail_sent,
1470                                crm);
1471   LOG (GNUNET_ERROR_TYPE_DEBUG,
1472        "Sending %u bytes from local client to %s\n",
1473        buf_len,
1474        GCCH_2s (ch));
1475   crm->qe = GCT_send (ch->t,
1476                       &crm->data_message->header,
1477                       &data_sent_cb,
1478                       crm);
1479   return GNUNET_OK;
1480 }
1481
1482
1483 /**
1484  * Handle ACK from client on local channel.  Means the client is ready
1485  * for more data, see if we have any for it.
1486  *
1487  * @param ch channel to destroy
1488  * @param client_ccn ccn of the client sending the ack
1489  */
1490 void
1491 GCCH_handle_local_ack (struct CadetChannel *ch,
1492                        struct GNUNET_CADET_ClientChannelNumber client_ccn)
1493 {
1494   struct CadetChannelClient *ccc;
1495   struct CadetOutOfOrderMessage *com;
1496
1497   if ( (NULL != ch->owner) &&
1498        (ch->owner->ccn.channel_of_client == client_ccn.channel_of_client) )
1499     ccc = ch->owner;
1500   else if ( (NULL != ch->dest) &&
1501             (ch->dest->ccn.channel_of_client == client_ccn.channel_of_client) )
1502     ccc = ch->dest;
1503   else
1504     GNUNET_assert (0);
1505   ccc->client_ready = GNUNET_YES;
1506   com = ccc->head_recv;
1507   if (NULL == com)
1508   {
1509     LOG (GNUNET_ERROR_TYPE_DEBUG,
1510          "Got LOCAL_ACK, %s-%X ready to receive more data (but none pending)!\n",
1511          GSC_2s (ccc->c),
1512          ntohl (ccc->ccn.channel_of_client));
1513     return; /* none pending */
1514   }
1515   if ( (com->mid.mid != ch->mid_recv.mid) &&
1516        (GNUNET_NO == ch->out_of_order) )
1517   {
1518     LOG (GNUNET_ERROR_TYPE_DEBUG,
1519          "Got LOCAL_ACK, %s-%X ready to receive more data (but next one is out-of-order %u vs. %u)!\n",
1520          GSC_2s (ccc->c),
1521          ntohl (ccc->ccn.channel_of_client),
1522          ntohl (com->mid.mid),
1523          ntohl (ch->mid_recv.mid));
1524     return; /* missing next one in-order */
1525   }
1526
1527   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1528               "Got LOCAL ACK, passing payload message to %s-%X on %s\n",
1529               GSC_2s (ccc->c),
1530               ntohl (ccc->ccn.channel_of_client),
1531               GCCH_2s (ch));
1532
1533   /* all good, pass next message to client */
1534   GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
1535                                ccc->tail_recv,
1536                                com);
1537   /* FIXME: if unreliable, this is not aggressive
1538      enough, as it would be OK to have lost some! */
1539   ch->mid_recv.mid = htonl (1 + ntohl (com->mid.mid));
1540   ch->mid_futures >>= 1; /* equivalent to division by 2 */
1541   ccc->client_ready = GNUNET_NO;
1542   GSC_send_to_client (ccc->c,
1543                       com->env);
1544   GNUNET_free (com);
1545   if ( (0xFFULL == (ch->mid_futures & 0xFFULL)) &&
1546        (GNUNET_YES == ch->reliable) )
1547   {
1548     /* The next 15 messages were also already received (0xFF), this
1549        suggests that the sender may be blocked on flow control
1550        urgently waiting for an ACK from us. (As we have an inherent
1551        maximum of 64 bits, and 15 is getting too close for comfort.)
1552        So we should send one now. */
1553     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1554                 "Sender on %s likely blocked on flow-control, sending ACK now.\n",
1555                 GCCH_2s (ch));
1556     if (GNUNET_YES == ch->reliable)
1557       send_channel_data_ack (ch);
1558   }
1559
1560   if (NULL != ccc->head_recv)
1561     return;
1562   if (GNUNET_NO == ch->destroy)
1563     return;
1564   GCT_send_channel_destroy (ch->t,
1565                             ch->ctn);
1566   channel_destroy (ch);
1567 }
1568
1569
1570 #define LOG2(level, ...) GNUNET_log_from_nocheck(level,"cadet-chn",__VA_ARGS__)
1571
1572
1573 /**
1574  * Log channel info.
1575  *
1576  * @param ch Channel.
1577  * @param level Debug level to use.
1578  */
1579 void
1580 GCCH_debug (struct CadetChannel *ch,
1581             enum GNUNET_ErrorType level)
1582 {
1583   int do_log;
1584
1585   do_log = GNUNET_get_log_call_status (level & (~GNUNET_ERROR_TYPE_BULK),
1586                                        "cadet-chn",
1587                                        __FILE__, __FUNCTION__, __LINE__);
1588   if (0 == do_log)
1589     return;
1590
1591   if (NULL == ch)
1592   {
1593     LOG2 (level, "CHN *** DEBUG NULL CHANNEL ***\n");
1594     return;
1595   }
1596   LOG2 (level,
1597         "CHN %s:%X (%p)\n",
1598         GCT_2s (ch->t),
1599         ch->ctn,
1600         ch);
1601   if (NULL != ch->owner)
1602   {
1603     LOG2 (level,
1604           "CHN origin %s ready %s local-id: %u\n",
1605           GSC_2s (ch->owner->c),
1606           ch->owner->client_ready ? "YES" : "NO",
1607           ntohl (ch->owner->ccn.channel_of_client));
1608   }
1609   if (NULL != ch->dest)
1610   {
1611     LOG2 (level,
1612           "CHN destination %s ready %s local-id: %u\n",
1613           GSC_2s (ch->dest->c),
1614           ch->dest->client_ready ? "YES" : "NO",
1615           ntohl (ch->dest->ccn.channel_of_client));
1616   }
1617   LOG2 (level,
1618         "CHN  Message IDs recv: %d (%LLX), send: %d\n",
1619         ntohl (ch->mid_recv.mid),
1620         (unsigned long long) ch->mid_futures,
1621         ntohl (ch->mid_send.mid));
1622 }
1623
1624
1625
1626 /* end of gnunet-service-cadet-new_channel.c */