- cancel retry task if all pending messages are freed by bitfield ACK
[oweals/gnunet.git] / src / cadet / gnunet-service-cadet_channel.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2013 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21
22 #include "platform.h"
23 #include "gnunet_util_lib.h"
24
25 #include "gnunet_statistics_service.h"
26
27 #include "cadet.h"
28 #include "cadet_protocol.h"
29
30 #include "gnunet-service-cadet_channel.h"
31 #include "gnunet-service-cadet_local.h"
32 #include "gnunet-service-cadet_tunnel.h"
33 #include "gnunet-service-cadet_peer.h"
34
35 #define LOG(level, ...) GNUNET_log_from(level,"cadet-chn",__VA_ARGS__)
36
37 #define CADET_RETRANSMIT_TIME    GNUNET_TIME_relative_multiply(\
38                                     GNUNET_TIME_UNIT_MILLISECONDS, 250)
39 #define CADET_RETRANSMIT_MARGIN  4
40
41
42 /**
43  * All the states a connection can be in.
44  */
45 enum CadetChannelState
46 {
47   /**
48    * Uninitialized status, should never appear in operation.
49    */
50   CADET_CHANNEL_NEW,
51
52   /**
53    * Connection create message sent, waiting for ACK.
54    */
55   CADET_CHANNEL_SENT,
56
57   /**
58    * Connection confirmed, ready to carry traffic.
59    */
60   CADET_CHANNEL_READY,
61 };
62
63
64 /**
65  * Info holder for channel messages in queues.
66  */
67 struct CadetChannelQueue
68 {
69   /**
70    * Tunnel Queue.
71    */
72   struct CadetTunnelQueue *tq;
73
74   /**
75    * Message type (DATA/DATA_ACK)
76    */
77   uint16_t type;
78
79   /**
80    * Message copy (for DATAs, to start retransmission timer)
81    */
82   struct CadetReliableMessage *copy;
83
84   /**
85    * Reliability (for DATA_ACKs, to access rel->ack_q)
86    */
87   struct CadetChannelReliability *rel;
88 };
89
90
91 /**
92  * Info needed to retry a message in case it gets lost.
93  */
94 struct CadetReliableMessage
95 {
96     /**
97      * Double linked list, FIFO style
98      */
99   struct CadetReliableMessage   *next;
100   struct CadetReliableMessage   *prev;
101
102     /**
103      * Type of message (payload, channel management).
104      */
105   int16_t                       type;
106
107     /**
108      * Tunnel Reliability queue this message is in.
109      */
110   struct CadetChannelReliability *rel;
111
112     /**
113      * ID of the message (ACK needed to free)
114      */
115   uint32_t                      mid;
116
117   /**
118    * Tunnel Queue.
119    */
120   struct CadetChannelQueue      *chq;
121
122     /**
123      * When was this message issued (to calculate ACK delay)
124      */
125   struct GNUNET_TIME_Absolute   timestamp;
126
127   /* struct GNUNET_CADET_Data with payload */
128 };
129
130
131 /**
132  * Info about the traffic state for a client in a channel.
133  */
134 struct CadetChannelReliability
135 {
136     /**
137      * Channel this is about.
138      */
139   struct CadetChannel *ch;
140
141     /**
142      * DLL of messages sent and not yet ACK'd.
143      */
144   struct CadetReliableMessage        *head_sent;
145   struct CadetReliableMessage        *tail_sent;
146
147     /**
148      * DLL of messages received out of order.
149      */
150   struct CadetReliableMessage        *head_recv;
151   struct CadetReliableMessage        *tail_recv;
152
153     /**
154      * Messages received.
155      */
156   unsigned int                      n_recv;
157
158     /**
159      * Next MID to use for outgoing traffic.
160      */
161   uint32_t                          mid_send;
162
163     /**
164      * Next MID expected for incoming traffic.
165      */
166   uint32_t                          mid_recv;
167
168     /**
169      * Handle for queued unique data CREATE, DATA_ACK.
170      */
171   struct CadetChannelQueue           *uniq;
172
173     /**
174      * Can we send data to the client?
175      */
176   int                               client_ready;
177
178   /**
179    * Can the client send data to us?
180    */
181   int                               client_allowed;
182
183     /**
184      * Task to resend/poll in case no ACK is received.
185      */
186   struct GNUNET_SCHEDULER_Task *   retry_task;
187
188     /**
189      * Counter for exponential backoff.
190      */
191   struct GNUNET_TIME_Relative       retry_timer;
192
193     /**
194      * How long does it usually take to get an ACK.
195      */
196   struct GNUNET_TIME_Relative       expected_delay;
197 };
198
199
200 /**
201  * Struct containing all information regarding a channel to a remote client.
202  */
203 struct CadetChannel
204 {
205     /**
206      * Tunnel this channel is in.
207      */
208   struct CadetTunnel *t;
209
210     /**
211      * Destination port of the channel.
212      */
213   uint32_t port;
214
215     /**
216      * Global channel number ( < GNUNET_CADET_LOCAL_CHANNEL_ID_CLI)
217      */
218   CADET_ChannelNumber gid;
219
220     /**
221      * Local tunnel number for root (owner) client.
222      * ( >= GNUNET_CADET_LOCAL_CHANNEL_ID_CLI or 0 )
223      */
224   CADET_ChannelNumber lid_root;
225
226     /**
227      * Local tunnel number for local destination clients (incoming number)
228      * ( >= GNUNET_CADET_LOCAL_CHANNEL_ID_SERV or 0).
229      */
230   CADET_ChannelNumber lid_dest;
231
232     /**
233      * Channel state.
234      */
235   enum CadetChannelState state;
236
237     /**
238      * Is the tunnel bufferless (minimum latency)?
239      */
240   int nobuffer;
241
242     /**
243      * Is the tunnel reliable?
244      */
245   int reliable;
246
247     /**
248      * Last time the channel was used
249      */
250   struct GNUNET_TIME_Absolute timestamp;
251
252     /**
253      * Client owner of the tunnel, if any
254      */
255   struct CadetClient *root;
256
257     /**
258      * Client destination of the tunnel, if any.
259      */
260   struct CadetClient *dest;
261
262     /**
263      * Flag to signal the destruction of the channel.
264      * If this is set GNUNET_YES the channel will be destroyed
265      * when the queue is empty.
266      */
267   int destroy;
268
269     /**
270      * Total (reliable) messages pending ACK for this channel.
271      */
272   unsigned int pending_messages;
273
274     /**
275      * Reliability data.
276      * Only present (non-NULL) at the owner of a tunnel.
277      */
278   struct CadetChannelReliability *root_rel;
279
280     /**
281      * Reliability data.
282      * Only present (non-NULL) at the destination of a tunnel.
283      */
284   struct CadetChannelReliability *dest_rel;
285
286 };
287
288
289 /******************************************************************************/
290 /*******************************   GLOBALS  ***********************************/
291 /******************************************************************************/
292
293 /**
294  * Global handle to the statistics service.
295  */
296 extern struct GNUNET_STATISTICS_Handle *stats;
297
298 /**
299  * Local peer own ID (memory efficient handle).
300  */
301 extern GNUNET_PEER_Id myid;
302
303
304 /******************************************************************************/
305 /********************************   STATIC  ***********************************/
306 /******************************************************************************/
307
308
309 /**
310  * Destroy a reliable message after it has been acknowledged, either by
311  * direct mid ACK or bitfield. Updates the appropriate data structures and
312  * timers and frees all memory.
313  *
314  * @param copy Message that is no longer needed: remote peer got it.
315  * @param update_time Is the timing information relevant?
316  *                    If this message is ACK in a batch the timing information
317  *                    is skewed by the retransmission, count only for the
318  *                    retransmitted message.
319  *
320  * @return #GNUNET_YES if channel was destroyed as a result of the call,
321  *         #GNUNET_NO otherwise.
322  */
323 static int
324 rel_message_free (struct CadetReliableMessage *copy, int update_time);
325
326 /**
327  * send a channel create message.
328  *
329  * @param ch Channel for which to send.
330  */
331 static void
332 send_create (struct CadetChannel *ch);
333
334 /**
335  * Confirm we got a channel create, FWD ack.
336  *
337  * @param ch The channel to confirm.
338  * @param fwd Should we send a FWD ACK? (going dest->root)
339  */
340 static void
341 send_ack (struct CadetChannel *ch, int fwd);
342
343
344
345 /**
346  * Test if the channel is loopback: both root and dest are on the local peer.
347  *
348  * @param ch Channel to test.
349  *
350  * @return #GNUNET_YES if channel is loopback, #GNUNET_NO otherwise.
351  */
352 static int
353 is_loopback (const struct CadetChannel *ch)
354 {
355   if (NULL != ch->t)
356     return GCT_is_loopback (ch->t);
357
358   return (NULL != ch->root && NULL != ch->dest);
359 }
360
361
362 /**
363  * Save a copy of the data message for later retransmission.
364  *
365  * @param msg Message to copy.
366  * @param mid Message ID.
367  * @param rel Reliability data for retransmission.
368  */
369 static struct CadetReliableMessage *
370 copy_message (const struct GNUNET_CADET_Data *msg, uint32_t mid,
371               struct CadetChannelReliability *rel)
372 {
373   struct CadetReliableMessage *copy;
374   uint16_t size;
375
376   size = ntohs (msg->header.size);
377   copy = GNUNET_malloc (sizeof (*copy) + size);
378   copy->mid = mid;
379   copy->rel = rel;
380   copy->type = GNUNET_MESSAGE_TYPE_CADET_DATA;
381   memcpy (&copy[1], msg, size);
382
383   return copy;
384 }
385
386 /**
387  * We have received a message out of order, or the client is not ready.
388  * Buffer it until we receive an ACK from the client or the missing
389  * message from the channel.
390  *
391  * @param msg Message to buffer (MUST be of type CADET_DATA).
392  * @param rel Reliability data to the corresponding direction.
393  */
394 static void
395 add_buffered_data (const struct GNUNET_CADET_Data *msg,
396                    struct CadetChannelReliability *rel)
397 {
398   struct CadetReliableMessage *copy;
399   struct CadetReliableMessage *prev;
400   uint32_t mid;
401
402   mid = ntohl (msg->mid);
403
404   LOG (GNUNET_ERROR_TYPE_DEBUG, "add_buffered_data MID %u (%u)\n",
405        mid, rel->n_recv);
406
407   rel->n_recv++;
408
409   // FIXME do something better than O(n), although n < 64...
410   // FIXME start from the end (most messages are the latest ones)
411   for (prev = rel->head_recv; NULL != prev; prev = prev->next)
412   {
413     LOG (GNUNET_ERROR_TYPE_DEBUG, " prev %u\n", prev->mid);
414     if (prev->mid == mid)
415     {
416       LOG (GNUNET_ERROR_TYPE_DEBUG, " already there!\n");
417       rel->n_recv--;
418       return;
419     }
420     else if (GC_is_pid_bigger (prev->mid, mid))
421     {
422       LOG (GNUNET_ERROR_TYPE_DEBUG, " bingo!\n");
423       copy = copy_message (msg, mid, rel);
424       GNUNET_CONTAINER_DLL_insert_before (rel->head_recv, rel->tail_recv,
425                                           prev, copy);
426       return;
427     }
428   }
429   copy = copy_message (msg, mid, rel);
430   LOG (GNUNET_ERROR_TYPE_DEBUG, " insert at tail! (now: %u)\n", rel->n_recv);
431   GNUNET_CONTAINER_DLL_insert_tail (rel->head_recv, rel->tail_recv, copy);
432   LOG (GNUNET_ERROR_TYPE_DEBUG, "add_buffered_data END\n");
433 }
434
435
436 /**
437  * Add a destination client to a channel, initializing all data structures
438  * in the channel and the client.
439  *
440  * @param ch Channel to which add the destination.
441  * @param c Client which to add to the channel.
442  */
443 static void
444 add_destination (struct CadetChannel *ch, struct CadetClient *c)
445 {
446   if (NULL != ch->dest)
447   {
448     GNUNET_break (0);
449     return;
450   }
451
452   /* Assign local id as destination */
453   ch->lid_dest = GML_get_next_chid (c);
454
455   /* Store in client's hashmap */
456   GML_channel_add (c, ch->lid_dest, ch);
457
458   GNUNET_break (NULL == ch->dest_rel);
459   ch->dest_rel = GNUNET_new (struct CadetChannelReliability);
460   ch->dest_rel->ch = ch;
461   ch->dest_rel->expected_delay.rel_value_us = 0;
462   ch->dest_rel->retry_timer = CADET_RETRANSMIT_TIME;
463
464   ch->dest = c;
465 }
466
467
468 /**
469  * Set options in a channel, extracted from a bit flag field.
470  *
471  * @param ch Channel to set options to.
472  * @param options Bit array in host byte order.
473  */
474 static void
475 channel_set_options (struct CadetChannel *ch, uint32_t options)
476 {
477   ch->nobuffer = (options & GNUNET_CADET_OPTION_NOBUFFER) != 0 ?
478   GNUNET_YES : GNUNET_NO;
479   ch->reliable = (options & GNUNET_CADET_OPTION_RELIABLE) != 0 ?
480   GNUNET_YES : GNUNET_NO;
481 }
482
483
484 /**
485  * Get a bit flag field with the options of a channel.
486  *
487  * @param ch Channel to get options from.
488  *
489  * @return Bit array in host byte order.
490  */
491 static uint32_t
492 channel_get_options (struct CadetChannel *ch)
493 {
494   uint32_t options;
495
496   options = 0;
497   if (ch->nobuffer)
498     options |= GNUNET_CADET_OPTION_NOBUFFER;
499   if (ch->reliable)
500     options |= GNUNET_CADET_OPTION_RELIABLE;
501
502   return options;
503 }
504
505
506 /**
507  * Notify a client that the channel is no longer valid.
508  *
509  * @param ch Channel that is destroyed.
510  * @param local_only Should we avoid sending it to other peers?
511  */
512 static void
513 send_destroy (struct CadetChannel *ch, int local_only)
514 {
515   struct GNUNET_CADET_ChannelManage msg;
516
517   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_DESTROY);
518   msg.header.size = htons (sizeof (msg));
519   msg.chid = htonl (ch->gid);
520
521   /* If root is not NULL, notify.
522    * If it's NULL, check lid_root. When a local destroy comes in, root
523    * is set to NULL but lid_root is left untouched. In this case, do nothing,
524    * the client is the one who requested the channel to be destroyed.
525    */
526   if (NULL != ch->root)
527     GML_send_channel_destroy (ch->root, ch->lid_root);
528   else if (0 == ch->lid_root && GNUNET_NO == local_only)
529     GCCH_send_prebuilt_message (&msg.header, ch, GNUNET_NO, NULL);
530
531   if (NULL != ch->dest)
532     GML_send_channel_destroy (ch->dest, ch->lid_dest);
533   else if (0 == ch->lid_dest && GNUNET_NO == local_only)
534     GCCH_send_prebuilt_message (&msg.header, ch, GNUNET_YES, NULL);
535 }
536
537
538 /**
539  * Notify the destination client that a new incoming channel was created.
540  *
541  * @param ch Channel that was created.
542  */
543 static void
544 send_client_create (struct CadetChannel *ch)
545 {
546   uint32_t opt;
547
548   if (NULL == ch->dest)
549     return;
550
551   opt = 0;
552   opt |= GNUNET_YES == ch->reliable ? GNUNET_CADET_OPTION_RELIABLE : 0;
553   opt |= GNUNET_YES == ch->nobuffer ? GNUNET_CADET_OPTION_NOBUFFER : 0;
554   GML_send_channel_create (ch->dest, ch->lid_dest, ch->port, opt,
555                            GCT_get_destination (ch->t));
556
557 }
558
559
560 /**
561  * Send data to a client.
562  *
563  * If the client is ready, send directly, otherwise buffer while listening
564  * for a local ACK.
565  *
566  * @param ch Channel
567  * @param msg Message.
568  * @param fwd Is this a fwd (root->dest) message?
569  */
570 static void
571 send_client_data (struct CadetChannel *ch,
572                   const struct GNUNET_CADET_Data *msg,
573                   int fwd)
574 {
575   if (fwd)
576   {
577     if (ch->dest_rel->client_ready)
578     {
579       GML_send_data (ch->dest, msg, ch->lid_dest);
580       ch->dest_rel->client_ready = GNUNET_NO;
581       ch->dest_rel->mid_recv++;
582     }
583     else
584       add_buffered_data (msg, ch->dest_rel);
585   }
586   else
587   {
588     if (ch->root_rel->client_ready)
589     {
590       GML_send_data (ch->root, msg, ch->lid_root);
591       ch->root_rel->client_ready = GNUNET_NO;
592       ch->root_rel->mid_recv++;
593     }
594     else
595       add_buffered_data (msg, ch->root_rel);
596   }
597 }
598
599
600 /**
601  * Send a buffered message to the client, for in order delivery or
602  * as result of client ACK.
603  *
604  * @param ch Channel on which to empty the message buffer.
605  * @param c Client to send to.
606  * @param fwd Is this to send FWD data?.
607  */
608 static void
609 send_client_buffered_data (struct CadetChannel *ch,
610                            struct CadetClient *c,
611                            int fwd)
612 {
613   struct CadetReliableMessage *copy;
614   struct CadetChannelReliability *rel;
615
616   LOG (GNUNET_ERROR_TYPE_DEBUG, "send_buffered_data\n");
617   rel = fwd ? ch->dest_rel : ch->root_rel;
618   if (GNUNET_NO == rel->client_ready)
619   {
620     LOG (GNUNET_ERROR_TYPE_DEBUG, "client not ready\n");
621     return;
622   }
623
624   copy = rel->head_recv;
625   /* We never buffer channel management messages */
626   if (NULL != copy)
627   {
628     if (copy->mid == rel->mid_recv || GNUNET_NO == ch->reliable)
629     {
630       struct GNUNET_CADET_Data *msg = (struct GNUNET_CADET_Data *) &copy[1];
631
632       LOG (GNUNET_ERROR_TYPE_DEBUG, " have %u! now expecting %u\n",
633            copy->mid, rel->mid_recv + 1);
634       send_client_data (ch, msg, fwd);
635       rel->n_recv--;
636       GNUNET_CONTAINER_DLL_remove (rel->head_recv, rel->tail_recv, copy);
637       LOG (GNUNET_ERROR_TYPE_DEBUG, " free copy recv MID %u (%p), %u left\n",
638            copy->mid, copy, rel->n_recv);
639       GNUNET_free (copy);
640       GCCH_send_data_ack (ch, fwd);
641     }
642     else
643     {
644       LOG (GNUNET_ERROR_TYPE_DEBUG, " reliable && don't have %u, next is %u\n",
645            rel->mid_recv, copy->mid);
646       if (GNUNET_YES == ch->destroy)
647       {
648         /* We don't have the next data piece and the remote peer has closed the
649          * channel. We won't receive it anymore, so just destroy the channel.
650          * FIXME: wait some time to allow other connections to
651          *        deliver missing messages
652          */
653         send_destroy (ch, GNUNET_YES);
654         GCCH_destroy (ch);
655       }
656     }
657   }
658   LOG (GNUNET_ERROR_TYPE_DEBUG, "send_buffered_data END\n");
659 }
660
661
662 /**
663  * Allow a client to send more data.
664  *
665  * In case the client was already allowed to send data, do nothing.
666  *
667  * @param ch Channel.
668  * @param fwd Is this a FWD ACK? (FWD ACKs are sent to root)
669  */
670 static void
671 send_client_ack (struct CadetChannel *ch, int fwd)
672 {
673   struct CadetChannelReliability *rel = fwd ? ch->root_rel : ch->dest_rel;
674   struct CadetClient *c = fwd ? ch->root : ch->dest;
675
676   if (NULL == c)
677   {
678     GNUNET_break (GNUNET_NO != ch->destroy);
679     return;
680   }
681   LOG (GNUNET_ERROR_TYPE_DEBUG,
682        "  sending %s ack to client on channel %s\n",
683        GC_f2s (fwd), GCCH_2s (ch));
684
685   if (NULL == rel)
686   {
687     GNUNET_break (0);
688     return;
689   }
690
691   if (GNUNET_YES == rel->client_allowed)
692   {
693     LOG (GNUNET_ERROR_TYPE_DEBUG, "  already allowed\n");
694     return;
695   }
696   rel->client_allowed = GNUNET_YES;
697
698   GML_send_ack (c, fwd ? ch->lid_root : ch->lid_dest);
699 }
700
701
702 /**
703  * Notify the root that the destination rejected the channel.
704  *
705  * @param ch Rejected channel.
706  */
707 static void
708 send_client_nack (struct CadetChannel *ch)
709 {
710   if (NULL == ch->root)
711   {
712     GNUNET_break (0);
713     return;
714   }
715   GML_send_channel_nack (ch->root, ch->lid_root);
716 }
717
718
719 /**
720  * We haven't received an ACK after a certain time: restransmit the message.
721  *
722  * @param cls Closure (CadetChannelReliability with the message to restransmit)
723  * @param tc TaskContext.
724  */
725 static void
726 channel_retransmit_message (void *cls,
727                             const struct GNUNET_SCHEDULER_TaskContext *tc)
728 {
729   struct CadetChannelReliability *rel = cls;
730   struct CadetReliableMessage *copy;
731   struct CadetChannel *ch;
732   struct GNUNET_CADET_Data *payload;
733   int fwd;
734
735   rel->retry_task = NULL;
736   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
737     return;
738
739   ch = rel->ch;
740   copy = rel->head_sent;
741   if (NULL == copy)
742   {
743     GNUNET_break (0); // FIXME tripped in rps testcase
744     return;
745   }
746
747   payload = (struct GNUNET_CADET_Data *) &copy[1];
748   fwd = (rel == ch->root_rel);
749
750   /* Message not found in the queue that we are going to use. */
751   LOG (GNUNET_ERROR_TYPE_DEBUG, "RETRANSMIT MID %u\n", copy->mid);
752
753   GCCH_send_prebuilt_message (&payload->header, ch, fwd, copy);
754   GNUNET_STATISTICS_update (stats, "# data retransmitted", 1, GNUNET_NO);
755 }
756
757
758 /**
759  * We haven't received an Channel ACK after a certain time: resend the CREATE.
760  *
761  * @param cls Closure (CadetChannelReliability of the channel to recreate)
762  * @param tc TaskContext.
763  */
764 static void
765 channel_recreate (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
766 {
767   struct CadetChannelReliability *rel = cls;
768
769   rel->retry_task = NULL;
770   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
771     return;
772
773   LOG (GNUNET_ERROR_TYPE_DEBUG, "RE-CREATE\n");
774   GNUNET_STATISTICS_update (stats, "# data retransmitted", 1, GNUNET_NO);
775
776   if (rel == rel->ch->root_rel)
777   {
778     send_create (rel->ch);
779   }
780   else if (rel == rel->ch->dest_rel)
781   {
782     send_ack (rel->ch, GNUNET_YES);
783   }
784   else
785   {
786     GNUNET_break (0);
787   }
788
789 }
790
791
792 /**
793  * Message has been sent: start retransmission timer.
794  *
795  * @param cls Closure (queue structure).
796  * @param t Tunnel.
797  * @param q Queue handler (no longer valid).
798  * @param type Type of message.
799  * @param size Size of the message.
800  */
801 static void
802 ch_message_sent (void *cls,
803                  struct CadetTunnel *t,
804                  struct CadetTunnelQueue *q,
805                  uint16_t type, size_t size)
806 {
807   struct CadetChannelQueue *chq = cls;
808   struct CadetReliableMessage *copy = chq->copy;
809   struct CadetChannelReliability *rel;
810
811   LOG (GNUNET_ERROR_TYPE_DEBUG, "channel_message_sent callback %s\n",
812        GC_m2s (chq->type));
813
814   switch (chq->type)
815   {
816     case GNUNET_MESSAGE_TYPE_CADET_DATA:
817       LOG (GNUNET_ERROR_TYPE_DEBUG, "data MID %u sent\n", copy->mid);
818       GNUNET_assert (chq == copy->chq);
819       copy->timestamp = GNUNET_TIME_absolute_get ();
820       rel = copy->rel;
821       if (NULL == rel->retry_task)
822       {
823         LOG (GNUNET_ERROR_TYPE_DEBUG, "  scheduling retry in %d * %s\n",
824              CADET_RETRANSMIT_MARGIN,
825              GNUNET_STRINGS_relative_time_to_string (rel->expected_delay,
826                                                      GNUNET_YES));
827         if (0 != rel->expected_delay.rel_value_us)
828         {
829           rel->retry_timer =
830               GNUNET_TIME_relative_multiply (rel->expected_delay,
831                                              CADET_RETRANSMIT_MARGIN);
832         }
833         else
834         {
835           rel->retry_timer = CADET_RETRANSMIT_TIME;
836         }
837         LOG (GNUNET_ERROR_TYPE_DEBUG, "  using delay %s\n",
838              GNUNET_STRINGS_relative_time_to_string (rel->retry_timer,
839                                                      GNUNET_NO));
840         rel->retry_task =
841             GNUNET_SCHEDULER_add_delayed (rel->retry_timer,
842                                           &channel_retransmit_message, rel);
843       }
844       else
845       {
846         LOG (GNUNET_ERROR_TYPE_DEBUG, "retry running %p\n", rel->retry_task);
847       }
848       copy->chq = NULL;
849       break;
850
851
852     case GNUNET_MESSAGE_TYPE_CADET_DATA_ACK:
853     case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_CREATE:
854     case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_ACK:
855       LOG (GNUNET_ERROR_TYPE_DEBUG, "sent %s\n", GC_m2s (chq->type));
856       rel = chq->rel;
857       GNUNET_assert (rel->uniq == chq);
858       rel->uniq = NULL;
859
860       if (CADET_CHANNEL_READY != rel->ch->state
861           && GNUNET_MESSAGE_TYPE_CADET_DATA_ACK != type
862           && GNUNET_NO == rel->ch->destroy)
863       {
864         GNUNET_assert (NULL == rel->retry_task);
865         LOG (GNUNET_ERROR_TYPE_DEBUG, "STD BACKOFF %s\n",
866              GNUNET_STRINGS_relative_time_to_string (rel->retry_timer,
867                                                      GNUNET_NO));
868         rel->retry_timer = GNUNET_TIME_STD_BACKOFF (rel->retry_timer);
869         rel->retry_task = GNUNET_SCHEDULER_add_delayed (rel->retry_timer,
870                                                         &channel_recreate, rel);
871       }
872       break;
873
874     default:
875       GNUNET_break (0);
876   }
877
878   GNUNET_free (chq);
879 }
880
881
882 /**
883  * send a channel create message.
884  *
885  * @param ch Channel for which to send.
886  */
887 static void
888 send_create (struct CadetChannel *ch)
889 {
890   struct GNUNET_CADET_ChannelCreate msgcc;
891
892   msgcc.header.size = htons (sizeof (msgcc));
893   msgcc.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_CREATE);
894   msgcc.chid = htonl (ch->gid);
895   msgcc.port = htonl (ch->port);
896   msgcc.opt = htonl (channel_get_options (ch));
897
898   GCCH_send_prebuilt_message (&msgcc.header, ch, GNUNET_YES, NULL);
899 }
900
901
902 /**
903  * Confirm we got a channel create or FWD ack.
904  *
905  * @param ch The channel to confirm.
906  * @param fwd Should we send a FWD ACK? (going dest->root)
907  */
908 static void
909 send_ack (struct CadetChannel *ch, int fwd)
910 {
911   struct GNUNET_CADET_ChannelManage msg;
912
913   msg.header.size = htons (sizeof (msg));
914   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_ACK);
915   LOG (GNUNET_ERROR_TYPE_DEBUG, "  sending channel %s ack for channel %s\n",
916        GC_f2s (fwd), GCCH_2s (ch));
917
918   msg.chid = htonl (ch->gid);
919   GCCH_send_prebuilt_message (&msg.header, ch, !fwd, NULL);
920 }
921
922
923 /**
924  * Send a message and don't keep any info about it: we won't need to cancel it
925  * or resend it.
926  *
927  * @param msg Header of the message to fire away.
928  * @param ch Channel on which the message should go.
929  * @param force Is this a forced (undroppable) message?
930  */
931 static void
932 fire_and_forget (const struct GNUNET_MessageHeader *msg,
933                  struct CadetChannel *ch,
934                  int force)
935 {
936   GNUNET_break (NULL == GCT_send_prebuilt_message (msg, ch->t, NULL,
937                                                    force, NULL, NULL));
938 }
939
940
941 /**
942  * Notify that a channel create didn't succeed.
943  *
944  * @param ch The channel to reject.
945  */
946 static void
947 send_nack (struct CadetChannel *ch)
948 {
949   struct GNUNET_CADET_ChannelManage msg;
950
951   msg.header.size = htons (sizeof (msg));
952   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_NACK);
953   LOG (GNUNET_ERROR_TYPE_DEBUG,
954        "  sending channel NACK for channel %s\n",
955        GCCH_2s (ch));
956
957   msg.chid = htonl (ch->gid);
958   GCCH_send_prebuilt_message (&msg.header, ch, GNUNET_NO, NULL);
959 }
960
961
962 /**
963  * Destroy all reliable messages queued for a channel,
964  * during a channel destruction.
965  * Frees the reliability structure itself.
966  *
967  * @param rel Reliability data for a channel.
968  */
969 static void
970 channel_rel_free_all (struct CadetChannelReliability *rel)
971 {
972   struct CadetReliableMessage *copy;
973   struct CadetReliableMessage *next;
974
975   if (NULL == rel)
976     return;
977
978   for (copy = rel->head_recv; NULL != copy; copy = next)
979   {
980     next = copy->next;
981     GNUNET_CONTAINER_DLL_remove (rel->head_recv, rel->tail_recv, copy);
982     LOG (GNUNET_ERROR_TYPE_DEBUG, " COPYFREE ALL RECV %p\n", copy);
983     GNUNET_break (NULL == copy->chq);
984     GNUNET_free (copy);
985   }
986   for (copy = rel->head_sent; NULL != copy; copy = next)
987   {
988     next = copy->next;
989     GNUNET_CONTAINER_DLL_remove (rel->head_sent, rel->tail_sent, copy);
990     LOG (GNUNET_ERROR_TYPE_DEBUG, " COPYFREE ALL SEND %p\n", copy);
991     if (NULL != copy->chq)
992     {
993       if (NULL != copy->chq->tq)
994       {
995         GCT_cancel (copy->chq->tq);
996         /* ch_message_sent will free copy->q */
997       }
998       else
999       {
1000         GNUNET_free (copy->chq);
1001         GNUNET_break (0);
1002       }
1003     }
1004     GNUNET_free (copy);
1005   }
1006   if (NULL != rel->uniq && NULL != rel->uniq->tq)
1007   {
1008     GCT_cancel (rel->uniq->tq);
1009     /* ch_message_sent is called freeing uniq */
1010   }
1011   if (NULL != rel->retry_task)
1012   {
1013     GNUNET_SCHEDULER_cancel (rel->retry_task);
1014     rel->retry_task = NULL;
1015   }
1016   GNUNET_free (rel);
1017 }
1018
1019
1020 /**
1021  * Mark future messages as ACK'd.
1022  *
1023  * @param rel Reliability data.
1024  * @param msg DataACK message with a bitfield of future ACK'd messages.
1025  *
1026  * @return How many messages have been freed.
1027  */
1028 static unsigned int
1029 channel_rel_free_sent (struct CadetChannelReliability *rel,
1030                        const struct GNUNET_CADET_DataACK *msg)
1031 {
1032   struct CadetReliableMessage *copy;
1033   struct CadetReliableMessage *next;
1034   uint64_t bitfield;
1035   uint64_t mask;
1036   uint32_t mid;
1037   uint32_t target;
1038   unsigned int i;
1039   unsigned int r;
1040
1041   bitfield = msg->futures;
1042   mid = ntohl (msg->mid);
1043   LOG (GNUNET_ERROR_TYPE_DEBUG, "free_sent_reliable %u %llX\n", mid, bitfield);
1044   LOG (GNUNET_ERROR_TYPE_DEBUG, " rel %p, head %p\n", rel, rel->head_sent);
1045   for (i = 0, r = 0, copy = rel->head_sent;
1046        i < 64 && NULL != copy && 0 != bitfield;
1047        i++)
1048   {
1049     LOG (GNUNET_ERROR_TYPE_DEBUG, " trying bit %u (mid %u)\n", i, mid + i + 1);
1050     mask = 0x1LL << i;
1051     if (0 == (bitfield & mask))
1052      continue;
1053
1054     LOG (GNUNET_ERROR_TYPE_DEBUG, " set!\n");
1055     /* Bit was set, clear the bit from the bitfield */
1056     bitfield &= ~mask;
1057
1058     /* The i-th bit was set. Do we have that copy? */
1059     /* Skip copies with mid < target */
1060     target = mid + i + 1;
1061     LOG (GNUNET_ERROR_TYPE_DEBUG, " target %u\n", target);
1062     while (NULL != copy && GC_is_pid_bigger (target, copy->mid))
1063       copy = copy->next;
1064
1065     /* Did we run out of copies? (previously freed, it's ok) */
1066     if (NULL == copy)
1067     {
1068       LOG (GNUNET_ERROR_TYPE_DEBUG, "run out of copies...\n");
1069       return r;
1070     }
1071
1072     /* Did we overshoot the target? (previously freed, it's ok) */
1073     if (GC_is_pid_bigger (copy->mid, target))
1074     {
1075       LOG (GNUNET_ERROR_TYPE_DEBUG, " next copy %u\n", copy->mid);
1076       i += copy->mid - target - 1;   /* MID: 90, t = 85, i += 4 (i++ later) */
1077       mask = (0x1LL << (i + 1)) - 1; /* Mask = i-th bit and all before */
1078       bitfield &= ~mask;             /* Clear all bits up to MID - 1 */
1079       continue;
1080     }
1081
1082     /* Now copy->mid == target, free it */
1083     next = copy->next;
1084     GNUNET_break (GNUNET_YES != rel_message_free (copy, GNUNET_YES));
1085     r++;
1086     copy = next;
1087   }
1088   LOG (GNUNET_ERROR_TYPE_DEBUG, "free_sent_reliable END\n");
1089   return r;
1090 }
1091
1092
1093 /**
1094  * Destroy a reliable message after it has been acknowledged, either by
1095  * direct mid ACK or bitfield. Updates the appropriate data structures and
1096  * timers and frees all memory.
1097  *
1098  * @param copy Message that is no longer needed: remote peer got it.
1099  * @param update_time Is the timing information relevant?
1100  *                    If this message is ACK in a batch the timing information
1101  *                    is skewed by the retransmission, count only for the
1102  *                    retransmitted message.
1103  *
1104  * @return #GNUNET_YES if channel was destroyed as a result of the call,
1105  *         #GNUNET_NO otherwise.
1106  */
1107 static int
1108 rel_message_free (struct CadetReliableMessage *copy, int update_time)
1109 {
1110   struct CadetChannelReliability *rel;
1111   struct GNUNET_TIME_Relative time;
1112
1113   rel = copy->rel;
1114   LOG (GNUNET_ERROR_TYPE_DEBUG, "Freeing %u\n", copy->mid);
1115   if (GNUNET_YES == update_time)
1116   {
1117     time = GNUNET_TIME_absolute_get_duration (copy->timestamp);
1118     if (0 == rel->expected_delay.rel_value_us)
1119       rel->expected_delay = time;
1120     else
1121     {
1122       rel->expected_delay.rel_value_us *= 7;
1123       rel->expected_delay.rel_value_us += time.rel_value_us;
1124       rel->expected_delay.rel_value_us /= 8;
1125     }
1126     LOG (GNUNET_ERROR_TYPE_DEBUG, "  message time %12s\n",
1127          GNUNET_STRINGS_relative_time_to_string (time, GNUNET_NO));
1128     LOG (GNUNET_ERROR_TYPE_DEBUG, "  new delay    %12s\n",
1129          GNUNET_STRINGS_relative_time_to_string (rel->expected_delay,
1130                                                  GNUNET_NO));
1131     rel->retry_timer = rel->expected_delay;
1132   }
1133   else
1134   {
1135     LOG (GNUNET_ERROR_TYPE_DEBUG, "batch free, ignoring timing\n");
1136   }
1137   rel->ch->pending_messages--;
1138   if (NULL != copy->chq)
1139   {
1140     GCT_cancel (copy->chq->tq);
1141     /* copy->q is set to NULL by ch_message_sent */
1142   }
1143   GNUNET_CONTAINER_DLL_remove (rel->head_sent, rel->tail_sent, copy);
1144   LOG (GNUNET_ERROR_TYPE_DEBUG, " free send copy MID %u at %p\n",
1145        copy->mid, copy);
1146   GNUNET_free (copy);
1147
1148   if (GNUNET_NO != rel->ch->destroy && 0 == rel->ch->pending_messages)
1149   {
1150     GCCH_destroy (rel->ch);
1151     return GNUNET_YES;
1152   }
1153   return GNUNET_NO;
1154 }
1155
1156
1157 /**
1158  * Channel was ACK'd by remote peer, mark as ready and cancel retransmission.
1159  *
1160  * @param ch Channel to mark as ready.
1161  * @param fwd Was the ACK message a FWD ACK? (dest->root, SYNACK)
1162  */
1163 static void
1164 channel_confirm (struct CadetChannel *ch, int fwd)
1165 {
1166   struct CadetChannelReliability *rel;
1167   enum CadetChannelState oldstate;
1168
1169   rel = fwd ? ch->root_rel : ch->dest_rel;
1170   if (NULL == rel)
1171   {
1172     GNUNET_break (GNUNET_NO != ch->destroy);
1173     return;
1174   }
1175   LOG (GNUNET_ERROR_TYPE_DEBUG, "  channel confirm %s %s\n",
1176        GC_f2s (fwd), GCCH_2s (ch));
1177   oldstate = ch->state;
1178   ch->state = CADET_CHANNEL_READY;
1179
1180   if (CADET_CHANNEL_READY != oldstate || GNUNET_YES == is_loopback (ch))
1181   {
1182     rel->client_ready = GNUNET_YES;
1183     rel->expected_delay = rel->retry_timer;
1184     LOG (GNUNET_ERROR_TYPE_DEBUG, "  confirm retry timer %s\n",
1185          GNUNET_STRINGS_relative_time_to_string (rel->retry_timer, GNUNET_NO));
1186     if (GCT_get_connections_buffer (ch->t) > 0 || GCT_is_loopback (ch->t))
1187       send_client_ack (ch, fwd);
1188
1189     if (NULL != rel->retry_task)
1190     {
1191       GNUNET_SCHEDULER_cancel (rel->retry_task);
1192       rel->retry_task = NULL;
1193     }
1194     else if (NULL != rel->uniq)
1195     {
1196       GCT_cancel (rel->uniq->tq);
1197       /* ch_message_sent will free and NULL uniq */
1198     }
1199     else if (GNUNET_NO == is_loopback (ch))
1200     {
1201       /* We SHOULD have been trying to retransmit this! */
1202       GNUNET_break (0);
1203     }
1204   }
1205
1206   /* In case of a FWD ACK (SYNACK) send a BCK ACK (ACK). */
1207   if (GNUNET_YES == fwd)
1208     send_ack (ch, GNUNET_NO);
1209 }
1210
1211
1212 /**
1213  * Save a copy to retransmit in case it gets lost.
1214  *
1215  * Initializes all needed callbacks and timers.
1216  *
1217  * @param ch Channel this message goes on.
1218  * @param msg Message to copy.
1219  * @param fwd Is this fwd traffic?
1220  */
1221 static struct CadetReliableMessage *
1222 channel_save_copy (struct CadetChannel *ch,
1223                    const struct GNUNET_MessageHeader *msg,
1224                    int fwd)
1225 {
1226   struct CadetChannelReliability *rel;
1227   struct CadetReliableMessage *copy;
1228   uint32_t mid;
1229   uint16_t type;
1230   uint16_t size;
1231
1232   rel = fwd ? ch->root_rel : ch->dest_rel;
1233   mid = rel->mid_send - 1;
1234   type = ntohs (msg->type);
1235   size = ntohs (msg->size);
1236
1237   LOG (GNUNET_ERROR_TYPE_DEBUG, "save MID %u %s\n", mid, GC_m2s (type));
1238   copy = GNUNET_malloc (sizeof (struct CadetReliableMessage) + size);
1239   LOG (GNUNET_ERROR_TYPE_DEBUG, "  at %p\n", copy);
1240   copy->mid = mid;
1241   copy->rel = rel;
1242   copy->type = type;
1243   memcpy (&copy[1], msg, size);
1244   GNUNET_CONTAINER_DLL_insert_tail (rel->head_sent, rel->tail_sent, copy);
1245   ch->pending_messages++;
1246
1247   return copy;
1248 }
1249
1250
1251 /**
1252  * Create a new channel.
1253  *
1254  * @param t Tunnel this channel is in.
1255  * @param owner Client that owns the channel, NULL for foreign channels.
1256  * @param lid_root Local ID for root client.
1257  *
1258  * @return A new initialized channel. NULL on error.
1259  */
1260 static struct CadetChannel *
1261 channel_new (struct CadetTunnel *t,
1262              struct CadetClient *owner,
1263              CADET_ChannelNumber lid_root)
1264 {
1265   struct CadetChannel *ch;
1266
1267   ch = GNUNET_new (struct CadetChannel);
1268   ch->root = owner;
1269   ch->lid_root = lid_root;
1270   ch->t = t;
1271
1272   GNUNET_STATISTICS_update (stats, "# channels", 1, GNUNET_NO);
1273
1274   if (NULL != owner)
1275   {
1276     ch->gid = GCT_get_next_chid (t);
1277     GML_channel_add (owner, lid_root, ch);
1278   }
1279   GCT_add_channel (t, ch);
1280
1281   return ch;
1282 }
1283
1284
1285 /**
1286  * Handle a loopback message: call the appropriate handler for the message type.
1287  *
1288  * @param ch Channel this message is on.
1289  * @param msgh Message header.
1290  * @param fwd Is this FWD traffic?
1291  */
1292 void
1293 handle_loopback (struct CadetChannel *ch,
1294                  const struct GNUNET_MessageHeader *msgh,
1295                  int fwd)
1296 {
1297   uint16_t type;
1298
1299   type = ntohs (msgh->type);
1300   LOG (GNUNET_ERROR_TYPE_DEBUG,
1301        "Loopback %s %s message!\n",
1302        GC_f2s (fwd), GC_m2s (type));
1303
1304   switch (type)
1305   {
1306     case GNUNET_MESSAGE_TYPE_CADET_DATA:
1307       /* Don't send hop ACK, wait for client to ACK */
1308       LOG (GNUNET_ERROR_TYPE_DEBUG, "SEND loopback %u (%u)\n",
1309            ntohl (((struct GNUNET_CADET_Data *) msgh)->mid), ntohs (msgh->size));
1310       GCCH_handle_data (ch, (struct GNUNET_CADET_Data *) msgh, fwd);
1311       break;
1312
1313     case GNUNET_MESSAGE_TYPE_CADET_DATA_ACK:
1314       GCCH_handle_data_ack (ch, (struct GNUNET_CADET_DataACK *) msgh, fwd);
1315       break;
1316
1317     case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_CREATE:
1318       GCCH_handle_create (ch->t,
1319                           (struct GNUNET_CADET_ChannelCreate *) msgh);
1320       break;
1321
1322     case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_ACK:
1323       GCCH_handle_ack (ch,
1324                        (struct GNUNET_CADET_ChannelManage *) msgh,
1325                        fwd);
1326       break;
1327
1328     case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_NACK:
1329       GCCH_handle_nack (ch);
1330       break;
1331
1332     case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_DESTROY:
1333       GCCH_handle_destroy (ch,
1334                            (struct GNUNET_CADET_ChannelManage *) msgh,
1335                            fwd);
1336       break;
1337
1338     default:
1339       GNUNET_break_op (0);
1340       LOG (GNUNET_ERROR_TYPE_DEBUG,
1341            "end-to-end message not known (%u)\n",
1342            ntohs (msgh->type));
1343   }
1344 }
1345
1346
1347
1348 /******************************************************************************/
1349 /********************************    API    ***********************************/
1350 /******************************************************************************/
1351
1352 /**
1353  * Destroy a channel and free all resources.
1354  *
1355  * @param ch Channel to destroy.
1356  */
1357 void
1358 GCCH_destroy (struct CadetChannel *ch)
1359 {
1360   struct CadetClient *c;
1361   struct CadetTunnel *t;
1362
1363   if (NULL == ch)
1364     return;
1365   if (2 == ch->destroy)
1366     return; /* recursive call */
1367   ch->destroy = 2;
1368
1369   LOG (GNUNET_ERROR_TYPE_DEBUG, "destroying channel %s:%u\n",
1370               GCT_2s (ch->t), ch->gid);
1371   GCCH_debug (ch);
1372
1373   c = ch->root;
1374   if (NULL != c)
1375   {
1376     GML_channel_remove (c, ch->lid_root, ch);
1377   }
1378
1379   c = ch->dest;
1380   if (NULL != c)
1381   {
1382     GML_channel_remove (c, ch->lid_dest, ch);
1383   }
1384
1385   channel_rel_free_all (ch->root_rel);
1386   channel_rel_free_all (ch->dest_rel);
1387
1388   t = ch->t;
1389   GCT_remove_channel (t, ch);
1390   GNUNET_STATISTICS_update (stats, "# channels", -1, GNUNET_NO);
1391
1392   GNUNET_free (ch);
1393   GCT_destroy_if_empty (t);
1394 }
1395
1396
1397 /**
1398  * Get the channel's public ID.
1399  *
1400  * @param ch Channel.
1401  *
1402  * @return ID used to identify the channel with the remote peer.
1403  */
1404 CADET_ChannelNumber
1405 GCCH_get_id (const struct CadetChannel *ch)
1406 {
1407   return ch->gid;
1408 }
1409
1410
1411 /**
1412  * Get the channel tunnel.
1413  *
1414  * @param ch Channel to get the tunnel from.
1415  *
1416  * @return tunnel of the channel.
1417  */
1418 struct CadetTunnel *
1419 GCCH_get_tunnel (const struct CadetChannel *ch)
1420 {
1421   return ch->t;
1422 }
1423
1424
1425 /**
1426  * Get free buffer space towards the client on a specific channel.
1427  *
1428  * @param ch Channel.
1429  * @param fwd Is query about FWD traffic?
1430  *
1431  * @return Free buffer space [0 - 64]
1432  */
1433 unsigned int
1434 GCCH_get_buffer (struct CadetChannel *ch, int fwd)
1435 {
1436   struct CadetChannelReliability *rel;
1437
1438   rel = fwd ? ch->dest_rel : ch->root_rel;
1439   LOG (GNUNET_ERROR_TYPE_DEBUG, "   get buffer, channel %s\n", GCCH_2s (ch));
1440   GCCH_debug (ch);
1441   /* If rel is NULL it means that the end is not yet created,
1442    * most probably is a loopback channel at the point of sending
1443    * the ChannelCreate to itself.
1444    */
1445   if (NULL == rel)
1446   {
1447     LOG (GNUNET_ERROR_TYPE_DEBUG, "  rel is NULL: max\n");
1448     return 64;
1449   }
1450
1451   LOG (GNUNET_ERROR_TYPE_DEBUG, "   n_recv %d\n", rel->n_recv);
1452   return (64 - rel->n_recv);
1453 }
1454
1455
1456 /**
1457  * Get flow control status of end point: is client allow to send?
1458  *
1459  * @param ch Channel.
1460  * @param fwd Is query about FWD traffic? (Request root status).
1461  *
1462  * @return #GNUNET_YES if client is allowed to send us data.
1463  */
1464 int
1465 GCCH_get_allowed (struct CadetChannel *ch, int fwd)
1466 {
1467   struct CadetChannelReliability *rel;
1468
1469   rel = fwd ? ch->root_rel : ch->dest_rel;
1470
1471   if (NULL == rel)
1472   {
1473     /* Probably shutting down: root/dest NULL'ed to mark disconnection */
1474     GNUNET_break (GNUNET_NO != ch->destroy);
1475     return 0;
1476   }
1477
1478   return rel->client_allowed;
1479 }
1480
1481
1482 /**
1483  * Is the root client for this channel on this peer?
1484  *
1485  * @param ch Channel.
1486  * @param fwd Is this for fwd traffic?
1487  *
1488  * @return #GNUNET_YES in case it is.
1489  */
1490 int
1491 GCCH_is_origin (struct CadetChannel *ch, int fwd)
1492 {
1493   struct CadetClient *c;
1494
1495   c = fwd ? ch->root : ch->dest;
1496   return NULL != c;
1497 }
1498
1499
1500 /**
1501  * Is the destination client for this channel on this peer?
1502  *
1503  * @param ch Channel.
1504  * @param fwd Is this for fwd traffic?
1505  *
1506  * @return #GNUNET_YES in case it is.
1507  */
1508 int
1509 GCCH_is_terminal (struct CadetChannel *ch, int fwd)
1510 {
1511   struct CadetClient *c;
1512
1513   c = fwd ? ch->dest : ch->root;
1514   return NULL != c;
1515 }
1516
1517
1518 /**
1519  * Send an end-to-end ACK message for the most recent in-sequence payload.
1520  *
1521  * If channel is not reliable, do nothing.
1522  *
1523  * @param ch Channel this is about.
1524  * @param fwd Is for FWD traffic? (ACK dest->owner)
1525  */
1526 void
1527 GCCH_send_data_ack (struct CadetChannel *ch, int fwd)
1528 {
1529   struct GNUNET_CADET_DataACK msg;
1530   struct CadetChannelReliability *rel;
1531   struct CadetReliableMessage *copy;
1532   unsigned int delta;
1533   uint64_t mask;
1534   uint32_t ack;
1535
1536   if (GNUNET_NO == ch->reliable)
1537     return;
1538
1539   rel = fwd ? ch->dest_rel : ch->root_rel;
1540   ack = rel->mid_recv - 1;
1541
1542   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_DATA_ACK);
1543   msg.header.size = htons (sizeof (msg));
1544   msg.chid = htonl (ch->gid);
1545   msg.mid = htonl (ack);
1546
1547   msg.futures = 0LL;
1548   for (copy = rel->head_recv; NULL != copy; copy = copy->next)
1549   {
1550     if (copy->type != GNUNET_MESSAGE_TYPE_CADET_DATA)
1551     {
1552       LOG (GNUNET_ERROR_TYPE_DEBUG, " Type %s, expected DATA\n",
1553            GC_m2s (copy->type));
1554       continue;
1555     }
1556     GNUNET_assert (GC_is_pid_bigger(copy->mid, ack));
1557     delta = copy->mid - (ack + 1);
1558     if (63 < delta)
1559       break;
1560     mask = 0x1LL << delta;
1561     msg.futures |= mask;
1562     LOG (GNUNET_ERROR_TYPE_DEBUG,
1563          " setting bit for %u (delta %u) (%llX) -> %llX\n",
1564          copy->mid, delta, mask, msg.futures);
1565   }
1566   LOG (GNUNET_ERROR_TYPE_INFO, "===> DATA_ACK for %u + %llX\n",
1567        ack, msg.futures);
1568
1569   GCCH_send_prebuilt_message (&msg.header, ch, !fwd, NULL);
1570   LOG (GNUNET_ERROR_TYPE_DEBUG, "send_data_ack END\n");
1571 }
1572
1573
1574 /**
1575  * Allow a client to send us more data, in case it was choked.
1576  *
1577  * @param ch Channel.
1578  * @param fwd Is this about FWD traffic? (Root client).
1579  */
1580 void
1581 GCCH_allow_client (struct CadetChannel *ch, int fwd)
1582 {
1583   struct CadetChannelReliability *rel;
1584   unsigned int buffer;
1585
1586   LOG (GNUNET_ERROR_TYPE_DEBUG, "GMCH allow\n");
1587
1588   if (CADET_CHANNEL_READY != ch->state)
1589   {
1590     LOG (GNUNET_ERROR_TYPE_DEBUG, " channel not ready yet!\n");
1591     return;
1592   }
1593
1594   if (GNUNET_YES == ch->reliable)
1595   {
1596     rel = fwd ? ch->root_rel : ch->dest_rel;
1597     if (NULL == rel)
1598     {
1599       GNUNET_break (GNUNET_NO != ch->destroy);
1600       return;
1601     }
1602     if (NULL != rel->head_sent)
1603     {
1604       if (64 <= rel->mid_send - rel->head_sent->mid)
1605       {
1606         LOG (GNUNET_ERROR_TYPE_DEBUG, " too big MID gap! Wait for ACK.\n");
1607         return;
1608       }
1609       else
1610       {
1611         LOG (GNUNET_ERROR_TYPE_DEBUG, " gap ok: %u - %u\n",
1612              rel->head_sent->mid, rel->mid_send);
1613         struct CadetReliableMessage *aux;
1614         for (aux = rel->head_sent; NULL != aux; aux = aux->next)
1615         {
1616           LOG (GNUNET_ERROR_TYPE_DEBUG, "   - sent mid %u\n", aux->mid);
1617         }
1618       }
1619     }
1620     else
1621     {
1622       LOG (GNUNET_ERROR_TYPE_DEBUG, " head sent is NULL\n");
1623     }
1624   }
1625
1626   if (is_loopback (ch))
1627     buffer = GCCH_get_buffer (ch, fwd);
1628   else
1629     buffer = GCT_get_connections_buffer (ch->t);
1630
1631   if (0 == buffer)
1632   {
1633     LOG (GNUNET_ERROR_TYPE_DEBUG, " no buffer space.\n");
1634     return;
1635   }
1636
1637   LOG (GNUNET_ERROR_TYPE_DEBUG, " buffer space %u, allowing\n", buffer);
1638   send_client_ack (ch, fwd);
1639 }
1640
1641
1642 /**
1643  * Log channel info.
1644  *
1645  * @param ch Channel.
1646  */
1647 void
1648 GCCH_debug (struct CadetChannel *ch)
1649 {
1650   if (NULL == ch)
1651   {
1652     LOG (GNUNET_ERROR_TYPE_DEBUG, "*** DEBUG NULL CHANNEL ***\n");
1653     return;
1654   }
1655   LOG (GNUNET_ERROR_TYPE_DEBUG, "Channel %s:%X (%p)\n",
1656               GCT_2s (ch->t), ch->gid, ch);
1657   LOG (GNUNET_ERROR_TYPE_DEBUG, "  root %p/%p\n",
1658               ch->root, ch->root_rel);
1659   if (NULL != ch->root)
1660   {
1661     LOG (GNUNET_ERROR_TYPE_DEBUG, "  cli %s\n", GML_2s (ch->root));
1662     LOG (GNUNET_ERROR_TYPE_DEBUG, "  ready %s\n",
1663                 ch->root_rel->client_ready ? "YES" : "NO");
1664     LOG (GNUNET_ERROR_TYPE_DEBUG, "  id %X\n", ch->lid_root);
1665     LOG (GNUNET_ERROR_TYPE_DEBUG, "  recv %d\n", ch->root_rel->n_recv);
1666     LOG (GNUNET_ERROR_TYPE_DEBUG, "  MID r: %d, s: %d\n",
1667          ch->root_rel->mid_recv, ch->root_rel->mid_send);
1668   }
1669   LOG (GNUNET_ERROR_TYPE_DEBUG, "  dest %p/%p\n",
1670               ch->dest, ch->dest_rel);
1671   if (NULL != ch->dest)
1672   {
1673     LOG (GNUNET_ERROR_TYPE_DEBUG, "  cli %s\n", GML_2s (ch->dest));
1674     LOG (GNUNET_ERROR_TYPE_DEBUG, "  ready %s\n",
1675                 ch->dest_rel->client_ready ? "YES" : "NO");
1676     LOG (GNUNET_ERROR_TYPE_DEBUG, "  id %X\n", ch->lid_dest);
1677     LOG (GNUNET_ERROR_TYPE_DEBUG, "  recv %d\n", ch->dest_rel->n_recv);
1678     LOG (GNUNET_ERROR_TYPE_DEBUG, "  MID r: %d, s: %d\n",
1679          ch->dest_rel->mid_recv, ch->dest_rel->mid_send);
1680
1681   }
1682 }
1683
1684
1685 /**
1686  * Handle an ACK given by a client.
1687  *
1688  * Mark client as ready and send him any buffered data we could have for him.
1689  *
1690  * @param ch Channel.
1691  * @param fwd Is this a "FWD ACK"? (FWD ACKs are sent by dest and go BCK)
1692  */
1693 void
1694 GCCH_handle_local_ack (struct CadetChannel *ch, int fwd)
1695 {
1696   struct CadetChannelReliability *rel;
1697   struct CadetClient *c;
1698
1699   rel = fwd ? ch->dest_rel : ch->root_rel;
1700   c   = fwd ? ch->dest     : ch->root;
1701
1702   rel->client_ready = GNUNET_YES;
1703   send_client_buffered_data (ch, c, fwd);
1704
1705   if (GNUNET_YES == ch->destroy && 0 == rel->n_recv)
1706   {
1707     send_destroy (ch, GNUNET_YES);
1708     GCCH_destroy (ch);
1709     return;
1710   }
1711   /* if loopback is marked for destruction, no need to ACK to the other peer,
1712    * it requested the destruction and is already gone, therefore, else if.
1713    */
1714   else if (is_loopback (ch))
1715   {
1716     unsigned int buffer;
1717
1718     buffer = GCCH_get_buffer (ch, fwd);
1719     if (0 < buffer)
1720       GCCH_allow_client (ch, fwd);
1721
1722     return;
1723   }
1724   GCT_send_connection_acks (ch->t);
1725 }
1726
1727
1728 /**
1729  * Handle data given by a client.
1730  *
1731  * Check whether the client is allowed to send in this tunnel, save if channel
1732  * is reliable and send an ACK to the client if there is still buffer space
1733  * in the tunnel.
1734  *
1735  * @param ch Channel.
1736  * @param c Client which sent the data.
1737  * @param fwd Is this a FWD data?
1738  * @param message Data message.
1739  * @param size Size of data.
1740  *
1741  * @return GNUNET_OK if everything goes well, GNUNET_SYSERR in case of en error.
1742  */
1743 int
1744 GCCH_handle_local_data (struct CadetChannel *ch,
1745                         struct CadetClient *c, int fwd,
1746                         const struct GNUNET_MessageHeader *message,
1747                         size_t size)
1748 {
1749   struct CadetChannelReliability *rel;
1750   struct GNUNET_CADET_Data *payload;
1751   uint16_t p2p_size = sizeof(struct GNUNET_CADET_Data) + size;
1752   unsigned char cbuf[p2p_size];
1753   unsigned char buffer;
1754
1755   /* Is the client in the channel? */
1756   if ( !( (fwd &&
1757            ch->root == c)
1758          ||
1759           (!fwd &&
1760            ch->dest == c) ) )
1761   {
1762     GNUNET_break_op (0);
1763     return GNUNET_SYSERR;
1764   }
1765
1766   rel = fwd ? ch->root_rel : ch->dest_rel;
1767
1768   if (GNUNET_NO == rel->client_allowed)
1769   {
1770     GNUNET_break_op (0);
1771     return GNUNET_SYSERR;
1772   }
1773
1774   rel->client_allowed = GNUNET_NO;
1775
1776   /* Ok, everything is correct, send the message. */
1777   payload = (struct GNUNET_CADET_Data *) cbuf;
1778   payload->mid = htonl (rel->mid_send);
1779   rel->mid_send++;
1780   memcpy (&payload[1], message, size);
1781   payload->header.size = htons (p2p_size);
1782   payload->header.type = htons (GNUNET_MESSAGE_TYPE_CADET_DATA);
1783   payload->chid = htonl (ch->gid);
1784   LOG (GNUNET_ERROR_TYPE_DEBUG, "  sending on channel...\n");
1785   GCCH_send_prebuilt_message (&payload->header, ch, fwd, NULL);
1786
1787   if (is_loopback (ch))
1788     buffer = GCCH_get_buffer (ch, fwd);
1789   else
1790     buffer = GCT_get_connections_buffer (ch->t);
1791
1792   if (0 < buffer)
1793     GCCH_allow_client (ch, fwd);
1794
1795   return GNUNET_OK;
1796 }
1797
1798
1799 /**
1800  * Handle a channel destroy requested by a client.
1801  *
1802  * TODO: add "reason" field
1803  *
1804  * Destroy the channel and the tunnel in case this was the last channel.
1805  *
1806  * @param ch Channel.
1807  * @param c Client that requested the destruction (to avoid notifying him).
1808  * @param is_root Is the request coming from root?
1809  */
1810 void
1811 GCCH_handle_local_destroy (struct CadetChannel *ch,
1812                            struct CadetClient *c,
1813                            int is_root)
1814 {
1815   ch->destroy = GNUNET_YES;
1816   /* Cleanup after the tunnel */
1817   if (GNUNET_NO == is_root && c == ch->dest)
1818   {
1819     LOG (GNUNET_ERROR_TYPE_DEBUG, " Client %s is destination.\n", GML_2s (c));
1820     GML_client_delete_channel (c, ch, ch->lid_dest);
1821     ch->dest = NULL;
1822   }
1823   if (GNUNET_YES == is_root && c == ch->root)
1824   {
1825     LOG (GNUNET_ERROR_TYPE_DEBUG, " Client %s is owner.\n", GML_2s (c));
1826     GML_client_delete_channel (c, ch, ch->lid_root);
1827     ch->root = NULL;
1828   }
1829
1830   send_destroy (ch, GNUNET_NO);
1831   if (0 == ch->pending_messages)
1832     GCCH_destroy (ch);
1833 }
1834
1835
1836 /**
1837  * Handle a channel create requested by a client.
1838  *
1839  * Create the channel and the tunnel in case this was the first0 channel.
1840  *
1841  * @param c Client that requested the creation (will be the root).
1842  * @param msg Create Channel message.
1843  *
1844  * @return GNUNET_OK if everything went fine, GNUNET_SYSERR otherwise.
1845  */
1846 int
1847 GCCH_handle_local_create (struct CadetClient *c,
1848                           struct GNUNET_CADET_ChannelMessage *msg)
1849 {
1850   struct CadetChannel *ch;
1851   struct CadetTunnel *t;
1852   struct CadetPeer *peer;
1853   CADET_ChannelNumber chid;
1854
1855   LOG (GNUNET_ERROR_TYPE_DEBUG, "  towards %s:%u\n",
1856               GNUNET_i2s (&msg->peer), ntohl (msg->port));
1857   chid = ntohl (msg->channel_id);
1858
1859   /* Sanity check for duplicate channel IDs */
1860   if (NULL != GML_channel_get (c, chid))
1861   {
1862     GNUNET_break (0);
1863     return GNUNET_SYSERR;
1864   }
1865
1866   peer = GCP_get (&msg->peer);
1867   GCP_add_tunnel (peer);
1868   t = GCP_get_tunnel (peer);
1869
1870   if (GCP_get_short_id (peer) == myid)
1871   {
1872     GCT_change_cstate (t, CADET_TUNNEL_READY);
1873   }
1874   else
1875   {
1876     /* FIXME change to a tunnel API, eliminate ch <-> peer connection */
1877     GCP_connect (peer);
1878   }
1879
1880   /* Create channel */
1881   ch = channel_new (t, c, chid);
1882   if (NULL == ch)
1883   {
1884     GNUNET_break (0);
1885     return GNUNET_SYSERR;
1886   }
1887   ch->port = ntohl (msg->port);
1888   channel_set_options (ch, ntohl (msg->opt));
1889
1890   /* In unreliable channels, we'll use the DLL to buffer BCK data */
1891   ch->root_rel = GNUNET_new (struct CadetChannelReliability);
1892   ch->root_rel->ch = ch;
1893   ch->root_rel->retry_timer = CADET_RETRANSMIT_TIME;
1894   ch->root_rel->expected_delay.rel_value_us = 0;
1895
1896   LOG (GNUNET_ERROR_TYPE_DEBUG, "CREATED CHANNEL %s\n", GCCH_2s (ch));
1897
1898   send_create (ch);
1899
1900   return GNUNET_OK;
1901 }
1902
1903
1904 /**
1905  * Handler for cadet network payload traffic.
1906  *
1907  * @param ch Channel for the message.
1908  * @param msg Unencryted data message.
1909  * @param fwd Is this message fwd? This only is meaningful in loopback channels.
1910  *            #GNUNET_YES if message is FWD on the respective channel (loopback)
1911  *            #GNUNET_NO if message is BCK on the respective channel (loopback)
1912  *            #GNUNET_SYSERR if message on a one-ended channel (remote)
1913  */
1914 void
1915 GCCH_handle_data (struct CadetChannel *ch,
1916                   const struct GNUNET_CADET_Data *msg,
1917                   int fwd)
1918 {
1919   struct CadetChannelReliability *rel;
1920   struct CadetClient *c;
1921   uint32_t mid;
1922
1923   /* If this is a remote (non-loopback) channel, find 'fwd'. */
1924   if (GNUNET_SYSERR == fwd)
1925   {
1926     if (is_loopback (ch))
1927     {
1928       /* It is a loopback channel after all... */
1929       GNUNET_break (0);
1930       return;
1931     }
1932     fwd = (NULL != ch->dest) ? GNUNET_YES : GNUNET_NO;
1933   }
1934
1935   /*  Initialize FWD/BCK data */
1936   c   = fwd ? ch->dest     : ch->root;
1937   rel = fwd ? ch->dest_rel : ch->root_rel;
1938
1939   if (NULL == c)
1940   {
1941     GNUNET_break (GNUNET_NO != ch->destroy);
1942     return;
1943   }
1944
1945   if (CADET_CHANNEL_READY != ch->state)
1946   {
1947     if (GNUNET_NO == fwd)
1948     {
1949       /* If we are the root, this means the other peer has sent traffic before
1950        * receiving our ACK. Even if the SYNACK goes missing, no traffic should
1951        * be sent before the ACK.
1952        */
1953       GNUNET_break_op (0);
1954       return;
1955     }
1956     /* If we are the dest, this means that the SYNACK got to the root but
1957      * the ACK went missing. Treat this as an ACK.
1958      */
1959     channel_confirm (ch, GNUNET_NO);
1960   }
1961
1962   GNUNET_STATISTICS_update (stats, "# data received", 1, GNUNET_NO);
1963
1964   mid = ntohl (msg->mid);
1965   LOG (GNUNET_ERROR_TYPE_INFO, "<=== DATA %u %s on channel %s\n",
1966        mid, GC_f2s (fwd), GCCH_2s (ch));
1967
1968   if (GNUNET_NO == ch->reliable ||
1969       ( !GC_is_pid_bigger (rel->mid_recv, mid) &&
1970         GC_is_pid_bigger (rel->mid_recv + 64, mid) ) )
1971   {
1972     LOG (GNUNET_ERROR_TYPE_DEBUG, "RECV MID %u (%u)\n",
1973          mid, ntohs (msg->header.size));
1974     if (GNUNET_YES == ch->reliable)
1975     {
1976       /* Is this the exact next expected messasge? */
1977       if (mid == rel->mid_recv)
1978       {
1979         LOG (GNUNET_ERROR_TYPE_DEBUG, "as expected, sending to client\n");
1980         send_client_data (ch, msg, fwd);
1981       }
1982       else
1983       {
1984         LOG (GNUNET_ERROR_TYPE_DEBUG, "save for later\n");
1985         add_buffered_data (msg, rel);
1986       }
1987     }
1988     else
1989     {
1990       /* Tunnel is unreliable: send to clients directly */
1991       /* FIXME: accept Out Of Order traffic */
1992       rel->mid_recv = mid + 1;
1993       send_client_data (ch, msg, fwd);
1994     }
1995   }
1996   else
1997   {
1998     if (GC_is_pid_bigger (rel->mid_recv, mid))
1999     {
2000       GNUNET_break_op (0);
2001       LOG (GNUNET_ERROR_TYPE_WARNING,
2002           "MID %u on channel %s not expected (window: %u - %u). Dropping!\n",
2003           mid, GCCH_2s (ch), rel->mid_recv, rel->mid_recv + 63);
2004     }
2005     else
2006     {
2007       LOG (GNUNET_ERROR_TYPE_WARNING,
2008            "Duplicate MID %u, channel %s (expecting MID %u). Re-sending ACK!\n",
2009            mid, GCCH_2s (ch), rel->mid_recv);
2010       if (NULL != rel->uniq)
2011       {
2012         LOG (GNUNET_ERROR_TYPE_WARNING,
2013             "We are trying to send an ACK, but don't seem have the "
2014             "bandwidth. Try to increase your ats QUOTA in you config file\n");
2015       }
2016
2017     }
2018   }
2019
2020   GCCH_send_data_ack (ch, fwd);
2021 }
2022
2023
2024 /**
2025  * Handler for cadet network traffic end-to-end ACKs.
2026  *
2027  * @param ch Channel on which we got this message.
2028  * @param msg Data message.
2029  * @param fwd Is this message fwd? This only is meaningful in loopback channels.
2030  *            #GNUNET_YES if message is FWD on the respective channel (loopback)
2031  *            #GNUNET_NO if message is BCK on the respective channel (loopback)
2032  *            #GNUNET_SYSERR if message on a one-ended channel (remote)
2033  */
2034 void
2035 GCCH_handle_data_ack (struct CadetChannel *ch,
2036                       const struct GNUNET_CADET_DataACK *msg,
2037                       int fwd)
2038 {
2039   struct CadetChannelReliability *rel;
2040   struct CadetReliableMessage *copy;
2041   struct CadetReliableMessage *next;
2042   uint32_t ack;
2043   int work;
2044
2045   /* If this is a remote (non-loopback) channel, find 'fwd'. */
2046   if (GNUNET_SYSERR == fwd)
2047   {
2048     if (is_loopback (ch))
2049     {
2050       /* It is a loopback channel after all... */
2051       GNUNET_break (0);
2052       return;
2053     }
2054     /* Inverted: if message came 'FWD' is a 'BCK ACK'. */
2055     fwd = (NULL != ch->dest) ? GNUNET_NO : GNUNET_YES;
2056   }
2057
2058   ack = ntohl (msg->mid);
2059   LOG (GNUNET_ERROR_TYPE_INFO, "<=== %s ACK %u + %llX\n",
2060        GC_f2s (fwd), ack, msg->futures);
2061
2062   if (GNUNET_YES == fwd)
2063     rel = ch->root_rel;
2064   else
2065     rel = ch->dest_rel;
2066
2067   if (NULL == rel)
2068   {
2069     GNUNET_break (GNUNET_NO != ch->destroy);
2070     return;
2071   }
2072
2073   /* Free ACK'd copies: no need to retransmit those anymore FIXME refactor */
2074   for (work = GNUNET_NO, copy = rel->head_sent; copy != NULL; copy = next)
2075   {
2076     if (GC_is_pid_bigger (copy->mid, ack))
2077     {
2078       LOG (GNUNET_ERROR_TYPE_DEBUG, "  head %u, out!\n", copy->mid);
2079       if (0 < channel_rel_free_sent (rel, msg))
2080         work = GNUNET_YES;
2081       break;
2082     }
2083     work = GNUNET_YES;
2084     LOG (GNUNET_ERROR_TYPE_DEBUG, "  id %u\n", copy->mid);
2085     next = copy->next;
2086     if (GNUNET_YES == rel_message_free (copy, GNUNET_YES))
2087     {
2088       LOG (GNUNET_ERROR_TYPE_DEBUG, " channel destoyed\n");
2089       return;
2090     }
2091   }
2092
2093   /* ACK client if needed and possible */
2094   GCCH_allow_client (ch, fwd);
2095
2096   /* If some message was free'd, update the retransmission delay */
2097   if (GNUNET_YES == work)
2098   {
2099     if (NULL != rel->retry_task)
2100     {
2101       GNUNET_SCHEDULER_cancel (rel->retry_task);
2102       rel->retry_task = NULL;
2103       if (NULL != rel->head_sent && NULL == rel->head_sent->chq)
2104       {
2105         struct GNUNET_TIME_Absolute new_target;
2106         struct GNUNET_TIME_Relative delay;
2107
2108         delay = GNUNET_TIME_relative_multiply (rel->retry_timer,
2109                                                CADET_RETRANSMIT_MARGIN);
2110         new_target = GNUNET_TIME_absolute_add (rel->head_sent->timestamp,
2111                                                delay);
2112         delay = GNUNET_TIME_absolute_get_remaining (new_target);
2113         rel->retry_task =
2114             GNUNET_SCHEDULER_add_delayed (delay,
2115                                           &channel_retransmit_message,
2116                                           rel);
2117       }
2118     }
2119     else
2120     {
2121       /* Work was done but no task was pending? Shouldn't happen! */
2122       GNUNET_break (0);
2123     }
2124   }
2125 }
2126
2127
2128 /**
2129  * Handler for channel create messages.
2130  *
2131  * Does not have fwd parameter because it's always 'FWD': channel is incoming.
2132  *
2133  * @param t Tunnel this channel will be in.
2134  * @param msg Channel crate message.
2135  */
2136 struct CadetChannel *
2137 GCCH_handle_create (struct CadetTunnel *t,
2138                     const struct GNUNET_CADET_ChannelCreate *msg)
2139 {
2140   CADET_ChannelNumber chid;
2141   struct CadetChannel *ch;
2142   struct CadetClient *c;
2143   int new_channel;
2144
2145   chid = ntohl (msg->chid);
2146   ch = GCT_get_channel (t, chid);
2147   if (NULL == ch)
2148   {
2149     /* Create channel */
2150     ch = channel_new (t, NULL, 0);
2151     ch->gid = chid;
2152     channel_set_options (ch, ntohl (msg->opt));
2153     new_channel = GNUNET_YES;
2154   }
2155   else
2156   {
2157     new_channel = GNUNET_NO;
2158   }
2159
2160   if (GNUNET_YES == new_channel || GCT_is_loopback (t))
2161   {
2162     /* Find a destination client */
2163     ch->port = ntohl (msg->port);
2164     LOG (GNUNET_ERROR_TYPE_DEBUG, "   port %u\n", ch->port);
2165     c = GML_client_get_by_port (ch->port);
2166     if (NULL == c)
2167     {
2168       LOG (GNUNET_ERROR_TYPE_DEBUG, "  no client has port registered\n");
2169       if (is_loopback (ch))
2170       {
2171         LOG (GNUNET_ERROR_TYPE_DEBUG, "  loopback: destroy on handler\n");
2172         send_nack (ch);
2173       }
2174       else
2175       {
2176         LOG (GNUNET_ERROR_TYPE_DEBUG, "  not loopback: destroy now\n");
2177         send_nack (ch);
2178         GCCH_destroy (ch);
2179       }
2180       return NULL;
2181     }
2182     else
2183     {
2184       LOG (GNUNET_ERROR_TYPE_DEBUG, "  client %p has port registered\n", c);
2185     }
2186
2187     add_destination (ch, c);
2188     if (GNUNET_YES == ch->reliable)
2189       LOG (GNUNET_ERROR_TYPE_DEBUG, "Reliable\n");
2190     else
2191       LOG (GNUNET_ERROR_TYPE_DEBUG, "Not Reliable\n");
2192
2193     send_client_create (ch);
2194     ch->state =  CADET_CHANNEL_SENT;
2195   }
2196   else
2197   {
2198     LOG (GNUNET_ERROR_TYPE_DEBUG, "  duplicate create channel\n");
2199     if (NULL != ch->dest_rel->retry_task)
2200     {
2201       LOG (GNUNET_ERROR_TYPE_DEBUG, "  clearing retry task\n");
2202       /* we were waiting to re-send our 'SYNACK', wait no more! */
2203       GNUNET_SCHEDULER_cancel (ch->dest_rel->retry_task);
2204       ch->dest_rel->retry_task = NULL;
2205     }
2206     else if (NULL != ch->dest_rel->uniq)
2207     {
2208       /* we are waiting to for our 'SYNACK' to leave the queue, all done! */
2209       return ch;
2210     }
2211   }
2212   send_ack (ch, GNUNET_YES);
2213
2214   return ch;
2215 }
2216
2217
2218 /**
2219  * Handler for channel NACK messages.
2220  *
2221  * NACK messages always go dest -> root, no need for 'fwd' or 'msg' parameter.
2222  *
2223  * @param ch Channel.
2224  */
2225 void
2226 GCCH_handle_nack (struct CadetChannel *ch)
2227 {
2228   send_client_nack (ch);
2229   GCCH_destroy (ch);
2230 }
2231
2232
2233 /**
2234  * Handler for channel ack messages.
2235  *
2236  * @param ch Channel.
2237  * @param msg Message.
2238  * @param fwd Is this message fwd? This only is meaningful in loopback channels.
2239  *            #GNUNET_YES if message is FWD on the respective channel (loopback)
2240  *            #GNUNET_NO if message is BCK on the respective channel (loopback)
2241  *            #GNUNET_SYSERR if message on a one-ended channel (remote)
2242  */
2243 void
2244 GCCH_handle_ack (struct CadetChannel *ch,
2245                  const struct GNUNET_CADET_ChannelManage *msg,
2246                  int fwd)
2247 {
2248   /* If this is a remote (non-loopback) channel, find 'fwd'. */
2249   if (GNUNET_SYSERR == fwd)
2250   {
2251     if (is_loopback (ch))
2252     {
2253       /* It is a loopback channel after all... */
2254       GNUNET_break (0);
2255       return;
2256     }
2257     fwd = (NULL != ch->dest) ? GNUNET_YES : GNUNET_NO;
2258   }
2259
2260   channel_confirm (ch, !fwd);
2261 }
2262
2263
2264 /**
2265  * Handler for channel destroy messages.
2266  *
2267  * @param ch Channel to be destroyed of.
2268  * @param msg Message.
2269  * @param fwd Is this message fwd? This only is meaningful in loopback channels.
2270  *            #GNUNET_YES if message is FWD on the respective channel (loopback)
2271  *            #GNUNET_NO if message is BCK on the respective channel (loopback)
2272  *            #GNUNET_SYSERR if message on a one-ended channel (remote)
2273  */
2274 void
2275 GCCH_handle_destroy (struct CadetChannel *ch,
2276                      const struct GNUNET_CADET_ChannelManage *msg,
2277                      int fwd)
2278 {
2279   struct CadetChannelReliability *rel;
2280
2281   /* If this is a remote (non-loopback) channel, find 'fwd'. */
2282   if (GNUNET_SYSERR == fwd)
2283   {
2284     if (is_loopback (ch))
2285     {
2286       /* It is a loopback channel after all... */
2287       GNUNET_break (0);
2288       return;
2289     }
2290     fwd = (NULL != ch->dest) ? GNUNET_YES : GNUNET_NO;
2291   }
2292
2293   GCCH_debug (ch);
2294   if ( (fwd && NULL == ch->dest) || (!fwd && NULL == ch->root) )
2295   {
2296     /* Not for us (don't destroy twice a half-open loopback channel) */
2297     return;
2298   }
2299
2300   rel = fwd ? ch->dest_rel : ch->root_rel;
2301   if (0 == rel->n_recv)
2302   {
2303     send_destroy (ch, GNUNET_YES);
2304     GCCH_destroy (ch);
2305   }
2306   else
2307   {
2308     ch->destroy = GNUNET_YES;
2309   }
2310 }
2311
2312
2313 /**
2314  * Sends an already built message on a channel.
2315  *
2316  * If the channel is on a loopback tunnel, notifies the appropriate destination
2317  * client locally.
2318  *
2319  * On a normal channel passes the message to the tunnel for encryption and
2320  * sending on a connection.
2321  *
2322  * This function DOES NOT save the message for retransmission.
2323  *
2324  * @param message Message to send. Function makes a copy of it.
2325  * @param ch Channel on which this message is transmitted.
2326  * @param fwd Is this a fwd message?
2327  * @param existing_copy This is a retransmission, don't save a copy.
2328  */
2329 void
2330 GCCH_send_prebuilt_message (const struct GNUNET_MessageHeader *message,
2331                             struct CadetChannel *ch, int fwd,
2332                             void *existing_copy)
2333 {
2334   struct CadetChannelQueue *chq;
2335   uint16_t type;
2336
2337   type = ntohs (message->type);
2338   LOG (GNUNET_ERROR_TYPE_INFO, "===> %s %s on channel %s\n",
2339        GC_m2s (type), GC_f2s (fwd), GCCH_2s (ch));
2340
2341   if (GCT_is_loopback (ch->t))
2342   {
2343     handle_loopback (ch, message, fwd);
2344     return;
2345   }
2346
2347   switch (type)
2348   {
2349     struct GNUNET_CADET_Data *payload;
2350     case GNUNET_MESSAGE_TYPE_CADET_DATA:
2351
2352       payload = (struct GNUNET_CADET_Data *) message;
2353       LOG (GNUNET_ERROR_TYPE_INFO, "===> %s %u\n",
2354            GC_m2s (type), ntohl (payload->mid));
2355       if (GNUNET_YES == ch->reliable)
2356       {
2357         chq = GNUNET_new (struct CadetChannelQueue);
2358         chq->type = type;
2359         if (NULL == existing_copy)
2360           chq->copy = channel_save_copy (ch, message, fwd);
2361         else
2362         {
2363           chq->copy = (struct CadetReliableMessage *) existing_copy;
2364           if (NULL != chq->copy->chq)
2365           {
2366             /* Last retransmission was queued but not yet sent!
2367              * This retransmission was scheduled by a ch_message_sent which
2368              * followed a very fast RTT, so the tiny delay made the
2369              * retransmission function to execute before the previous
2370              * retransmitted message even had a chance to leave the peer.
2371              * Cancel this message and wait until the pending
2372              * retransmission leaves the peer and ch_message_sent starts
2373              * the timer for the next one.
2374              */
2375             GNUNET_free (chq);
2376             LOG (GNUNET_ERROR_TYPE_DEBUG,
2377                  "  exisitng copy not yet transmitted!\n");
2378             return;
2379           }
2380           LOG (GNUNET_ERROR_TYPE_DEBUG,
2381                "  using existing copy: %p {r:%p q:%p t:%u}\n",
2382                existing_copy,
2383                chq->copy->rel, chq->copy->chq, chq->copy->type);
2384         }
2385         LOG (GNUNET_ERROR_TYPE_DEBUG, "  new chq: %p\n", chq);
2386         chq->copy->chq = chq;
2387         chq->tq = GCT_send_prebuilt_message (message, ch->t, NULL,
2388                                              NULL != existing_copy,
2389                                              &ch_message_sent, chq);
2390         /* q itself is stored in copy */
2391         GNUNET_assert (NULL != chq->tq || GNUNET_NO != ch->destroy);
2392       }
2393       else
2394       {
2395         fire_and_forget (message, ch, GNUNET_NO);
2396       }
2397       break;
2398
2399
2400     case GNUNET_MESSAGE_TYPE_CADET_DATA_ACK:
2401     case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_CREATE:
2402     case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_ACK:
2403       chq = GNUNET_new (struct CadetChannelQueue);
2404       chq->type = type;
2405       chq->rel = fwd ? ch->root_rel : ch->dest_rel;
2406       if (NULL != chq->rel->uniq)
2407       {
2408         if (NULL != chq->rel->uniq->tq)
2409         {
2410           GCT_cancel (chq->rel->uniq->tq);
2411           /* ch_message_sent is called, freeing and NULLing uniq */
2412           GNUNET_break (NULL == chq->rel->uniq);
2413         }
2414         else
2415         {
2416           GNUNET_break (0);
2417           GNUNET_free (chq->rel->uniq);
2418         }
2419       }
2420
2421       chq->tq = GCT_send_prebuilt_message (message, ch->t, NULL, GNUNET_YES,
2422                                            &ch_message_sent, chq);
2423       if (NULL == chq->tq)
2424       {
2425         GNUNET_break (0);
2426         GCT_debug (ch->t, GNUNET_ERROR_TYPE_ERROR);
2427         GNUNET_free (chq);
2428         chq = NULL;
2429         return;
2430       }
2431       chq->rel->uniq = chq;
2432       break;
2433
2434
2435     case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_DESTROY:
2436     case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_NACK:
2437       fire_and_forget (message, ch, GNUNET_YES);
2438       break;
2439
2440
2441     default:
2442       GNUNET_break (0);
2443       LOG (GNUNET_ERROR_TYPE_DEBUG, "type %s unknown!\n", GC_m2s (type));
2444       fire_and_forget (message, ch, GNUNET_YES);
2445   }
2446 }
2447
2448
2449 /**
2450  * Get the static string for identification of the channel.
2451  *
2452  * @param ch Channel.
2453  *
2454  * @return Static string with the channel IDs.
2455  */
2456 const char *
2457 GCCH_2s (const struct CadetChannel *ch)
2458 {
2459   static char buf[64];
2460
2461   if (NULL == ch)
2462     return "(NULL Channel)";
2463
2464   SPRINTF (buf, "%s:%u gid:%X (%X / %X)",
2465            GCT_2s (ch->t), ch->port, ch->gid, ch->lid_root, ch->lid_dest);
2466
2467   return buf;
2468 }