use new generic insertion sort logic
[oweals/gnunet.git] / src / cadet / gnunet-service-cadet-new_channel.c
1
2 /*
3      This file is part of GNUnet.
4      Copyright (C) 2001-2017 GNUnet e.V.
5
6      GNUnet is free software; you can redistribute it and/or modify
7      it under the terms of the GNU General Public License as published
8      by the Free Software Foundation; either version 3, or (at your
9      option) any later version.
10
11      GNUnet is distributed in the hope that it will be useful, but
12      WITHOUT ANY WARRANTY; without even the implied warranty of
13      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14      General Public License for more details.
15
16      You should have received a copy of the GNU General Public License
17      along with GNUnet; see the file COPYING.  If not, write to the
18      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
19      Boston, MA 02110-1301, USA.
20 */
21 /**
22  * @file cadet/gnunet-service-cadet-new_channel.c
23  * @brief logical links between CADET clients
24  * @author Bartlomiej Polot
25  * @author Christian Grothoff
26  *
27  * TODO:
28  * - FIXME: send ACKs back to loopback clients!
29  *
30  * - introduce shutdown so we can have half-closed channels, modify
31  *   destroy to include MID to have FIN-ACK equivalents, etc.
32  * - estimate max bandwidth using bursts and use to for CONGESTION CONTROL!
33  * - check that '0xFFULL' really is sufficient for flow control!
34  * - revisit handling of 'unreliable' traffic!
35  * - revisit handling of 'out-of-order' option, especially in combination with/without 'reliable'.
36  * - figure out flow control without ACKs (unreliable traffic!)
37  */
38 #include "platform.h"
39 #include "gnunet_util_lib.h"
40 #include "cadet.h"
41 #include "gnunet_statistics_service.h"
42 #include "gnunet-service-cadet-new.h"
43 #include "gnunet-service-cadet-new_channel.h"
44 #include "gnunet-service-cadet-new_connection.h"
45 #include "gnunet-service-cadet-new_tunnels.h"
46 #include "gnunet-service-cadet-new_peer.h"
47 #include "gnunet-service-cadet-new_paths.h"
48
49 #define LOG(level,...) GNUNET_log_from (level,"cadet-chn",__VA_ARGS__)
50
51 /**
52  * How long do we initially wait before retransmitting?
53  */
54 #define CADET_INITIAL_RETRANSMIT_TIME GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 250)
55
56 /**
57  * How long do we wait before dropping state about incoming
58  * connection to closed port?
59  */
60 #define TIMEOUT_CLOSED_PORT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30)
61
62
63 /**
64  * All the states a connection can be in.
65  */
66 enum CadetChannelState
67 {
68   /**
69    * Uninitialized status, should never appear in operation.
70    */
71   CADET_CHANNEL_NEW,
72
73   /**
74    * Connection create message sent, waiting for ACK.
75    */
76   CADET_CHANNEL_OPEN_SENT,
77
78   /**
79    * Connection confirmed, ready to carry traffic.
80    */
81   CADET_CHANNEL_READY
82 };
83
84
85 /**
86  * Info needed to retry a message in case it gets lost.
87  * Note that we DO use this structure also for unreliable
88  * messages.
89  */
90 struct CadetReliableMessage
91 {
92   /**
93    * Double linked list, FIFO style
94    */
95   struct CadetReliableMessage *next;
96
97   /**
98    * Double linked list, FIFO style
99    */
100   struct CadetReliableMessage *prev;
101
102   /**
103    * Which channel is this message in?
104    */
105   struct CadetChannel *ch;
106
107   /**
108    * Entry in the tunnels queue for this message, NULL if it has left
109    * the tunnel.  Used to cancel transmission in case we receive an
110    * ACK in time.
111    */
112   struct CadetTunnelQueueEntry *qe;
113
114   /**
115    * How soon should we retry if we fail to get an ACK?
116    * Messages in the queue are sorted by this value.
117    */
118   struct GNUNET_TIME_Absolute next_retry;
119
120   /**
121    * How long do we wait for an ACK after transmission?
122    * Use for the back-off calculation.
123    */
124   struct GNUNET_TIME_Relative retry_delay;
125
126   /**
127    * Data message we are trying to send.
128    */
129   struct GNUNET_CADET_ChannelAppDataMessage *data_message;
130
131 };
132
133
134 /**
135  * List of received out-of-order data messages.
136  */
137 struct CadetOutOfOrderMessage
138 {
139   /**
140    * Double linked list, FIFO style
141    */
142   struct CadetOutOfOrderMessage *next;
143
144   /**
145    * Double linked list, FIFO style
146    */
147   struct CadetOutOfOrderMessage *prev;
148
149   /**
150    * ID of the message (messages up to this point needed
151    * before we give this one to the client).
152    */
153   struct ChannelMessageIdentifier mid;
154
155   /**
156    * The envelope with the payload of the out-of-order message
157    */
158   struct GNUNET_MQ_Envelope *env;
159
160 };
161
162
163 /**
164  * Client endpoint of a `struct CadetChannel`.  A channel may be a
165  * loopback channel, in which case it has two of these endpoints.
166  * Note that flow control also is required in both directions.
167  */
168 struct CadetChannelClient
169 {
170   /**
171    * Client handle.  Not by itself sufficient to designate
172    * the client endpoint, as the same client handle may
173    * be used for both the owner and the destination, and
174    * we thus also need the channel ID to identify the client.
175    */
176   struct CadetClient *c;
177
178   /**
179    * Head of DLL of messages received out of order or while client was unready.
180    */
181   struct CadetOutOfOrderMessage *head_recv;
182
183   /**
184    * Tail DLL of messages received out of order or while client was unready.
185    */
186   struct CadetOutOfOrderMessage *tail_recv;
187
188   /**
189    * Local tunnel number for this client.
190    * (if owner >= #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI,
191    *  otherwise < #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI)
192    */
193   struct GNUNET_CADET_ClientChannelNumber ccn;
194
195   /**
196    * Can we send data to the client?
197    */
198   int client_ready;
199
200 };
201
202
203 /**
204  * Struct containing all information regarding a channel to a remote client.
205  */
206 struct CadetChannel
207 {
208   /**
209    * Tunnel this channel is in.
210    */
211   struct CadetTunnel *t;
212
213   /**
214    * Client owner of the tunnel, if any.
215    * (Used if this channel represends the initiating end of the tunnel.)
216    */
217   struct CadetChannelClient *owner;
218
219   /**
220    * Client destination of the tunnel, if any.
221    * (Used if this channel represents the listening end of the tunnel.)
222    */
223   struct CadetChannelClient *dest;
224
225   /**
226    * Last entry in the tunnel's queue relating to control messages
227    * (#GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN or
228    * #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK).  Used to cancel
229    * transmission in case we receive updated information.
230    */
231   struct CadetTunnelQueueEntry *last_control_qe;
232
233   /**
234    * Head of DLL of messages sent and not yet ACK'd.
235    */
236   struct CadetReliableMessage *head_sent;
237
238   /**
239    * Tail of DLL of messages sent and not yet ACK'd.
240    */
241   struct CadetReliableMessage *tail_sent;
242
243   /**
244    * Task to resend/poll in case no ACK is received.
245    */
246   struct GNUNET_SCHEDULER_Task *retry_control_task;
247
248   /**
249    * Task to resend/poll in case no ACK is received.
250    */
251   struct GNUNET_SCHEDULER_Task *retry_data_task;
252
253   /**
254    * Last time the channel was used
255    */
256   struct GNUNET_TIME_Absolute timestamp;
257
258   /**
259    * Destination port of the channel.
260    */
261   struct GNUNET_HashCode port;
262
263   /**
264    * Counter for exponential backoff.
265    */
266   struct GNUNET_TIME_Relative retry_time;
267
268   /**
269    * How long does it usually take to get an ACK.
270    */
271   struct GNUNET_TIME_Relative expected_delay;
272
273   /**
274    * Bitfield of already-received messages past @e mid_recv.
275    */
276   uint64_t mid_futures;
277
278   /**
279    * Next MID expected for incoming traffic.
280    */
281   struct ChannelMessageIdentifier mid_recv;
282
283   /**
284    * Next MID to use for outgoing traffic.
285    */
286   struct ChannelMessageIdentifier mid_send;
287
288   /**
289    * Total (reliable) messages pending ACK for this channel.
290    */
291   unsigned int pending_messages;
292
293   /**
294    * Maximum (reliable) messages pending ACK for this channel
295    * before we throttle the client.
296    */
297   unsigned int max_pending_messages;
298
299   /**
300    * Number identifying this channel in its tunnel.
301    */
302   struct GNUNET_CADET_ChannelTunnelNumber ctn;
303
304   /**
305    * Channel state.
306    */
307   enum CadetChannelState state;
308
309   /**
310    * Is the tunnel bufferless (minimum latency)?
311    */
312   int nobuffer;
313
314   /**
315    * Is the tunnel reliable?
316    */
317   int reliable;
318
319   /**
320    * Is the tunnel out-of-order?
321    */
322   int out_of_order;
323
324   /**
325    * Is this channel a loopback channel, where the destination is us again?
326    */
327   int is_loopback;
328
329   /**
330    * Flag to signal the destruction of the channel.  If this is set to
331    * #GNUNET_YES the channel will be destroyed once the queue is
332    * empty.
333    */
334   int destroy;
335
336 };
337
338
339 /**
340  * Get the static string for identification of the channel.
341  *
342  * @param ch Channel.
343  *
344  * @return Static string with the channel IDs.
345  */
346 const char *
347 GCCH_2s (const struct CadetChannel *ch)
348 {
349   static char buf[128];
350
351   GNUNET_snprintf (buf,
352                    sizeof (buf),
353                    "Channel %s:%s ctn:%X(%X/%X)",
354                    (GNUNET_YES == ch->is_loopback)
355                    ? "loopback"
356                    : GNUNET_i2s (GCP_get_id (GCT_get_destination (ch->t))),
357                    GNUNET_h2s (&ch->port),
358                    ch->ctn,
359                    (NULL == ch->owner) ? 0 : ntohl (ch->owner->ccn.channel_of_client),
360                    (NULL == ch->dest) ? 0 : ntohl (ch->dest->ccn.channel_of_client));
361   return buf;
362 }
363
364
365 /**
366  * Get the channel's public ID.
367  *
368  * @param ch Channel.
369  *
370  * @return ID used to identify the channel with the remote peer.
371  */
372 struct GNUNET_CADET_ChannelTunnelNumber
373 GCCH_get_id (const struct CadetChannel *ch)
374 {
375   return ch->ctn;
376 }
377
378
379 /**
380  * Release memory associated with @a ccc
381  *
382  * @param ccc data structure to clean up
383  */
384 static void
385 free_channel_client (struct CadetChannelClient *ccc)
386 {
387   struct CadetOutOfOrderMessage *com;
388
389   while (NULL != (com = ccc->head_recv))
390   {
391     GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
392                                  ccc->tail_recv,
393                                  com);
394     GNUNET_MQ_discard (com->env);
395     GNUNET_free (com);
396   }
397   GNUNET_free (ccc);
398 }
399
400
401 /**
402  * Destroy the given channel.
403  *
404  * @param ch channel to destroy
405  */
406 static void
407 channel_destroy (struct CadetChannel *ch)
408 {
409   struct CadetReliableMessage *crm;
410
411   while (NULL != (crm = ch->head_sent))
412   {
413     GNUNET_assert (ch == crm->ch);
414     if (NULL != crm->qe)
415     {
416       GCT_send_cancel (crm->qe);
417       crm->qe = NULL;
418     }
419     GNUNET_CONTAINER_DLL_remove (ch->head_sent,
420                                  ch->tail_sent,
421                                  crm);
422     GNUNET_free (crm->data_message);
423     GNUNET_free (crm);
424   }
425   if (NULL != ch->owner)
426   {
427     free_channel_client (ch->owner);
428     ch->owner = NULL;
429   }
430   if (NULL != ch->dest)
431   {
432     free_channel_client (ch->dest);
433     ch->dest = NULL;
434   }
435   if (NULL != ch->last_control_qe)
436   {
437     GCT_send_cancel (ch->last_control_qe);
438     ch->last_control_qe = NULL;
439   }
440   if (NULL != ch->retry_data_task)
441   {
442     GNUNET_SCHEDULER_cancel (ch->retry_data_task);
443     ch->retry_data_task = NULL;
444   }
445   if (NULL != ch->retry_control_task)
446   {
447     GNUNET_SCHEDULER_cancel (ch->retry_control_task);
448     ch->retry_control_task = NULL;
449   }
450   if (GNUNET_NO == ch->is_loopback)
451   {
452     GCT_remove_channel (ch->t,
453                         ch,
454                         ch->ctn);
455     ch->t = NULL;
456   }
457   GNUNET_free (ch);
458 }
459
460
461 /**
462  * Send a channel create message.
463  *
464  * @param cls Channel for which to send.
465  */
466 static void
467 send_channel_open (void *cls);
468
469
470 /**
471  * Function called once the tunnel confirms that we sent the
472  * create message.  Delays for a bit until we retry.
473  *
474  * @param cls our `struct CadetChannel`.
475  */
476 static void
477 channel_open_sent_cb (void *cls)
478 {
479   struct CadetChannel *ch = cls;
480
481   GNUNET_assert (NULL != ch->last_control_qe);
482   ch->last_control_qe = NULL;
483   ch->retry_time = GNUNET_TIME_STD_BACKOFF (ch->retry_time);
484   LOG (GNUNET_ERROR_TYPE_DEBUG,
485        "Sent CADET_CHANNEL_OPEN on %s, retrying in %s\n",
486        GCCH_2s (ch),
487        GNUNET_STRINGS_relative_time_to_string (ch->retry_time,
488                                                GNUNET_YES));
489   ch->retry_control_task
490     = GNUNET_SCHEDULER_add_delayed (ch->retry_time,
491                                     &send_channel_open,
492                                     ch);
493 }
494
495
496 /**
497  * Send a channel open message.
498  *
499  * @param cls Channel for which to send.
500  */
501 static void
502 send_channel_open (void *cls)
503 {
504   struct CadetChannel *ch = cls;
505   struct GNUNET_CADET_ChannelOpenMessage msgcc;
506   uint32_t options;
507
508   ch->retry_control_task = NULL;
509   LOG (GNUNET_ERROR_TYPE_DEBUG,
510        "Sending CHANNEL_OPEN message for %s\n",
511        GCCH_2s (ch));
512   options = 0;
513   if (ch->nobuffer)
514     options |= GNUNET_CADET_OPTION_NOBUFFER;
515   if (ch->reliable)
516     options |= GNUNET_CADET_OPTION_RELIABLE;
517   if (ch->out_of_order)
518     options |= GNUNET_CADET_OPTION_OUT_OF_ORDER;
519   msgcc.header.size = htons (sizeof (msgcc));
520   msgcc.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN);
521   msgcc.opt = htonl (options);
522   msgcc.port = ch->port;
523   msgcc.ctn = ch->ctn;
524   ch->state = CADET_CHANNEL_OPEN_SENT;
525   ch->last_control_qe = GCT_send (ch->t,
526                                   &msgcc.header,
527                                   &channel_open_sent_cb,
528                                   ch);
529 }
530
531
532 /**
533  * Function called once and only once after a channel was bound
534  * to its tunnel via #GCT_add_channel() is ready for transmission.
535  * Note that this is only the case for channels that this peer
536  * initiates, as for incoming channels we assume that they are
537  * ready for transmission immediately upon receiving the open
538  * message.  Used to bootstrap the #GCT_send() process.
539  *
540  * @param ch the channel for which the tunnel is now ready
541  */
542 void
543 GCCH_tunnel_up (struct CadetChannel *ch)
544 {
545   GNUNET_assert (NULL == ch->retry_control_task);
546   LOG (GNUNET_ERROR_TYPE_DEBUG,
547        "Tunnel up, sending CHANNEL_OPEN on %s now\n",
548        GCCH_2s (ch));
549   ch->retry_control_task
550     = GNUNET_SCHEDULER_add_now (&send_channel_open,
551                                 ch);
552 }
553
554
555 /**
556  * Create a new channel.
557  *
558  * @param owner local client owning the channel
559  * @param ccn local number of this channel at the @a owner
560  * @param destination peer to which we should build the channel
561  * @param port desired port at @a destination
562  * @param options options for the channel
563  * @return handle to the new channel
564  */
565 struct CadetChannel *
566 GCCH_channel_local_new (struct CadetClient *owner,
567                         struct GNUNET_CADET_ClientChannelNumber ccn,
568                         struct CadetPeer *destination,
569                         const struct GNUNET_HashCode *port,
570                         uint32_t options)
571 {
572   struct CadetChannel *ch;
573   struct CadetChannelClient *ccco;
574
575   ccco = GNUNET_new (struct CadetChannelClient);
576   ccco->c = owner;
577   ccco->ccn = ccn;
578   ccco->client_ready = GNUNET_YES;
579
580   ch = GNUNET_new (struct CadetChannel);
581   ch->mid_recv.mid = htonl (1); /* The OPEN_ACK counts as message 0! */
582   ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
583   ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
584   ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
585   ch->max_pending_messages = (ch->nobuffer) ? 1 : 4; /* FIXME: 4!? Do not hardcode! */
586   ch->owner = ccco;
587   ch->port = *port;
588   if (0 == memcmp (&my_full_id,
589                    GCP_get_id (destination),
590                    sizeof (struct GNUNET_PeerIdentity)))
591   {
592     struct CadetClient *c;
593
594     ch->is_loopback = GNUNET_YES;
595     c = GNUNET_CONTAINER_multihashmap_get (open_ports,
596                                            port);
597     if (NULL == c)
598     {
599       /* port closed, wait for it to possibly open */
600       (void) GNUNET_CONTAINER_multihashmap_put (loose_channels,
601                                                 port,
602                                                 ch,
603                                                 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
604       LOG (GNUNET_ERROR_TYPE_DEBUG,
605            "Created loose incoming loopback channel to port %s\n",
606            GNUNET_h2s (&ch->port));
607     }
608     else
609     {
610       ch->dest = GNUNET_new (struct CadetChannelClient);
611       ch->dest->c = c;
612       ch->dest->client_ready = GNUNET_YES;
613       GCCH_bind (ch,
614                  ch->dest->c);
615     }
616   }
617   else
618   {
619     ch->t = GCP_get_tunnel (destination,
620                             GNUNET_YES);
621     ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME;
622     ch->ctn = GCT_add_channel (ch->t,
623                                ch);
624   }
625   GNUNET_STATISTICS_update (stats,
626                             "# channels",
627                             1,
628                             GNUNET_NO);
629   LOG (GNUNET_ERROR_TYPE_DEBUG,
630        "Created channel to port %s at peer %s for %s using %s\n",
631        GNUNET_h2s (port),
632        GCP_2s (destination),
633        GSC_2s (owner),
634        (GNUNET_YES == ch->is_loopback) ? "loopback" : GCT_2s (ch->t));
635   return ch;
636 }
637
638
639 /**
640  * We had an incoming channel to a port that is closed.
641  * It has not been opened for a while, drop it.
642  *
643  * @param cls the channel to drop
644  */
645 static void
646 timeout_closed_cb (void *cls)
647 {
648   struct CadetChannel *ch = cls;
649
650   ch->retry_control_task = NULL;
651   LOG (GNUNET_ERROR_TYPE_DEBUG,
652        "Closing incoming channel to port %s from peer %s due to timeout\n",
653        GNUNET_h2s (&ch->port),
654        GCP_2s (GCT_get_destination (ch->t)));
655   channel_destroy (ch);
656 }
657
658
659 /**
660  * Create a new channel based on a request coming in over the network.
661  *
662  * @param t tunnel to the remote peer
663  * @param ctn identifier of this channel in the tunnel
664  * @param port desired local port
665  * @param options options for the channel
666  * @return handle to the new channel
667  */
668 struct CadetChannel *
669 GCCH_channel_incoming_new (struct CadetTunnel *t,
670                            struct GNUNET_CADET_ChannelTunnelNumber ctn,
671                            const struct GNUNET_HashCode *port,
672                            uint32_t options)
673 {
674   struct CadetChannel *ch;
675   struct CadetClient *c;
676
677   ch = GNUNET_new (struct CadetChannel);
678   ch->port = *port;
679   ch->t = t;
680   ch->ctn = ctn;
681   ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME;
682   ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
683   ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
684   ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
685   ch->max_pending_messages = (ch->nobuffer) ? 1 : 4; /* FIXME: 4!? Do not hardcode! */
686   GNUNET_STATISTICS_update (stats,
687                             "# channels",
688                             1,
689                             GNUNET_NO);
690
691   c = GNUNET_CONTAINER_multihashmap_get (open_ports,
692                                          port);
693   if (NULL == c)
694   {
695     /* port closed, wait for it to possibly open */
696     (void) GNUNET_CONTAINER_multihashmap_put (loose_channels,
697                                               port,
698                                               ch,
699                                               GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
700     ch->retry_control_task
701       = GNUNET_SCHEDULER_add_delayed (TIMEOUT_CLOSED_PORT,
702                                       &timeout_closed_cb,
703                                       ch);
704     LOG (GNUNET_ERROR_TYPE_DEBUG,
705          "Created loose incoming channel to port %s from peer %s\n",
706          GNUNET_h2s (&ch->port),
707          GCP_2s (GCT_get_destination (ch->t)));
708   }
709   else
710   {
711     GCCH_bind (ch,
712                c);
713   }
714   GNUNET_STATISTICS_update (stats,
715                             "# channels",
716                             1,
717                             GNUNET_NO);
718   return ch;
719 }
720
721
722 /**
723  * Function called once the tunnel confirms that we sent the
724  * ACK message.  Just remembers it was sent, we do not expect
725  * ACKs for ACKs ;-).
726  *
727  * @param cls our `struct CadetChannel`.
728  */
729 static void
730 send_ack_cb (void *cls)
731 {
732   struct CadetChannel *ch = cls;
733
734   GNUNET_assert (NULL != ch->last_control_qe);
735   ch->last_control_qe = NULL;
736 }
737
738
739 /**
740  * Compute and send the current #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK to the other peer.
741  *
742  * @param ch channel to send the #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK for
743  */
744 static void
745 send_channel_data_ack (struct CadetChannel *ch)
746 {
747   struct GNUNET_CADET_ChannelDataAckMessage msg;
748
749   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA_ACK);
750   msg.header.size = htons (sizeof (msg));
751   msg.ctn = ch->ctn;
752   msg.mid.mid = htonl (ntohl (ch->mid_recv.mid) - 1);
753   msg.futures = GNUNET_htonll (ch->mid_futures);
754   if (NULL != ch->last_control_qe)
755     GCT_send_cancel (ch->last_control_qe);
756   ch->last_control_qe = GCT_send (ch->t,
757                                   &msg.header,
758                                   &send_ack_cb,
759                                   ch);
760 }
761
762
763 /**
764  * Send our initial #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK to the client confirming that the
765  * connection is up.
766  *
767  * @param cls the `struct CadetChannel`
768  */
769 static void
770 send_open_ack (void *cls)
771 {
772   struct CadetChannel *ch = cls;
773   struct GNUNET_CADET_ChannelManageMessage msg;
774
775   LOG (GNUNET_ERROR_TYPE_DEBUG,
776        "Sending CHANNEL_OPEN_ACK on %s\n",
777        GCCH_2s (ch));
778   ch->retry_control_task = NULL;
779   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK);
780   msg.header.size = htons (sizeof (msg));
781   msg.reserved = htonl (0);
782   msg.ctn = ch->ctn;
783   if (NULL != ch->last_control_qe)
784     GCT_send_cancel (ch->last_control_qe);
785   ch->last_control_qe = GCT_send (ch->t,
786                                   &msg.header,
787                                   &send_ack_cb,
788                                   ch);
789 }
790
791
792 /**
793  * We got a #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN message again for
794  * this channel.  If the binding was successful, (re)transmit the
795  * #GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_ACK.
796  *
797  * @param ch channel that got the duplicate open
798  */
799 void
800 GCCH_handle_duplicate_open (struct CadetChannel *ch)
801 {
802   if (NULL == ch->dest)
803   {
804     LOG (GNUNET_ERROR_TYPE_DEBUG,
805          "Ignoring duplicate channel OPEN on %s: port is closed\n",
806          GCCH_2s (ch));
807     return;
808   }
809   if (NULL != ch->retry_control_task)
810   {
811     LOG (GNUNET_ERROR_TYPE_DEBUG,
812          "Ignoring duplicate channel OPEN on %s: control message is pending\n",
813          GCCH_2s (ch));
814     return;
815   }
816   LOG (GNUNET_ERROR_TYPE_DEBUG,
817        "Retransmitting OPEN_ACK on %s\n",
818        GCCH_2s (ch));
819   ch->retry_control_task
820     = GNUNET_SCHEDULER_add_now (&send_open_ack,
821                                 ch);
822 }
823
824
825 /**
826  * Send a #GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK to the client to solicit more messages.
827  *
828  * @param ch channel the ack is for
829  * @param to_owner #GNUNET_YES to send to owner,
830  *                 #GNUNET_NO to send to dest
831  */
832 static void
833 send_ack_to_client (struct CadetChannel *ch,
834                     int to_owner)
835 {
836   struct GNUNET_MQ_Envelope *env;
837   struct GNUNET_CADET_LocalAck *ack;
838   struct CadetChannelClient *ccc;
839
840   env = GNUNET_MQ_msg (ack,
841                        GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK);
842   ccc = (GNUNET_YES == to_owner) ? ch->owner : ch->dest;
843   ack->ccn = ccc->ccn;
844   LOG (GNUNET_ERROR_TYPE_DEBUG,
845        "Sending CADET_LOCAL_ACK to %s (%s) at ccn %X (%u/%u pending)\n",
846        GSC_2s (ccc->c),
847        (GNUNET_YES == to_owner) ? "owner" : "dest",
848        ntohl (ack->ccn.channel_of_client),
849        ch->pending_messages,
850        ch->max_pending_messages);
851   GSC_send_to_client (ccc->c,
852                       env);
853 }
854
855
856 /**
857  * A client is bound to the port that we have a channel
858  * open to.  Send the acknowledgement for the connection
859  * request and establish the link with the client.
860  *
861  * @param ch open incoming channel
862  * @param c client listening on the respective port
863  */
864 void
865 GCCH_bind (struct CadetChannel *ch,
866            struct CadetClient *c)
867 {
868   uint32_t options;
869   struct CadetChannelClient *cccd;
870
871   LOG (GNUNET_ERROR_TYPE_DEBUG,
872        "Binding %s from %s to port %s of %s\n",
873        GCCH_2s (ch),
874        GCT_2s (ch->t),
875        GNUNET_h2s (&ch->port),
876        GSC_2s (c));
877   if (NULL != ch->retry_control_task)
878   {
879     /* there might be a timeout task here */
880     GNUNET_SCHEDULER_cancel (ch->retry_control_task);
881     ch->retry_control_task = NULL;
882   }
883   options = 0;
884   if (ch->nobuffer)
885     options |= GNUNET_CADET_OPTION_NOBUFFER;
886   if (ch->reliable)
887     options |= GNUNET_CADET_OPTION_RELIABLE;
888   if (ch->out_of_order)
889     options |= GNUNET_CADET_OPTION_OUT_OF_ORDER;
890   cccd = GNUNET_new (struct CadetChannelClient);
891   ch->dest = cccd;
892   cccd->c = c;
893   cccd->client_ready = GNUNET_YES;
894   cccd->ccn = GSC_bind (c,
895                         ch,
896                         (GNUNET_YES == ch->is_loopback)
897                         ? GCP_get (&my_full_id,
898                                    GNUNET_YES)
899                         : GCT_get_destination (ch->t),
900                         &ch->port,
901                         options);
902   GNUNET_assert (ntohl (cccd->ccn.channel_of_client) <
903                  GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
904   ch->mid_recv.mid = htonl (1); /* The OPEN counts as message 0! */
905   if (GNUNET_YES == ch->is_loopback)
906   {
907     ch->state = CADET_CHANNEL_OPEN_SENT;
908     GCCH_handle_channel_open_ack (ch);
909   }
910   else
911   {
912     /* notify other peer that we accepted the connection */
913     ch->retry_control_task
914       = GNUNET_SCHEDULER_add_now (&send_open_ack,
915                                   ch);
916   }
917   /* give client it's initial supply of ACKs */
918   GNUNET_assert (ntohl (cccd->ccn.channel_of_client) <
919                  GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
920   for (unsigned int i=0;i<ch->max_pending_messages;i++)
921     send_ack_to_client (ch,
922                         GNUNET_NO);
923 }
924
925
926 /**
927  * Destroy locally created channel.  Called by the local client, so no
928  * need to tell the client.
929  *
930  * @param ch channel to destroy
931  * @param c client that caused the destruction
932  * @param ccn client number of the client @a c
933  */
934 void
935 GCCH_channel_local_destroy (struct CadetChannel *ch,
936                             struct CadetClient *c,
937                             struct GNUNET_CADET_ClientChannelNumber ccn)
938 {
939   LOG (GNUNET_ERROR_TYPE_DEBUG,
940        "%s asks for destruction of %s\n",
941        GSC_2s (c),
942        GCCH_2s (ch));
943   GNUNET_assert (NULL != c);
944   if ( (NULL != ch->owner) &&
945        (c == ch->owner->c) &&
946        (ccn.channel_of_client == ch->owner->ccn.channel_of_client) )
947   {
948     free_channel_client (ch->owner);
949     ch->owner = NULL;
950   }
951   else if ( (NULL != ch->dest) &&
952             (c == ch->dest->c) &&
953             (ccn.channel_of_client == ch->dest->ccn.channel_of_client) )
954   {
955     free_channel_client (ch->dest);
956     ch->dest = NULL;
957   }
958   else
959   {
960     GNUNET_assert (0);
961   }
962
963   if (GNUNET_YES == ch->destroy)
964   {
965     /* other end already destroyed, with the local client gone, no need
966        to finish transmissions, just destroy immediately. */
967     channel_destroy (ch);
968     return;
969   }
970   if ( (NULL != ch->head_sent) ||
971        (NULL != ch->owner) ||
972        (NULL != ch->dest) )
973   {
974     /* Wait for other end to destroy us as well,
975        and otherwise allow send queue to be transmitted first */
976     ch->destroy = GNUNET_YES;
977     return;
978   }
979   /* If the we ever sent the CHANNEL_CREATE, we need to send a destroy message. */
980   if (CADET_CHANNEL_NEW != ch->state)
981     GCT_send_channel_destroy (ch->t,
982                               ch->ctn);
983   /* Nothing left to do, just finish destruction */
984   channel_destroy (ch);
985 }
986
987
988 /**
989  * We got an acknowledgement for the creation of the channel
990  * (the port is open on the other side). Begin transmissions.
991  *
992  * @param ch channel to destroy
993  */
994 void
995 GCCH_handle_channel_open_ack (struct CadetChannel *ch)
996 {
997   switch (ch->state)
998   {
999   case CADET_CHANNEL_NEW:
1000     /* this should be impossible */
1001     GNUNET_break (0);
1002     break;
1003   case CADET_CHANNEL_OPEN_SENT:
1004     if (NULL == ch->owner)
1005     {
1006       /* We're not the owner, wrong direction! */
1007       GNUNET_break_op (0);
1008       return;
1009     }
1010     LOG (GNUNET_ERROR_TYPE_DEBUG,
1011          "Received CHANNEL_OPEN_ACK for waiting %s, entering READY state\n",
1012          GCCH_2s (ch));
1013     if (NULL != ch->retry_control_task) /* can be NULL if ch->is_loopback */
1014     {
1015       GNUNET_SCHEDULER_cancel (ch->retry_control_task);
1016       ch->retry_control_task = NULL;
1017     }
1018     ch->state = CADET_CHANNEL_READY;
1019     /* On first connect, send client as many ACKs as we allow messages
1020        to be buffered! */
1021     for (unsigned int i=0;i<ch->max_pending_messages;i++)
1022       send_ack_to_client (ch,
1023                           GNUNET_YES);
1024     break;
1025   case CADET_CHANNEL_READY:
1026     /* duplicate ACK, maybe we retried the CREATE. Ignore. */
1027     LOG (GNUNET_ERROR_TYPE_DEBUG,
1028          "Received duplicate channel OPEN_ACK for %s\n",
1029          GCCH_2s (ch));
1030     GNUNET_STATISTICS_update (stats,
1031                               "# duplicate CREATE_ACKs",
1032                               1,
1033                               GNUNET_NO);
1034     break;
1035   }
1036 }
1037
1038
1039 /**
1040  * Test if element @a e1 comes before element @a e2.
1041  *
1042  * TODO: use opportunity to create generic list insertion sort
1043  * logic in container!
1044  *
1045  * @param cls closure, our `struct CadetChannel`
1046  * @param e1 an element of to sort
1047  * @param e2 another element to sort
1048  * @return #GNUNET_YES if @e1 < @e2, otherwise #GNUNET_NO
1049  */
1050 static int
1051 is_before (void *cls,
1052            struct CadetOutOfOrderMessage *m1,
1053            struct CadetOutOfOrderMessage *m2)
1054 {
1055   uint32_t v1 = ntohl (m1->mid.mid);
1056   uint32_t v2 = ntohl (m2->mid.mid);
1057   uint32_t delta;
1058
1059   delta = v2 - v1;
1060   GNUNET_assert (0 != delta);
1061   if (delta > (uint32_t) INT_MAX)
1062   {
1063     /* in overflow range, we can safely assume we wrapped around */
1064     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1065                 "%u > %u => %p > %p\n",
1066                 (unsigned int) v1,
1067                 (unsigned int) v2,
1068                 m1,
1069                 m2);
1070     return GNUNET_NO;
1071   }
1072   else
1073   {
1074     /* result is small, thus v2 > v1, thus e1 < e2 */
1075     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1076                 "%u < %u => %p < %p\n",
1077                 (unsigned int) v1,
1078                 (unsigned int) v2,
1079                 m1,
1080                 m2);
1081     return GNUNET_YES;
1082   }
1083 }
1084
1085
1086 /**
1087  * We got payload data for a channel.  Pass it on to the client
1088  * and send an ACK to the other end (once flow control allows it!)
1089  *
1090  * @param ch channel that got data
1091  * @param msg message that was received
1092  */
1093 void
1094 GCCH_handle_channel_plaintext_data (struct CadetChannel *ch,
1095                                     const struct GNUNET_CADET_ChannelAppDataMessage *msg)
1096 {
1097   struct GNUNET_MQ_Envelope *env;
1098   struct GNUNET_CADET_LocalData *ld;
1099   struct CadetChannelClient *ccc;
1100   struct CadetOutOfOrderMessage *com;
1101   size_t payload_size;
1102
1103   GNUNET_assert (GNUNET_NO == ch->is_loopback);
1104   if ( (GNUNET_YES == ch->destroy) &&
1105        (NULL == ch->owner) &&
1106        (NULL == ch->dest) )
1107   {
1108     /* This client is gone, but we still have messages to send to
1109        the other end (which is why @a ch is not yet dead).  However,
1110        we cannot pass messages to our client anymore. */
1111     LOG (GNUNET_ERROR_TYPE_DEBUG,
1112          "Dropping incoming payload on %s as this end is already closed\n",
1113          GCCH_2s (ch));
1114     /* FIXME: send back ACK/NACK/Closed notification
1115        to stop retransmissions! */
1116     return;
1117   }
1118   payload_size = ntohs (msg->header.size) - sizeof (*msg);
1119   env = GNUNET_MQ_msg_extra (ld,
1120                              payload_size,
1121                              GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
1122   ld->ccn = (NULL == ch->dest) ? ch->owner->ccn : ch->dest->ccn;
1123   GNUNET_memcpy (&ld[1],
1124                  &msg[1],
1125                  payload_size);
1126   ccc = (NULL != ch->owner) ? ch->owner : ch->dest;
1127   if ( (GNUNET_YES == ccc->client_ready) &&
1128        ( (GNUNET_YES == ch->out_of_order) ||
1129          (msg->mid.mid == ch->mid_recv.mid) ) )
1130   {
1131     LOG (GNUNET_ERROR_TYPE_DEBUG,
1132          "Giving %u bytes of payload from %s to client %s\n",
1133          (unsigned int) payload_size,
1134          GCCH_2s (ch),
1135          GSC_2s (ccc->c));
1136     ccc->client_ready = GNUNET_NO;
1137     GSC_send_to_client (ccc->c,
1138                         env);
1139     ch->mid_recv.mid = htonl (1 + ntohl (ch->mid_recv.mid));
1140     ch->mid_futures >>= 1;
1141   }
1142   else
1143   {
1144     /* FIXME-SECURITY: if the element is WAY too far ahead,
1145        drop it (can't buffer too much!) */
1146     /* FIXME-SECURITY: if element is a BIT in the past and/or
1147        duplicate, just drop it! */
1148     LOG (GNUNET_ERROR_TYPE_DEBUG,
1149          "Queuing %s payload of %u bytes on %s (mid %u, need %u first)\n",
1150          (GNUNET_YES == ccc->client_ready)
1151          ? "out-of-order"
1152          : "client-not-ready",
1153          (unsigned int) payload_size,
1154          GCCH_2s (ch),
1155          ntohl (msg->mid.mid),
1156          ntohl (ch->mid_recv.mid));
1157
1158     com = GNUNET_new (struct CadetOutOfOrderMessage);
1159     com->mid = msg->mid;
1160     com->env = env;
1161
1162     GNUNET_CONTAINER_DLL_insert_sorted (struct CadetOutOfOrderMessage,
1163                                         is_before,
1164                                         NULL,
1165                                         ccc->head_recv,
1166                                         ccc->tail_recv,
1167                                         com);
1168   }
1169 }
1170
1171
1172 /**
1173  * We got an acknowledgement for payload data for a channel.
1174  * Possibly resume transmissions.
1175  *
1176  * @param ch channel that got the ack
1177  * @param ack details about what was received
1178  */
1179 void
1180 GCCH_handle_channel_plaintext_data_ack (struct CadetChannel *ch,
1181                                         const struct GNUNET_CADET_ChannelDataAckMessage *ack)
1182 {
1183   struct CadetReliableMessage *crm;
1184
1185   GNUNET_break (GNUNET_NO == ch->is_loopback);
1186   if (GNUNET_NO == ch->reliable)
1187   {
1188     /* not expecting ACKs on unreliable channel, odd */
1189     GNUNET_break_op (0);
1190     return;
1191   }
1192   for (crm = ch->head_sent;
1193         NULL != crm;
1194        crm = crm->next)
1195     if (ack->mid.mid == crm->data_message->mid.mid)
1196       break;
1197   if (NULL == crm)
1198   {
1199     /* ACK for message we already dropped, might have been a
1200        duplicate ACK? Ignore. */
1201     LOG (GNUNET_ERROR_TYPE_DEBUG,
1202          "Duplicate DATA_ACK on %s, ignoring\n",
1203          GCCH_2s (ch));
1204     GNUNET_STATISTICS_update (stats,
1205                               "# duplicate DATA_ACKs",
1206                               1,
1207                               GNUNET_NO);
1208     return;
1209   }
1210   GNUNET_CONTAINER_DLL_remove (ch->head_sent,
1211                                ch->tail_sent,
1212                                crm);
1213   GNUNET_free (crm->data_message);
1214   GNUNET_free (crm);
1215   ch->pending_messages--;
1216   send_ack_to_client (ch,
1217                       (NULL == ch->owner)
1218                       ? GNUNET_NO
1219                       : GNUNET_YES);
1220   GNUNET_assert (ch->pending_messages < ch->max_pending_messages);
1221   LOG (GNUNET_ERROR_TYPE_DEBUG,
1222        "Received DATA_ACK on %s for message %u (%u ACKs pending)\n",
1223        GCCH_2s (ch),
1224        (unsigned int) ntohl (ack->mid.mid),
1225        ch->pending_messages);
1226   send_ack_to_client (ch,
1227                       (NULL == ch->owner)
1228                       ? GNUNET_NO
1229                       : GNUNET_YES);
1230 }
1231
1232
1233 /**
1234  * Destroy channel, based on the other peer closing the
1235  * connection.  Also needs to remove this channel from
1236  * the tunnel.
1237  *
1238  * @param ch channel to destroy
1239  */
1240 void
1241 GCCH_handle_remote_destroy (struct CadetChannel *ch)
1242 {
1243   struct CadetChannelClient *ccc;
1244
1245   GNUNET_assert (GNUNET_NO == ch->is_loopback);
1246   LOG (GNUNET_ERROR_TYPE_DEBUG,
1247        "Received remote channel DESTROY for %s\n",
1248        GCCH_2s (ch));
1249   if (GNUNET_YES == ch->destroy)
1250   {
1251     /* Local client already gone, this is instant-death. */
1252     channel_destroy (ch);
1253     return;
1254   }
1255   ccc = (NULL != ch->owner) ? ch->owner : ch->dest;
1256   if (NULL != ccc->head_recv)
1257   {
1258     LOG (GNUNET_ERROR_TYPE_WARNING,
1259          "Lost end of transmission due to remote shutdown on %s\n",
1260          GCCH_2s (ch));
1261     /* FIXME: change API to notify client about truncated transmission! */
1262   }
1263   ch->destroy = GNUNET_YES;
1264   GSC_handle_remote_channel_destroy (ccc->c,
1265                                      ccc->ccn,
1266                                      ch);
1267   channel_destroy (ch);
1268 }
1269
1270
1271 /**
1272  * Function called once the tunnel has sent one of our messages.
1273  * If the message is unreliable, simply frees the `crm`. If the
1274  * message was reliable, calculate retransmission time and
1275  * wait for ACK (or retransmit).
1276  *
1277  * @param cls the `struct CadetReliableMessage` that was sent
1278  */
1279 static void
1280 data_sent_cb (void *cls);
1281
1282
1283 /**
1284  * We need to retry a transmission, the last one took too long to
1285  * be acknowledged.
1286  *
1287  * @param cls the `struct CadetChannel` where we need to retransmit
1288  */
1289 static void
1290 retry_transmission (void *cls)
1291 {
1292   struct CadetChannel *ch = cls;
1293   struct CadetReliableMessage *crm = ch->head_sent;
1294
1295   ch->retry_data_task = NULL;
1296   GNUNET_assert (NULL == crm->qe);
1297   crm->qe = GCT_send (ch->t,
1298                       &crm->data_message->header,
1299                       &data_sent_cb,
1300                       crm);
1301 }
1302
1303
1304 /**
1305  * Function called once the tunnel has sent one of our messages.
1306  * If the message is unreliable, simply frees the `crm`. If the
1307  * message was reliable, calculate retransmission time and
1308  * wait for ACK (or retransmit).
1309  *
1310  * @param cls the `struct CadetReliableMessage` that was sent
1311  */
1312 static void
1313 data_sent_cb (void *cls)
1314 {
1315   struct CadetReliableMessage *crm = cls;
1316   struct CadetChannel *ch = crm->ch;
1317   struct CadetReliableMessage *off;
1318
1319   GNUNET_assert (GNUNET_NO == ch->is_loopback);
1320   crm->qe = NULL;
1321   GNUNET_CONTAINER_DLL_remove (ch->head_sent,
1322                                ch->tail_sent,
1323                                crm);
1324   if (GNUNET_NO == ch->reliable)
1325   {
1326     GNUNET_free (crm);
1327     ch->pending_messages--;
1328     send_ack_to_client (ch,
1329                         (NULL == ch->owner)
1330                         ? GNUNET_NO
1331                         : GNUNET_YES);
1332     return;
1333   }
1334   if (0 == crm->retry_delay.rel_value_us)
1335     crm->retry_delay = ch->expected_delay;
1336   crm->next_retry = GNUNET_TIME_relative_to_absolute (crm->retry_delay);
1337
1338   /* find position for re-insertion into the DLL */
1339   if ( (NULL == ch->head_sent) ||
1340        (crm->next_retry.abs_value_us < ch->head_sent->next_retry.abs_value_us) )
1341   {
1342     /* insert at HEAD, also (re)schedule retry task! */
1343     GNUNET_CONTAINER_DLL_insert (ch->head_sent,
1344                                  ch->tail_sent,
1345                                  crm);
1346     if (NULL != ch->retry_data_task)
1347       GNUNET_SCHEDULER_cancel (ch->retry_data_task);
1348     ch->retry_data_task
1349       = GNUNET_SCHEDULER_add_delayed (crm->retry_delay,
1350                                       &retry_transmission,
1351                                       ch);
1352     return;
1353   }
1354   for (off = ch->head_sent; NULL != off; off = off->next)
1355     if (crm->next_retry.abs_value_us < off->next_retry.abs_value_us)
1356       break;
1357   if (NULL == off)
1358   {
1359     /* insert at tail */
1360     GNUNET_CONTAINER_DLL_insert_tail (ch->head_sent,
1361                                       ch->tail_sent,
1362                                       crm);
1363   }
1364   else
1365   {
1366     /* insert before off */
1367     GNUNET_CONTAINER_DLL_insert_after (ch->head_sent,
1368                                        ch->tail_sent,
1369                                        off->prev,
1370                                        crm);
1371   }
1372 }
1373
1374
1375 /**
1376  * Handle data given by a client.
1377  *
1378  * Check whether the client is allowed to send in this tunnel, save if
1379  * channel is reliable and send an ACK to the client if there is still
1380  * buffer space in the tunnel.
1381  *
1382  * @param ch Channel.
1383  * @param sender_ccn ccn of the sender
1384  * @param buf payload to transmit.
1385  * @param buf_len number of bytes in @a buf
1386  * @return #GNUNET_OK if everything goes well,
1387  *         #GNUNET_SYSERR in case of an error.
1388  */
1389 int
1390 GCCH_handle_local_data (struct CadetChannel *ch,
1391                         struct GNUNET_CADET_ClientChannelNumber sender_ccn,
1392                         const char *buf,
1393                         size_t buf_len)
1394 {
1395   struct CadetReliableMessage *crm;
1396
1397   if (ch->pending_messages > ch->max_pending_messages)
1398   {
1399     GNUNET_break (0);
1400     return GNUNET_SYSERR;
1401   }
1402   ch->pending_messages++;
1403
1404   if (GNUNET_YES == ch->is_loopback)
1405   {
1406     struct CadetChannelClient *receiver;
1407     struct GNUNET_MQ_Envelope *env;
1408     struct GNUNET_CADET_LocalData *ld;
1409     int to_owner;
1410
1411     env = GNUNET_MQ_msg_extra (ld,
1412                                buf_len,
1413                                GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
1414     if (sender_ccn.channel_of_client ==
1415         ch->owner->ccn.channel_of_client)
1416     {
1417       receiver = ch->dest;
1418       to_owner = GNUNET_NO;
1419     }
1420     else
1421     {
1422       GNUNET_assert (sender_ccn.channel_of_client ==
1423                      ch->dest->ccn.channel_of_client);
1424       receiver = ch->owner;
1425       to_owner = GNUNET_YES;
1426     }
1427     ld->ccn = receiver->ccn;
1428     GNUNET_memcpy (&ld[1],
1429                    buf,
1430                    buf_len);
1431     /* FIXME: this does not provide for flow control! */
1432     GSC_send_to_client (receiver->c,
1433                         env);
1434     send_ack_to_client (ch,
1435                         to_owner);
1436     return GNUNET_OK;
1437   }
1438
1439   /* Everything is correct, send the message. */
1440   crm = GNUNET_malloc (sizeof (*crm));
1441   crm->ch = ch;
1442   crm->data_message = GNUNET_malloc (sizeof (struct GNUNET_CADET_ChannelAppDataMessage)
1443                                      + buf_len);
1444   crm->data_message->header.size = htons (sizeof (struct GNUNET_CADET_ChannelAppDataMessage) + buf_len);
1445   crm->data_message->header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_APP_DATA);
1446   ch->mid_send.mid = htonl (ntohl (ch->mid_send.mid) + 1);
1447   crm->data_message->mid = ch->mid_send;
1448   crm->data_message->ctn = ch->ctn;
1449   GNUNET_memcpy (&crm->data_message[1],
1450                  buf,
1451                  buf_len);
1452   GNUNET_CONTAINER_DLL_insert (ch->head_sent,
1453                                ch->tail_sent,
1454                                crm);
1455   LOG (GNUNET_ERROR_TYPE_DEBUG,
1456        "Sending %u bytes from local client to %s\n",
1457        buf_len,
1458        GCCH_2s (ch));
1459   crm->qe = GCT_send (ch->t,
1460                       &crm->data_message->header,
1461                       &data_sent_cb,
1462                       crm);
1463   return GNUNET_OK;
1464 }
1465
1466
1467 /**
1468  * Handle ACK from client on local channel.  Means the client is ready
1469  * for more data, see if we have any for it.
1470  *
1471  * @param ch channel to destroy
1472  * @param client_ccn ccn of the client sending the ack
1473  */
1474 void
1475 GCCH_handle_local_ack (struct CadetChannel *ch,
1476                        struct GNUNET_CADET_ClientChannelNumber client_ccn)
1477 {
1478   struct CadetChannelClient *ccc;
1479   struct CadetOutOfOrderMessage *com;
1480
1481   if ( (NULL != ch->owner) &&
1482        (ch->owner->ccn.channel_of_client == client_ccn.channel_of_client) )
1483     ccc = ch->owner;
1484   else if ( (NULL != ch->dest) &&
1485             (ch->dest->ccn.channel_of_client == client_ccn.channel_of_client) )
1486     ccc = ch->dest;
1487   else
1488     GNUNET_assert (0);
1489   ccc->client_ready = GNUNET_YES;
1490   com = ccc->head_recv;
1491   if (NULL == com)
1492   {
1493     LOG (GNUNET_ERROR_TYPE_DEBUG,
1494          "Got LOCAL_ACK, %s-%X ready to receive more data (but none pending)!\n",
1495          GSC_2s (ccc->c),
1496          ntohl (ccc->ccn.channel_of_client));
1497     return; /* none pending */
1498   }
1499   if ( (com->mid.mid != ch->mid_recv.mid) &&
1500        (GNUNET_NO == ch->out_of_order) )
1501   {
1502     LOG (GNUNET_ERROR_TYPE_DEBUG,
1503          "Got LOCAL_ACK, %s-%X ready to receive more data (but next one is out-of-order %u vs. %u)!\n",
1504          GSC_2s (ccc->c),
1505          ntohl (ccc->ccn.channel_of_client),
1506          ntohl (com->mid.mid),
1507          ntohl (ch->mid_recv.mid));
1508     return; /* missing next one in-order */
1509   }
1510
1511   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1512               "Got LOCAL ACK, passing payload message to %s-%X on %s\n",
1513               GSC_2s (ccc->c),
1514               ntohl (ccc->ccn.channel_of_client),
1515               GCCH_2s (ch));
1516
1517   /* all good, pass next message to client */
1518   GNUNET_CONTAINER_DLL_remove (ccc->head_recv,
1519                                ccc->tail_recv,
1520                                com);
1521   /* FIXME: if unreliable, this is not aggressive
1522      enough, as it would be OK to have lost some! */
1523   ch->mid_recv.mid = htonl (1 + ntohl (com->mid.mid));
1524   ch->mid_futures >>= 1; /* equivalent to division by 2 */
1525   ccc->client_ready = GNUNET_NO;
1526   GSC_send_to_client (ccc->c,
1527                       com->env);
1528   GNUNET_free (com);
1529   if ( (0xFFULL == (ch->mid_futures & 0xFFULL)) &&
1530        (GNUNET_YES == ch->reliable) )
1531   {
1532     /* The next 15 messages were also already received (0xFF), this
1533        suggests that the sender may be blocked on flow control
1534        urgently waiting for an ACK from us. (As we have an inherent
1535        maximum of 64 bits, and 15 is getting too close for comfort.)
1536        So we should send one now. */
1537     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1538                 "Sender on %s likely blocked on flow-control, sending ACK now.\n",
1539                 GCCH_2s (ch));
1540     if (GNUNET_YES == ch->reliable)
1541       send_channel_data_ack (ch);
1542   }
1543
1544   if (NULL != ccc->head_recv)
1545     return;
1546   if (GNUNET_NO == ch->destroy)
1547     return;
1548   GCT_send_channel_destroy (ch->t,
1549                             ch->ctn);
1550   channel_destroy (ch);
1551 }
1552
1553
1554 #define LOG2(level, ...) GNUNET_log_from_nocheck(level,"cadet-chn",__VA_ARGS__)
1555
1556
1557 /**
1558  * Log channel info.
1559  *
1560  * @param ch Channel.
1561  * @param level Debug level to use.
1562  */
1563 void
1564 GCCH_debug (struct CadetChannel *ch,
1565             enum GNUNET_ErrorType level)
1566 {
1567   int do_log;
1568
1569   do_log = GNUNET_get_log_call_status (level & (~GNUNET_ERROR_TYPE_BULK),
1570                                        "cadet-chn",
1571                                        __FILE__, __FUNCTION__, __LINE__);
1572   if (0 == do_log)
1573     return;
1574
1575   if (NULL == ch)
1576   {
1577     LOG2 (level, "CHN *** DEBUG NULL CHANNEL ***\n");
1578     return;
1579   }
1580   LOG2 (level,
1581         "CHN %s:%X (%p)\n",
1582         GCT_2s (ch->t),
1583         ch->ctn,
1584         ch);
1585   if (NULL != ch->owner)
1586   {
1587     LOG2 (level,
1588           "CHN origin %s ready %s local-id: %u\n",
1589           GSC_2s (ch->owner->c),
1590           ch->owner->client_ready ? "YES" : "NO",
1591           ntohl (ch->owner->ccn.channel_of_client));
1592   }
1593   if (NULL != ch->dest)
1594   {
1595     LOG2 (level,
1596           "CHN destination %s ready %s local-id: %u\n",
1597           GSC_2s (ch->dest->c),
1598           ch->dest->client_ready ? "YES" : "NO",
1599           ntohl (ch->dest->ccn.channel_of_client));
1600   }
1601   LOG2 (level,
1602         "CHN  Message IDs recv: %d (%LLX), send: %d\n",
1603         ntohl (ch->mid_recv.mid),
1604         (unsigned long long) ch->mid_futures,
1605         ntohl (ch->mid_send.mid));
1606 }
1607
1608
1609
1610 /* end of gnunet-service-cadet-new_channel.c */