-more datacache integration work
[oweals/gnunet.git] / src / cadet / gnunet-service-cadet_channel.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2013 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21
22 #include "platform.h"
23 #include "gnunet_util_lib.h"
24
25 #include "gnunet_statistics_service.h"
26
27 #include "cadet.h"
28 #include "cadet_protocol.h"
29
30 #include "gnunet-service-cadet_channel.h"
31 #include "gnunet-service-cadet_local.h"
32 #include "gnunet-service-cadet_tunnel.h"
33 #include "gnunet-service-cadet_peer.h"
34
35 #define LOG(level, ...) GNUNET_log_from(level,"cadet-chn",__VA_ARGS__)
36
37 #define CADET_RETRANSMIT_TIME    GNUNET_TIME_relative_multiply(\
38                                     GNUNET_TIME_UNIT_MILLISECONDS, 250)
39 #define CADET_RETRANSMIT_MARGIN  4
40
41
42 /**
43  * All the states a connection can be in.
44  */
45 enum CadetChannelState
46 {
47   /**
48    * Uninitialized status, should never appear in operation.
49    */
50   CADET_CHANNEL_NEW,
51
52   /**
53    * Connection create message sent, waiting for ACK.
54    */
55   CADET_CHANNEL_SENT,
56
57   /**
58    * Connection confirmed, ready to carry traffic.
59    */
60   CADET_CHANNEL_READY,
61 };
62
63
64 /**
65  * Info holder for channel messages in queues.
66  */
67 struct CadetChannelQueue
68 {
69   /**
70    * Tunnel Queue.
71    */
72   struct CadetTunnelQueue *tq;
73
74   /**
75    * Message type (DATA/DATA_ACK)
76    */
77   uint16_t type;
78
79   /**
80    * Message copy (for DATAs, to start retransmission timer)
81    */
82   struct CadetReliableMessage *copy;
83
84   /**
85    * Reliability (for DATA_ACKs, to access rel->ack_q)
86    */
87   struct CadetChannelReliability *rel;
88 };
89
90
91 /**
92  * Info needed to retry a message in case it gets lost.
93  */
94 struct CadetReliableMessage
95 {
96     /**
97      * Double linked list, FIFO style
98      */
99   struct CadetReliableMessage   *next;
100   struct CadetReliableMessage   *prev;
101
102     /**
103      * Type of message (payload, channel management).
104      */
105   int16_t                       type;
106
107     /**
108      * Tunnel Reliability queue this message is in.
109      */
110   struct CadetChannelReliability *rel;
111
112     /**
113      * ID of the message (ACK needed to free)
114      */
115   uint32_t                      mid;
116
117   /**
118    * Tunnel Queue.
119    */
120   struct CadetChannelQueue      *chq;
121
122     /**
123      * When was this message issued (to calculate ACK delay)
124      */
125   struct GNUNET_TIME_Absolute   timestamp;
126
127   /* struct GNUNET_CADET_Data with payload */
128 };
129
130
131 /**
132  * Info about the traffic state for a client in a channel.
133  */
134 struct CadetChannelReliability
135 {
136     /**
137      * Channel this is about.
138      */
139   struct CadetChannel *ch;
140
141     /**
142      * DLL of messages sent and not yet ACK'd.
143      */
144   struct CadetReliableMessage        *head_sent;
145   struct CadetReliableMessage        *tail_sent;
146
147     /**
148      * DLL of messages received out of order.
149      */
150   struct CadetReliableMessage        *head_recv;
151   struct CadetReliableMessage        *tail_recv;
152
153     /**
154      * Messages received.
155      */
156   unsigned int                      n_recv;
157
158     /**
159      * Next MID to use for outgoing traffic.
160      */
161   uint32_t                          mid_send;
162
163     /**
164      * Next MID expected for incoming traffic.
165      */
166   uint32_t                          mid_recv;
167
168     /**
169      * Handle for queued unique data CREATE, DATA_ACK.
170      */
171   struct CadetChannelQueue           *uniq;
172
173     /**
174      * Can we send data to the client?
175      */
176   int                               client_ready;
177
178   /**
179    * Can the client send data to us?
180    */
181   int                               client_allowed;
182
183     /**
184      * Task to resend/poll in case no ACK is received.
185      */
186   struct GNUNET_SCHEDULER_Task *   retry_task;
187
188     /**
189      * Counter for exponential backoff.
190      */
191   struct GNUNET_TIME_Relative       retry_timer;
192
193     /**
194      * How long does it usually take to get an ACK.
195      */
196   struct GNUNET_TIME_Relative       expected_delay;
197 };
198
199
200 /**
201  * Struct containing all information regarding a channel to a remote client.
202  */
203 struct CadetChannel
204 {
205     /**
206      * Tunnel this channel is in.
207      */
208   struct CadetTunnel *t;
209
210     /**
211      * Destination port of the channel.
212      */
213   uint32_t port;
214
215     /**
216      * Global channel number ( < GNUNET_CADET_LOCAL_CHANNEL_ID_CLI)
217      */
218   CADET_ChannelNumber gid;
219
220     /**
221      * Local tunnel number for root (owner) client.
222      * ( >= GNUNET_CADET_LOCAL_CHANNEL_ID_CLI or 0 )
223      */
224   CADET_ChannelNumber lid_root;
225
226     /**
227      * Local tunnel number for local destination clients (incoming number)
228      * ( >= GNUNET_CADET_LOCAL_CHANNEL_ID_SERV or 0).
229      */
230   CADET_ChannelNumber lid_dest;
231
232     /**
233      * Channel state.
234      */
235   enum CadetChannelState state;
236
237     /**
238      * Is the tunnel bufferless (minimum latency)?
239      */
240   int nobuffer;
241
242     /**
243      * Is the tunnel reliable?
244      */
245   int reliable;
246
247     /**
248      * Last time the channel was used
249      */
250   struct GNUNET_TIME_Absolute timestamp;
251
252     /**
253      * Client owner of the tunnel, if any
254      */
255   struct CadetClient *root;
256
257     /**
258      * Client destination of the tunnel, if any.
259      */
260   struct CadetClient *dest;
261
262     /**
263      * Flag to signal the destruction of the channel.
264      * If this is set GNUNET_YES the channel will be destroyed
265      * when the queue is empty.
266      */
267   int destroy;
268
269     /**
270      * Total (reliable) messages pending ACK for this channel.
271      */
272   unsigned int pending_messages;
273
274     /**
275      * Reliability data.
276      * Only present (non-NULL) at the owner of a tunnel.
277      */
278   struct CadetChannelReliability *root_rel;
279
280     /**
281      * Reliability data.
282      * Only present (non-NULL) at the destination of a tunnel.
283      */
284   struct CadetChannelReliability *dest_rel;
285
286 };
287
288
289 /******************************************************************************/
290 /*******************************   GLOBALS  ***********************************/
291 /******************************************************************************/
292
293 /**
294  * Global handle to the statistics service.
295  */
296 extern struct GNUNET_STATISTICS_Handle *stats;
297
298 /**
299  * Local peer own ID (memory efficient handle).
300  */
301 extern GNUNET_PEER_Id myid;
302
303
304 /******************************************************************************/
305 /********************************   STATIC  ***********************************/
306 /******************************************************************************/
307
308
309 /**
310  * Destroy a reliable message after it has been acknowledged, either by
311  * direct mid ACK or bitfield. Updates the appropriate data structures and
312  * timers and frees all memory.
313  *
314  * @param copy Message that is no longer needed: remote peer got it.
315  * @param update_time Is the timing information relevant?
316  *                    If this message is ACK in a batch the timing information
317  *                    is skewed by the retransmission, count only for the
318  *                    retransmitted message.
319  *
320  * @return #GNUNET_YES if channel was destroyed as a result of the call,
321  *         #GNUNET_NO otherwise.
322  */
323 static int
324 rel_message_free (struct CadetReliableMessage *copy, int update_time);
325
326 /**
327  * send a channel create message.
328  *
329  * @param ch Channel for which to send.
330  */
331 static void
332 send_create (struct CadetChannel *ch);
333
334 /**
335  * Confirm we got a channel create, FWD ack.
336  *
337  * @param ch The channel to confirm.
338  * @param fwd Should we send a FWD ACK? (going dest->root)
339  */
340 static void
341 send_ack (struct CadetChannel *ch, int fwd);
342
343
344
345 /**
346  * Test if the channel is loopback: both root and dest are on the local peer.
347  *
348  * @param ch Channel to test.
349  *
350  * @return #GNUNET_YES if channel is loopback, #GNUNET_NO otherwise.
351  */
352 static int
353 is_loopback (const struct CadetChannel *ch)
354 {
355   if (NULL != ch->t)
356     return GCT_is_loopback (ch->t);
357
358   return (NULL != ch->root && NULL != ch->dest);
359 }
360
361
362 /**
363  * Save a copy of the data message for later retransmission.
364  *
365  * @param msg Message to copy.
366  * @param mid Message ID.
367  * @param rel Reliability data for retransmission.
368  */
369 static struct CadetReliableMessage *
370 copy_message (const struct GNUNET_CADET_Data *msg, uint32_t mid,
371               struct CadetChannelReliability *rel)
372 {
373   struct CadetReliableMessage *copy;
374   uint16_t size;
375
376   size = ntohs (msg->header.size);
377   copy = GNUNET_malloc (sizeof (*copy) + size);
378   copy->mid = mid;
379   copy->rel = rel;
380   copy->type = GNUNET_MESSAGE_TYPE_CADET_DATA;
381   memcpy (&copy[1], msg, size);
382
383   return copy;
384 }
385
386 /**
387  * We have received a message out of order, or the client is not ready.
388  * Buffer it until we receive an ACK from the client or the missing
389  * message from the channel.
390  *
391  * @param msg Message to buffer (MUST be of type CADET_DATA).
392  * @param rel Reliability data to the corresponding direction.
393  */
394 static void
395 add_buffered_data (const struct GNUNET_CADET_Data *msg,
396                    struct CadetChannelReliability *rel)
397 {
398   struct CadetReliableMessage *copy;
399   struct CadetReliableMessage *prev;
400   uint32_t mid;
401
402   mid = ntohl (msg->mid);
403
404   LOG (GNUNET_ERROR_TYPE_DEBUG, "add_buffered_data %u\n", mid);
405
406   rel->n_recv++;
407
408   // FIXME do something better than O(n), although n < 64...
409   // FIXME start from the end (most messages are the latest ones)
410   for (prev = rel->head_recv; NULL != prev; prev = prev->next)
411   {
412     LOG (GNUNET_ERROR_TYPE_DEBUG, " prev %u\n", prev->mid);
413     if (prev->mid == mid)
414     {
415       LOG (GNUNET_ERROR_TYPE_DEBUG, " already there!\n");
416       return;
417     }
418     else if (GC_is_pid_bigger (prev->mid, mid))
419     {
420       LOG (GNUNET_ERROR_TYPE_DEBUG, " bingo!\n");
421       copy = copy_message (msg, mid, rel);
422       GNUNET_CONTAINER_DLL_insert_before (rel->head_recv, rel->tail_recv,
423                                           prev, copy);
424       return;
425     }
426   }
427   copy = copy_message (msg, mid, rel);
428   LOG (GNUNET_ERROR_TYPE_DEBUG, " insert at tail!\n");
429   GNUNET_CONTAINER_DLL_insert_tail (rel->head_recv, rel->tail_recv, copy);
430   LOG (GNUNET_ERROR_TYPE_DEBUG, "add_buffered_data END\n");
431 }
432
433
434 /**
435  * Add a destination client to a channel, initializing all data structures
436  * in the channel and the client.
437  *
438  * @param ch Channel to which add the destination.
439  * @param c Client which to add to the channel.
440  */
441 static void
442 add_destination (struct CadetChannel *ch, struct CadetClient *c)
443 {
444   if (NULL != ch->dest)
445   {
446     GNUNET_break (0);
447     return;
448   }
449
450   /* Assign local id as destination */
451   ch->lid_dest = GML_get_next_chid (c);
452
453   /* Store in client's hashmap */
454   GML_channel_add (c, ch->lid_dest, ch);
455
456   GNUNET_break (NULL == ch->dest_rel);
457   ch->dest_rel = GNUNET_new (struct CadetChannelReliability);
458   ch->dest_rel->ch = ch;
459   ch->dest_rel->expected_delay.rel_value_us = 0;
460   ch->dest_rel->retry_timer = CADET_RETRANSMIT_TIME;
461
462   ch->dest = c;
463 }
464
465
466 /**
467  * Set options in a channel, extracted from a bit flag field.
468  *
469  * @param ch Channel to set options to.
470  * @param options Bit array in host byte order.
471  */
472 static void
473 channel_set_options (struct CadetChannel *ch, uint32_t options)
474 {
475   ch->nobuffer = (options & GNUNET_CADET_OPTION_NOBUFFER) != 0 ?
476   GNUNET_YES : GNUNET_NO;
477   ch->reliable = (options & GNUNET_CADET_OPTION_RELIABLE) != 0 ?
478   GNUNET_YES : GNUNET_NO;
479 }
480
481
482 /**
483  * Get a bit flag field with the options of a channel.
484  *
485  * @param ch Channel to get options from.
486  *
487  * @return Bit array in host byte order.
488  */
489 static uint32_t
490 channel_get_options (struct CadetChannel *ch)
491 {
492   uint32_t options;
493
494   options = 0;
495   if (ch->nobuffer)
496     options |= GNUNET_CADET_OPTION_NOBUFFER;
497   if (ch->reliable)
498     options |= GNUNET_CADET_OPTION_RELIABLE;
499
500   return options;
501 }
502
503
504 /**
505  * Notify a client that the channel is no longer valid.
506  *
507  * @param ch Channel that is destroyed.
508  * @param local_only Should we avoid sending it to other peers?
509  */
510 static void
511 send_destroy (struct CadetChannel *ch, int local_only)
512 {
513   struct GNUNET_CADET_ChannelManage msg;
514
515   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_DESTROY);
516   msg.header.size = htons (sizeof (msg));
517   msg.chid = htonl (ch->gid);
518
519   /* If root is not NULL, notify.
520    * If it's NULL, check lid_root. When a local destroy comes in, root
521    * is set to NULL but lid_root is left untouched. In this case, do nothing,
522    * the client is the one who requested the channel to be destroyed.
523    */
524   if (NULL != ch->root)
525     GML_send_channel_destroy (ch->root, ch->lid_root);
526   else if (0 == ch->lid_root && GNUNET_NO == local_only)
527     GCCH_send_prebuilt_message (&msg.header, ch, GNUNET_NO, NULL);
528
529   if (NULL != ch->dest)
530     GML_send_channel_destroy (ch->dest, ch->lid_dest);
531   else if (0 == ch->lid_dest && GNUNET_NO == local_only)
532     GCCH_send_prebuilt_message (&msg.header, ch, GNUNET_YES, NULL);
533 }
534
535
536 /**
537  * Notify the destination client that a new incoming channel was created.
538  *
539  * @param ch Channel that was created.
540  */
541 static void
542 send_client_create (struct CadetChannel *ch)
543 {
544   uint32_t opt;
545
546   if (NULL == ch->dest)
547     return;
548
549   opt = 0;
550   opt |= GNUNET_YES == ch->reliable ? GNUNET_CADET_OPTION_RELIABLE : 0;
551   opt |= GNUNET_YES == ch->nobuffer ? GNUNET_CADET_OPTION_NOBUFFER : 0;
552   GML_send_channel_create (ch->dest, ch->lid_dest, ch->port, opt,
553                            GCT_get_destination (ch->t));
554
555 }
556
557
558 /**
559  * Send data to a client.
560  *
561  * If the client is ready, send directly, otherwise buffer while listening
562  * for a local ACK.
563  *
564  * @param ch Channel
565  * @param msg Message.
566  * @param fwd Is this a fwd (root->dest) message?
567  */
568 static void
569 send_client_data (struct CadetChannel *ch,
570                   const struct GNUNET_CADET_Data *msg,
571                   int fwd)
572 {
573   if (fwd)
574   {
575     if (ch->dest_rel->client_ready)
576     {
577       GML_send_data (ch->dest, msg, ch->lid_dest);
578       ch->dest_rel->client_ready = GNUNET_NO;
579       ch->dest_rel->mid_recv++;
580     }
581     else
582       add_buffered_data (msg, ch->dest_rel);
583   }
584   else
585   {
586     if (ch->root_rel->client_ready)
587     {
588       GML_send_data (ch->root, msg, ch->lid_root);
589       ch->root_rel->client_ready = GNUNET_NO;
590       ch->root_rel->mid_recv++;
591     }
592     else
593       add_buffered_data (msg, ch->root_rel);
594   }
595 }
596
597
598 /**
599  * Send a buffered message to the client, for in order delivery or
600  * as result of client ACK.
601  *
602  * @param ch Channel on which to empty the message buffer.
603  * @param c Client to send to.
604  * @param fwd Is this to send FWD data?.
605  */
606 static void
607 send_client_buffered_data (struct CadetChannel *ch,
608                            struct CadetClient *c,
609                            int fwd)
610 {
611   struct CadetReliableMessage *copy;
612   struct CadetChannelReliability *rel;
613
614   LOG (GNUNET_ERROR_TYPE_DEBUG, "send_buffered_data\n");
615   rel = fwd ? ch->dest_rel : ch->root_rel;
616   if (GNUNET_NO == rel->client_ready)
617   {
618     LOG (GNUNET_ERROR_TYPE_DEBUG, "client not ready\n");
619     return;
620   }
621
622   copy = rel->head_recv;
623   /* We never buffer channel management messages */
624   if (NULL != copy)
625   {
626     if (copy->mid == rel->mid_recv || GNUNET_NO == ch->reliable)
627     {
628       struct GNUNET_CADET_Data *msg = (struct GNUNET_CADET_Data *) &copy[1];
629
630       LOG (GNUNET_ERROR_TYPE_DEBUG, " have %u! now expecting %u\n",
631            copy->mid, rel->mid_recv + 1);
632       send_client_data (ch, msg, fwd);
633       rel->n_recv--;
634       GNUNET_CONTAINER_DLL_remove (rel->head_recv, rel->tail_recv, copy);
635       LOG (GNUNET_ERROR_TYPE_DEBUG, " COPYFREE RECV %p\n", copy);
636       GNUNET_free (copy);
637       GCCH_send_data_ack (ch, fwd);
638     }
639     else
640     {
641       LOG (GNUNET_ERROR_TYPE_DEBUG, " reliable && don't have %u, next is %u\n",
642            rel->mid_recv, copy->mid);
643       if (GNUNET_YES == ch->destroy)
644       {
645         /* We don't have the next data piece and the remote peer has closed the
646          * channel. We won't receive it anymore, so just destroy the channel.
647          * FIXME: wait some time to allow other connections to
648          *        deliver missing messages
649          */
650         send_destroy (ch, GNUNET_YES);
651         GCCH_destroy (ch);
652       }
653     }
654   }
655   LOG (GNUNET_ERROR_TYPE_DEBUG, "send_buffered_data END\n");
656 }
657
658
659 /**
660  * Allow a client to send more data.
661  *
662  * In case the client was already allowed to send data, do nothing.
663  *
664  * @param ch Channel.
665  * @param fwd Is this a FWD ACK? (FWD ACKs are sent to root)
666  */
667 static void
668 send_client_ack (struct CadetChannel *ch, int fwd)
669 {
670   struct CadetChannelReliability *rel = fwd ? ch->root_rel : ch->dest_rel;
671   struct CadetClient *c = fwd ? ch->root : ch->dest;
672
673   if (NULL == c)
674   {
675     GNUNET_break (GNUNET_NO != ch->destroy);
676     return;
677   }
678   LOG (GNUNET_ERROR_TYPE_DEBUG,
679        "  sending %s ack to client on channel %s\n",
680        GC_f2s (fwd), GCCH_2s (ch));
681
682   if (NULL == rel)
683   {
684     GNUNET_break (0);
685     return;
686   }
687
688   if (GNUNET_YES == rel->client_allowed)
689   {
690     LOG (GNUNET_ERROR_TYPE_DEBUG, "  already allowed\n");
691     return;
692   }
693   rel->client_allowed = GNUNET_YES;
694
695   GML_send_ack (c, fwd ? ch->lid_root : ch->lid_dest);
696 }
697
698
699 /**
700  * Notify the root that the destination rejected the channel.
701  *
702  * @param ch Rejected channel.
703  */
704 static void
705 send_client_nack (struct CadetChannel *ch)
706 {
707   if (NULL == ch->root)
708   {
709     GNUNET_break (0);
710     return;
711   }
712   GML_send_channel_nack (ch->root, ch->lid_root);
713 }
714
715
716 /**
717  * We haven't received an ACK after a certain time: restransmit the message.
718  *
719  * @param cls Closure (CadetChannelReliability with the message to restransmit)
720  * @param tc TaskContext.
721  */
722 static void
723 channel_retransmit_message (void *cls,
724                             const struct GNUNET_SCHEDULER_TaskContext *tc)
725 {
726   struct CadetChannelReliability *rel = cls;
727   struct CadetReliableMessage *copy;
728   struct CadetChannel *ch;
729   struct GNUNET_CADET_Data *payload;
730   int fwd;
731
732   rel->retry_task = NULL;
733   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
734     return;
735
736   ch = rel->ch;
737   copy = rel->head_sent;
738   if (NULL == copy)
739   {
740     GNUNET_break (0); // FIXME tripped in rps testcase
741     return;
742   }
743
744   payload = (struct GNUNET_CADET_Data *) &copy[1];
745   fwd = (rel == ch->root_rel);
746
747   /* Message not found in the queue that we are going to use. */
748   LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! RETRANSMIT %u\n", copy->mid);
749
750   GCCH_send_prebuilt_message (&payload->header, ch, fwd, copy);
751   GNUNET_STATISTICS_update (stats, "# data retransmitted", 1, GNUNET_NO);
752 }
753
754
755 /**
756  * We haven't received an Channel ACK after a certain time: resend the CREATE.
757  *
758  * @param cls Closure (CadetChannelReliability of the channel to recreate)
759  * @param tc TaskContext.
760  */
761 static void
762 channel_recreate (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
763 {
764   struct CadetChannelReliability *rel = cls;
765
766   rel->retry_task = NULL;
767   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
768     return;
769
770   LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! RE-CREATE\n");
771   GNUNET_STATISTICS_update (stats, "# data retransmitted", 1, GNUNET_NO);
772
773   if (rel == rel->ch->root_rel)
774   {
775     send_create (rel->ch);
776   }
777   else if (rel == rel->ch->dest_rel)
778   {
779     send_ack (rel->ch, GNUNET_YES);
780   }
781   else
782   {
783     GNUNET_break (0);
784   }
785
786 }
787
788
789 /**
790  * Message has been sent: start retransmission timer.
791  *
792  * @param cls Closure (queue structure).
793  * @param t Tunnel.
794  * @param q Queue handler (no longer valid).
795  * @param type Type of message.
796  * @param size Size of the message.
797  */
798 static void
799 ch_message_sent (void *cls,
800                  struct CadetTunnel *t,
801                  struct CadetTunnelQueue *q,
802                  uint16_t type, size_t size)
803 {
804   struct CadetChannelQueue *chq = cls;
805   struct CadetReliableMessage *copy = chq->copy;
806   struct CadetChannelReliability *rel;
807
808   LOG (GNUNET_ERROR_TYPE_DEBUG, "channel message sent callback %s\n",
809        GC_m2s (chq->type));
810
811   switch (chq->type)
812   {
813     case GNUNET_MESSAGE_TYPE_CADET_DATA:
814       LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! SENT DATA MID %u\n", copy->mid);
815       GNUNET_assert (chq == copy->chq);
816       copy->timestamp = GNUNET_TIME_absolute_get ();
817       rel = copy->rel;
818       if (NULL == rel->retry_task)
819       {
820         LOG (GNUNET_ERROR_TYPE_DEBUG, "!! scheduling retry in 4 * %s\n",
821              GNUNET_STRINGS_relative_time_to_string (rel->expected_delay,
822                                                      GNUNET_YES));
823         if (0 != rel->expected_delay.rel_value_us)
824         {
825           LOG (GNUNET_ERROR_TYPE_DEBUG, "!! delay != 0\n");
826           rel->retry_timer =
827           GNUNET_TIME_relative_multiply (rel->expected_delay,
828                                          CADET_RETRANSMIT_MARGIN);
829         }
830         else
831         {
832           LOG (GNUNET_ERROR_TYPE_DEBUG, "!! delay reset\n");
833           rel->retry_timer = CADET_RETRANSMIT_TIME;
834         }
835         LOG (GNUNET_ERROR_TYPE_DEBUG, "!! using delay %s\n",
836              GNUNET_STRINGS_relative_time_to_string (rel->retry_timer,
837                                                      GNUNET_NO));
838         rel->retry_task =
839             GNUNET_SCHEDULER_add_delayed (rel->retry_timer,
840                                           &channel_retransmit_message, rel);
841       }
842       else
843       {
844         LOG (GNUNET_ERROR_TYPE_DEBUG, "!! retry task %u\n", rel->retry_task);
845       }
846       copy->chq = NULL;
847       break;
848
849
850     case GNUNET_MESSAGE_TYPE_CADET_DATA_ACK:
851     case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_CREATE:
852     case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_ACK:
853       LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! SENT %s\n", GC_m2s (chq->type));
854       rel = chq->rel;
855       GNUNET_assert (rel->uniq == chq);
856       rel->uniq = NULL;
857
858       if (CADET_CHANNEL_READY != rel->ch->state
859           && GNUNET_MESSAGE_TYPE_CADET_DATA_ACK != type
860           && GNUNET_NO == rel->ch->destroy)
861       {
862         GNUNET_assert (NULL == rel->retry_task);
863         LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! STD BACKOFF %s\n",
864              GNUNET_STRINGS_relative_time_to_string (rel->retry_timer,
865                                                      GNUNET_NO));
866         rel->retry_timer = GNUNET_TIME_STD_BACKOFF (rel->retry_timer);
867         rel->retry_task = GNUNET_SCHEDULER_add_delayed (rel->retry_timer,
868                                                         &channel_recreate, rel);
869       }
870       break;
871
872     default:
873       GNUNET_break (0);
874   }
875
876   GNUNET_free (chq);
877 }
878
879
880 /**
881  * send a channel create message.
882  *
883  * @param ch Channel for which to send.
884  */
885 static void
886 send_create (struct CadetChannel *ch)
887 {
888   struct GNUNET_CADET_ChannelCreate msgcc;
889
890   msgcc.header.size = htons (sizeof (msgcc));
891   msgcc.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_CREATE);
892   msgcc.chid = htonl (ch->gid);
893   msgcc.port = htonl (ch->port);
894   msgcc.opt = htonl (channel_get_options (ch));
895
896   GCCH_send_prebuilt_message (&msgcc.header, ch, GNUNET_YES, NULL);
897 }
898
899
900 /**
901  * Confirm we got a channel create or FWD ack.
902  *
903  * @param ch The channel to confirm.
904  * @param fwd Should we send a FWD ACK? (going dest->root)
905  */
906 static void
907 send_ack (struct CadetChannel *ch, int fwd)
908 {
909   struct GNUNET_CADET_ChannelManage msg;
910
911   msg.header.size = htons (sizeof (msg));
912   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_ACK);
913   LOG (GNUNET_ERROR_TYPE_DEBUG, "  sending channel %s ack for channel %s\n",
914        GC_f2s (fwd), GCCH_2s (ch));
915
916   msg.chid = htonl (ch->gid);
917   GCCH_send_prebuilt_message (&msg.header, ch, !fwd, NULL);
918 }
919
920
921 /**
922  * Send a message and don't keep any info about it: we won't need to cancel it
923  * or resend it.
924  *
925  * @param msg Header of the message to fire away.
926  * @param ch Channel on which the message should go.
927  * @param force Is this a forced (undroppable) message?
928  */
929 static void
930 fire_and_forget (const struct GNUNET_MessageHeader *msg,
931                  struct CadetChannel *ch,
932                  int force)
933 {
934   GNUNET_break (NULL == GCT_send_prebuilt_message (msg, ch->t, NULL,
935                                                    force, NULL, NULL));
936 }
937
938
939 /**
940  * Notify that a channel create didn't succeed.
941  *
942  * @param ch The channel to reject.
943  */
944 static void
945 send_nack (struct CadetChannel *ch)
946 {
947   struct GNUNET_CADET_ChannelManage msg;
948
949   msg.header.size = htons (sizeof (msg));
950   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_NACK);
951   LOG (GNUNET_ERROR_TYPE_DEBUG,
952        "  sending channel NACK for channel %s\n",
953        GCCH_2s (ch));
954
955   msg.chid = htonl (ch->gid);
956   GCCH_send_prebuilt_message (&msg.header, ch, GNUNET_NO, NULL);
957 }
958
959
960 /**
961  * Destroy all reliable messages queued for a channel,
962  * during a channel destruction.
963  * Frees the reliability structure itself.
964  *
965  * @param rel Reliability data for a channel.
966  */
967 static void
968 channel_rel_free_all (struct CadetChannelReliability *rel)
969 {
970   struct CadetReliableMessage *copy;
971   struct CadetReliableMessage *next;
972
973   if (NULL == rel)
974     return;
975
976   for (copy = rel->head_recv; NULL != copy; copy = next)
977   {
978     next = copy->next;
979     GNUNET_CONTAINER_DLL_remove (rel->head_recv, rel->tail_recv, copy);
980     LOG (GNUNET_ERROR_TYPE_DEBUG, " COPYFREE BATCH RECV %p\n", copy);
981     GNUNET_break (NULL == copy->chq);
982     GNUNET_free (copy);
983   }
984   for (copy = rel->head_sent; NULL != copy; copy = next)
985   {
986     next = copy->next;
987     GNUNET_CONTAINER_DLL_remove (rel->head_sent, rel->tail_sent, copy);
988     LOG (GNUNET_ERROR_TYPE_DEBUG, " COPYFREE BATCH %p\n", copy);
989     if (NULL != copy->chq)
990     {
991       if (NULL != copy->chq->tq)
992       {
993         GCT_cancel (copy->chq->tq);
994         /* ch_message_sent will free copy->q */
995       }
996       else
997       {
998         GNUNET_free (copy->chq);
999         GNUNET_break (0);
1000       }
1001     }
1002     GNUNET_free (copy);
1003   }
1004   if (NULL != rel->uniq && NULL != rel->uniq->tq)
1005   {
1006     GCT_cancel (rel->uniq->tq);
1007     /* ch_message_sent is called freeing uniq */
1008   }
1009   if (NULL != rel->retry_task)
1010   {
1011     GNUNET_SCHEDULER_cancel (rel->retry_task);
1012     rel->retry_task = NULL;
1013   }
1014   GNUNET_free (rel);
1015 }
1016
1017
1018 /**
1019  * Mark future messages as ACK'd.
1020  *
1021  * @param rel Reliability data.
1022  * @param msg DataACK message with a bitfield of future ACK'd messages.
1023  */
1024 static void
1025 channel_rel_free_sent (struct CadetChannelReliability *rel,
1026                        const struct GNUNET_CADET_DataACK *msg)
1027 {
1028   struct CadetReliableMessage *copy;
1029   struct CadetReliableMessage *next;
1030   uint64_t bitfield;
1031   uint64_t mask;
1032   uint32_t mid;
1033   uint32_t target;
1034   unsigned int i;
1035
1036   bitfield = msg->futures;
1037   mid = ntohl (msg->mid);
1038   LOG (GNUNET_ERROR_TYPE_DEBUG, "free_sent_reliable %u %llX\n", mid, bitfield);
1039   LOG (GNUNET_ERROR_TYPE_DEBUG, " rel %p, head %p\n", rel, rel->head_sent);
1040   for (i = 0, copy = rel->head_sent;
1041        i < 64 && NULL != copy && 0 != bitfield;
1042        i++)
1043   {
1044     LOG (GNUNET_ERROR_TYPE_DEBUG, " trying bit %u (mid %u)\n", i, mid + i + 1);
1045     mask = 0x1LL << i;
1046     if (0 == (bitfield & mask))
1047      continue;
1048
1049     LOG (GNUNET_ERROR_TYPE_DEBUG, " set!\n");
1050     /* Bit was set, clear the bit from the bitfield */
1051     bitfield &= ~mask;
1052
1053     /* The i-th bit was set. Do we have that copy? */
1054     /* Skip copies with mid < target */
1055     target = mid + i + 1;
1056     LOG (GNUNET_ERROR_TYPE_DEBUG, " target %u\n", target);
1057     while (NULL != copy && GC_is_pid_bigger (target, copy->mid))
1058       copy = copy->next;
1059
1060     /* Did we run out of copies? (previously freed, it's ok) */
1061     if (NULL == copy)
1062     {
1063       LOG (GNUNET_ERROR_TYPE_DEBUG, "run out of copies...\n");
1064       return;
1065     }
1066
1067     /* Did we overshoot the target? (previously freed, it's ok) */
1068     if (GC_is_pid_bigger (copy->mid, target))
1069     {
1070       LOG (GNUNET_ERROR_TYPE_DEBUG, " next copy %u\n", copy->mid);
1071       i += copy->mid - target - 1;   /* MID: 90, t = 85, i += 4 (i++ later) */
1072       mask = (0x1LL << (i + 1)) - 1; /* Mask = i-th bit and all before */
1073       bitfield &= ~mask;             /* Clear all bits up to MID - 1 */
1074       continue;
1075     }
1076
1077     /* Now copy->mid == target, free it */
1078     next = copy->next;
1079     GNUNET_break (GNUNET_YES != rel_message_free (copy, GNUNET_YES));
1080     copy = next;
1081   }
1082   LOG (GNUNET_ERROR_TYPE_DEBUG, "free_sent_reliable END\n");
1083 }
1084
1085
1086 /**
1087  * Destroy a reliable message after it has been acknowledged, either by
1088  * direct mid ACK or bitfield. Updates the appropriate data structures and
1089  * timers and frees all memory.
1090  *
1091  * @param copy Message that is no longer needed: remote peer got it.
1092  * @param update_time Is the timing information relevant?
1093  *                    If this message is ACK in a batch the timing information
1094  *                    is skewed by the retransmission, count only for the
1095  *                    retransmitted message.
1096  *
1097  * @return #GNUNET_YES if channel was destroyed as a result of the call,
1098  *         #GNUNET_NO otherwise.
1099  */
1100 static int
1101 rel_message_free (struct CadetReliableMessage *copy, int update_time)
1102 {
1103   struct CadetChannelReliability *rel;
1104   struct GNUNET_TIME_Relative time;
1105
1106   rel = copy->rel;
1107   LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! Freeing %u\n", copy->mid);
1108   if (update_time)
1109   {
1110     time = GNUNET_TIME_absolute_get_duration (copy->timestamp);
1111     if (0 == rel->expected_delay.rel_value_us)
1112       rel->expected_delay = time;
1113     else
1114     {
1115       rel->expected_delay.rel_value_us *= 7;
1116       rel->expected_delay.rel_value_us += time.rel_value_us;
1117       rel->expected_delay.rel_value_us /= 8;
1118     }
1119     LOG (GNUNET_ERROR_TYPE_INFO, "!!!  took %s, new delay %s\n",
1120          GNUNET_STRINGS_relative_time_to_string (time, GNUNET_NO),
1121          GNUNET_STRINGS_relative_time_to_string (rel->expected_delay,
1122                                                  GNUNET_NO));
1123     rel->retry_timer = rel->expected_delay;
1124   }
1125   else
1126   {
1127     LOG (GNUNET_ERROR_TYPE_INFO, "!!! batch free, ignoring timing\n");
1128   }
1129   rel->ch->pending_messages--;
1130   if (NULL != copy->chq)
1131   {
1132     GCT_cancel (copy->chq->tq);
1133     /* copy->q is set to NULL by ch_message_sent */
1134   }
1135   GNUNET_CONTAINER_DLL_remove (rel->head_sent, rel->tail_sent, copy);
1136   LOG (GNUNET_ERROR_TYPE_DEBUG, " COPYFREE %p\n", copy);
1137   GNUNET_free (copy);
1138
1139   if (GNUNET_NO != rel->ch->destroy && 0 == rel->ch->pending_messages)
1140   {
1141     GCCH_destroy (rel->ch);
1142     return GNUNET_YES;
1143   }
1144   return GNUNET_NO;
1145 }
1146
1147
1148 /**
1149  * Channel was ACK'd by remote peer, mark as ready and cancel retransmission.
1150  *
1151  * @param ch Channel to mark as ready.
1152  * @param fwd Was the ACK message a FWD ACK? (dest->root, SYNACK)
1153  */
1154 static void
1155 channel_confirm (struct CadetChannel *ch, int fwd)
1156 {
1157   struct CadetChannelReliability *rel;
1158   enum CadetChannelState oldstate;
1159
1160   rel = fwd ? ch->root_rel : ch->dest_rel;
1161   if (NULL == rel)
1162   {
1163     GNUNET_break (GNUNET_NO != ch->destroy);
1164     return;
1165   }
1166   LOG (GNUNET_ERROR_TYPE_DEBUG, "  channel confirm %s %s\n",
1167        GC_f2s (fwd), GCCH_2s (ch));
1168   oldstate = ch->state;
1169   ch->state = CADET_CHANNEL_READY;
1170
1171   if (CADET_CHANNEL_READY != oldstate || GNUNET_YES == is_loopback (ch))
1172   {
1173     rel->client_ready = GNUNET_YES;
1174     rel->expected_delay = rel->retry_timer;
1175     LOG (GNUNET_ERROR_TYPE_DEBUG, "  !! retry timer confirm %s\n",
1176          GNUNET_STRINGS_relative_time_to_string (rel->retry_timer, GNUNET_NO));
1177     if (GCT_get_connections_buffer (ch->t) > 0 || GCT_is_loopback (ch->t))
1178       send_client_ack (ch, fwd);
1179
1180     if (NULL != rel->retry_task)
1181     {
1182       GNUNET_SCHEDULER_cancel (rel->retry_task);
1183       rel->retry_task = NULL;
1184     }
1185     else if (NULL != rel->uniq)
1186     {
1187       GCT_cancel (rel->uniq->tq);
1188       /* ch_message_sent will free and NULL uniq */
1189     }
1190     else if (GNUNET_NO == is_loopback (ch))
1191     {
1192       /* We SHOULD have been trying to retransmit this! */
1193       GNUNET_break (0);
1194     }
1195   }
1196
1197   /* In case of a FWD ACK (SYNACK) send a BCK ACK (ACK). */
1198   if (GNUNET_YES == fwd)
1199     send_ack (ch, GNUNET_NO);
1200 }
1201
1202
1203 /**
1204  * Save a copy to retransmit in case it gets lost.
1205  *
1206  * Initializes all needed callbacks and timers.
1207  *
1208  * @param ch Channel this message goes on.
1209  * @param msg Message to copy.
1210  * @param fwd Is this fwd traffic?
1211  */
1212 static struct CadetReliableMessage *
1213 channel_save_copy (struct CadetChannel *ch,
1214                    const struct GNUNET_MessageHeader *msg,
1215                    int fwd)
1216 {
1217   struct CadetChannelReliability *rel;
1218   struct CadetReliableMessage *copy;
1219   uint32_t mid;
1220   uint16_t type;
1221   uint16_t size;
1222
1223   rel = fwd ? ch->root_rel : ch->dest_rel;
1224   mid = rel->mid_send - 1;
1225   type = ntohs (msg->type);
1226   size = ntohs (msg->size);
1227
1228   LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! SAVE %u %s\n", mid, GC_m2s (type));
1229   copy = GNUNET_malloc (sizeof (struct CadetReliableMessage) + size);
1230   LOG (GNUNET_ERROR_TYPE_DEBUG, "  at %p\n", copy);
1231   copy->mid = mid;
1232   copy->rel = rel;
1233   copy->type = type;
1234   memcpy (&copy[1], msg, size);
1235   GNUNET_CONTAINER_DLL_insert_tail (rel->head_sent, rel->tail_sent, copy);
1236   ch->pending_messages++;
1237
1238   return copy;
1239 }
1240
1241
1242 /**
1243  * Create a new channel.
1244  *
1245  * @param t Tunnel this channel is in.
1246  * @param owner Client that owns the channel, NULL for foreign channels.
1247  * @param lid_root Local ID for root client.
1248  *
1249  * @return A new initialized channel. NULL on error.
1250  */
1251 static struct CadetChannel *
1252 channel_new (struct CadetTunnel *t,
1253              struct CadetClient *owner,
1254              CADET_ChannelNumber lid_root)
1255 {
1256   struct CadetChannel *ch;
1257
1258   ch = GNUNET_new (struct CadetChannel);
1259   ch->root = owner;
1260   ch->lid_root = lid_root;
1261   ch->t = t;
1262
1263   GNUNET_STATISTICS_update (stats, "# channels", 1, GNUNET_NO);
1264
1265   if (NULL != owner)
1266   {
1267     ch->gid = GCT_get_next_chid (t);
1268     GML_channel_add (owner, lid_root, ch);
1269   }
1270   GCT_add_channel (t, ch);
1271
1272   return ch;
1273 }
1274
1275
1276 /**
1277  * Handle a loopback message: call the appropriate handler for the message type.
1278  *
1279  * @param ch Channel this message is on.
1280  * @param msgh Message header.
1281  * @param fwd Is this FWD traffic?
1282  */
1283 void
1284 handle_loopback (struct CadetChannel *ch,
1285                  const struct GNUNET_MessageHeader *msgh,
1286                  int fwd)
1287 {
1288   uint16_t type;
1289
1290   type = ntohs (msgh->type);
1291   LOG (GNUNET_ERROR_TYPE_DEBUG,
1292        "Loopback %s %s message!\n",
1293        GC_f2s (fwd), GC_m2s (type));
1294
1295   switch (type)
1296   {
1297     case GNUNET_MESSAGE_TYPE_CADET_DATA:
1298       /* Don't send hop ACK, wait for client to ACK */
1299       LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! SEND loopback %u (%u)\n",
1300            ntohl (((struct GNUNET_CADET_Data *) msgh)->mid), ntohs (msgh->size));
1301       GCCH_handle_data (ch, (struct GNUNET_CADET_Data *) msgh, fwd);
1302       break;
1303
1304     case GNUNET_MESSAGE_TYPE_CADET_DATA_ACK:
1305       GCCH_handle_data_ack (ch, (struct GNUNET_CADET_DataACK *) msgh, fwd);
1306       break;
1307
1308     case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_CREATE:
1309       GCCH_handle_create (ch->t,
1310                           (struct GNUNET_CADET_ChannelCreate *) msgh);
1311       break;
1312
1313     case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_ACK:
1314       GCCH_handle_ack (ch,
1315                        (struct GNUNET_CADET_ChannelManage *) msgh,
1316                        fwd);
1317       break;
1318
1319     case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_NACK:
1320       GCCH_handle_nack (ch);
1321       break;
1322
1323     case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_DESTROY:
1324       GCCH_handle_destroy (ch,
1325                            (struct GNUNET_CADET_ChannelManage *) msgh,
1326                            fwd);
1327       break;
1328
1329     default:
1330       GNUNET_break_op (0);
1331       LOG (GNUNET_ERROR_TYPE_DEBUG,
1332            "end-to-end message not known (%u)\n",
1333            ntohs (msgh->type));
1334   }
1335 }
1336
1337
1338
1339 /******************************************************************************/
1340 /********************************    API    ***********************************/
1341 /******************************************************************************/
1342
1343 /**
1344  * Destroy a channel and free all resources.
1345  *
1346  * @param ch Channel to destroy.
1347  */
1348 void
1349 GCCH_destroy (struct CadetChannel *ch)
1350 {
1351   struct CadetClient *c;
1352   struct CadetTunnel *t;
1353
1354   if (NULL == ch)
1355     return;
1356   if (2 == ch->destroy)
1357     return; /* recursive call */
1358   ch->destroy = 2;
1359
1360   LOG (GNUNET_ERROR_TYPE_DEBUG, "destroying channel %s:%u\n",
1361               GCT_2s (ch->t), ch->gid);
1362   GCCH_debug (ch);
1363
1364   c = ch->root;
1365   if (NULL != c)
1366   {
1367     GML_channel_remove (c, ch->lid_root, ch);
1368   }
1369
1370   c = ch->dest;
1371   if (NULL != c)
1372   {
1373     GML_channel_remove (c, ch->lid_dest, ch);
1374   }
1375
1376   channel_rel_free_all (ch->root_rel);
1377   channel_rel_free_all (ch->dest_rel);
1378
1379   t = ch->t;
1380   GCT_remove_channel (t, ch);
1381   GNUNET_STATISTICS_update (stats, "# channels", -1, GNUNET_NO);
1382
1383   GNUNET_free (ch);
1384   GCT_destroy_if_empty (t);
1385 }
1386
1387
1388 /**
1389  * Get the channel's public ID.
1390  *
1391  * @param ch Channel.
1392  *
1393  * @return ID used to identify the channel with the remote peer.
1394  */
1395 CADET_ChannelNumber
1396 GCCH_get_id (const struct CadetChannel *ch)
1397 {
1398   return ch->gid;
1399 }
1400
1401
1402 /**
1403  * Get the channel tunnel.
1404  *
1405  * @param ch Channel to get the tunnel from.
1406  *
1407  * @return tunnel of the channel.
1408  */
1409 struct CadetTunnel *
1410 GCCH_get_tunnel (const struct CadetChannel *ch)
1411 {
1412   return ch->t;
1413 }
1414
1415
1416 /**
1417  * Get free buffer space towards the client on a specific channel.
1418  *
1419  * @param ch Channel.
1420  * @param fwd Is query about FWD traffic?
1421  *
1422  * @return Free buffer space [0 - 64]
1423  */
1424 unsigned int
1425 GCCH_get_buffer (struct CadetChannel *ch, int fwd)
1426 {
1427   struct CadetChannelReliability *rel;
1428
1429   rel = fwd ? ch->dest_rel : ch->root_rel;
1430   LOG (GNUNET_ERROR_TYPE_DEBUG, "   get buffer, channel %s\n", GCCH_2s (ch));
1431   GCCH_debug (ch);
1432   /* If rel is NULL it means that the end is not yet created,
1433    * most probably is a loopback channel at the point of sending
1434    * the ChannelCreate to itself.
1435    */
1436   if (NULL == rel)
1437   {
1438     LOG (GNUNET_ERROR_TYPE_DEBUG, "  rel is NULL: max\n");
1439     return 64;
1440   }
1441
1442   return (64 - rel->n_recv);
1443 }
1444
1445
1446 /**
1447  * Get flow control status of end point: is client allow to send?
1448  *
1449  * @param ch Channel.
1450  * @param fwd Is query about FWD traffic? (Request root status).
1451  *
1452  * @return #GNUNET_YES if client is allowed to send us data.
1453  */
1454 int
1455 GCCH_get_allowed (struct CadetChannel *ch, int fwd)
1456 {
1457   struct CadetChannelReliability *rel;
1458
1459   rel = fwd ? ch->root_rel : ch->dest_rel;
1460
1461   if (NULL == rel)
1462   {
1463     /* Probably shutting down: root/dest NULL'ed to mark disconnection */
1464     GNUNET_break (GNUNET_NO != ch->destroy);
1465     return 0;
1466   }
1467
1468   return rel->client_allowed;
1469 }
1470
1471
1472 /**
1473  * Is the root client for this channel on this peer?
1474  *
1475  * @param ch Channel.
1476  * @param fwd Is this for fwd traffic?
1477  *
1478  * @return #GNUNET_YES in case it is.
1479  */
1480 int
1481 GCCH_is_origin (struct CadetChannel *ch, int fwd)
1482 {
1483   struct CadetClient *c;
1484
1485   c = fwd ? ch->root : ch->dest;
1486   return NULL != c;
1487 }
1488
1489
1490 /**
1491  * Is the destination client for this channel on this peer?
1492  *
1493  * @param ch Channel.
1494  * @param fwd Is this for fwd traffic?
1495  *
1496  * @return #GNUNET_YES in case it is.
1497  */
1498 int
1499 GCCH_is_terminal (struct CadetChannel *ch, int fwd)
1500 {
1501   struct CadetClient *c;
1502
1503   c = fwd ? ch->dest : ch->root;
1504   return NULL != c;
1505 }
1506
1507
1508 /**
1509  * Send an end-to-end ACK message for the most recent in-sequence payload.
1510  *
1511  * If channel is not reliable, do nothing.
1512  *
1513  * @param ch Channel this is about.
1514  * @param fwd Is for FWD traffic? (ACK dest->owner)
1515  */
1516 void
1517 GCCH_send_data_ack (struct CadetChannel *ch, int fwd)
1518 {
1519   struct GNUNET_CADET_DataACK msg;
1520   struct CadetChannelReliability *rel;
1521   struct CadetReliableMessage *copy;
1522   unsigned int delta;
1523   uint64_t mask;
1524   uint32_t ack;
1525
1526   if (GNUNET_NO == ch->reliable)
1527     return;
1528
1529   rel = fwd ? ch->dest_rel : ch->root_rel;
1530   ack = rel->mid_recv - 1;
1531
1532   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_DATA_ACK);
1533   msg.header.size = htons (sizeof (msg));
1534   msg.chid = htonl (ch->gid);
1535   msg.mid = htonl (ack);
1536
1537   msg.futures = 0LL;
1538   for (copy = rel->head_recv; NULL != copy; copy = copy->next)
1539   {
1540     if (copy->type != GNUNET_MESSAGE_TYPE_CADET_DATA)
1541     {
1542       LOG (GNUNET_ERROR_TYPE_DEBUG, " Type %s, expected DATA\n",
1543            GC_m2s (copy->type));
1544       continue;
1545     }
1546     GNUNET_assert (GC_is_pid_bigger(copy->mid, ack));
1547     delta = copy->mid - (ack + 1);
1548     if (63 < delta)
1549       break;
1550     mask = 0x1LL << delta;
1551     msg.futures |= mask;
1552     LOG (GNUNET_ERROR_TYPE_DEBUG,
1553          " setting bit for %u (delta %u) (%llX) -> %llX\n",
1554          copy->mid, delta, mask, msg.futures);
1555   }
1556   LOG (GNUNET_ERROR_TYPE_INFO, "===> DATA_ACK for %u + %llX\n",
1557        ack, msg.futures);
1558
1559   GCCH_send_prebuilt_message (&msg.header, ch, !fwd, NULL);
1560   LOG (GNUNET_ERROR_TYPE_DEBUG, "send_data_ack END\n");
1561 }
1562
1563
1564 /**
1565  * Allow a client to send us more data, in case it was choked.
1566  *
1567  * @param ch Channel.
1568  * @param fwd Is this about FWD traffic? (Root client).
1569  */
1570 void
1571 GCCH_allow_client (struct CadetChannel *ch, int fwd)
1572 {
1573   struct CadetChannelReliability *rel;
1574   unsigned int buffer;
1575
1576   LOG (GNUNET_ERROR_TYPE_DEBUG, "GMCH allow\n");
1577
1578   if (CADET_CHANNEL_READY != ch->state)
1579   {
1580     LOG (GNUNET_ERROR_TYPE_DEBUG, " channel not ready yet!\n");
1581     return;
1582   }
1583
1584   if (GNUNET_YES == ch->reliable)
1585   {
1586     rel = fwd ? ch->root_rel : ch->dest_rel;
1587     if (NULL == rel)
1588     {
1589       GNUNET_break (GNUNET_NO != ch->destroy);
1590       return;
1591     }
1592     if (NULL != rel->head_sent)
1593     {
1594       if (64 <= rel->mid_send - rel->head_sent->mid)
1595       {
1596         LOG (GNUNET_ERROR_TYPE_DEBUG, " too big MID gap! Wait for ACK.\n");
1597         return;
1598       }
1599       else
1600       {
1601         LOG (GNUNET_ERROR_TYPE_DEBUG, " gap ok: %u - %u\n",
1602              rel->head_sent->mid, rel->mid_send);
1603         struct CadetReliableMessage *aux;
1604         for (aux = rel->head_sent; NULL != aux; aux = aux->next)
1605         {
1606           LOG (GNUNET_ERROR_TYPE_DEBUG, "   - sent MID %u\n", aux->mid);
1607         }
1608       }
1609     }
1610     else
1611     {
1612       LOG (GNUNET_ERROR_TYPE_DEBUG, " head sent is NULL\n");
1613     }
1614   }
1615
1616   if (is_loopback (ch))
1617     buffer = GCCH_get_buffer (ch, fwd);
1618   else
1619     buffer = GCT_get_connections_buffer (ch->t);
1620
1621   if (0 == buffer)
1622   {
1623     LOG (GNUNET_ERROR_TYPE_DEBUG, " no buffer space.\n");
1624     return;
1625   }
1626
1627   LOG (GNUNET_ERROR_TYPE_DEBUG, " buffer space %u, allowing\n", buffer);
1628   send_client_ack (ch, fwd);
1629 }
1630
1631
1632 /**
1633  * Log channel info.
1634  *
1635  * @param ch Channel.
1636  */
1637 void
1638 GCCH_debug (struct CadetChannel *ch)
1639 {
1640   if (NULL == ch)
1641   {
1642     LOG (GNUNET_ERROR_TYPE_DEBUG, "*** DEBUG NULL CHANNEL ***\n");
1643     return;
1644   }
1645   LOG (GNUNET_ERROR_TYPE_DEBUG, "Channel %s:%X (%p)\n",
1646               GCT_2s (ch->t), ch->gid, ch);
1647   LOG (GNUNET_ERROR_TYPE_DEBUG, "  root %p/%p\n",
1648               ch->root, ch->root_rel);
1649   if (NULL != ch->root)
1650   {
1651     LOG (GNUNET_ERROR_TYPE_DEBUG, "  cli %s\n", GML_2s (ch->root));
1652     LOG (GNUNET_ERROR_TYPE_DEBUG, "  ready %s\n",
1653                 ch->root_rel->client_ready ? "YES" : "NO");
1654     LOG (GNUNET_ERROR_TYPE_DEBUG, "  id %X\n", ch->lid_root);
1655   }
1656   LOG (GNUNET_ERROR_TYPE_DEBUG, "  dest %p/%p\n",
1657               ch->dest, ch->dest_rel);
1658   if (NULL != ch->dest)
1659   {
1660     LOG (GNUNET_ERROR_TYPE_DEBUG, "  cli %s\n", GML_2s (ch->dest));
1661     LOG (GNUNET_ERROR_TYPE_DEBUG, "  ready %s\n",
1662                 ch->dest_rel->client_ready ? "YES" : "NO");
1663     LOG (GNUNET_ERROR_TYPE_DEBUG, "  id %X\n", ch->lid_dest);
1664   }
1665 }
1666
1667
1668 /**
1669  * Handle an ACK given by a client.
1670  *
1671  * Mark client as ready and send him any buffered data we could have for him.
1672  *
1673  * @param ch Channel.
1674  * @param fwd Is this a "FWD ACK"? (FWD ACKs are sent by dest and go BCK)
1675  */
1676 void
1677 GCCH_handle_local_ack (struct CadetChannel *ch, int fwd)
1678 {
1679   struct CadetChannelReliability *rel;
1680   struct CadetClient *c;
1681
1682   rel = fwd ? ch->dest_rel : ch->root_rel;
1683   c   = fwd ? ch->dest     : ch->root;
1684
1685   rel->client_ready = GNUNET_YES;
1686   send_client_buffered_data (ch, c, fwd);
1687
1688   if (GNUNET_YES == ch->destroy && 0 == rel->n_recv)
1689   {
1690     send_destroy (ch, GNUNET_YES);
1691     GCCH_destroy (ch);
1692     return;
1693   }
1694   /* if loopback is marked for destruction, no need to ACK to the other peer,
1695    * it requested the destruction and is already gone, therefore, else if.
1696    */
1697   else if (is_loopback (ch))
1698   {
1699     unsigned int buffer;
1700
1701     buffer = GCCH_get_buffer (ch, fwd);
1702     if (0 < buffer)
1703       GCCH_allow_client (ch, fwd);
1704
1705     return;
1706   }
1707   GCT_send_connection_acks (ch->t);
1708 }
1709
1710
1711 /**
1712  * Handle data given by a client.
1713  *
1714  * Check whether the client is allowed to send in this tunnel, save if channel
1715  * is reliable and send an ACK to the client if there is still buffer space
1716  * in the tunnel.
1717  *
1718  * @param ch Channel.
1719  * @param c Client which sent the data.
1720  * @param fwd Is this a FWD data?
1721  * @param message Data message.
1722  * @param size Size of data.
1723  *
1724  * @return GNUNET_OK if everything goes well, GNUNET_SYSERR in case of en error.
1725  */
1726 int
1727 GCCH_handle_local_data (struct CadetChannel *ch,
1728                         struct CadetClient *c, int fwd,
1729                         const struct GNUNET_MessageHeader *message,
1730                         size_t size)
1731 {
1732   struct CadetChannelReliability *rel;
1733   struct GNUNET_CADET_Data *payload;
1734   uint16_t p2p_size = sizeof(struct GNUNET_CADET_Data) + size;
1735   unsigned char cbuf[p2p_size];
1736   unsigned char buffer;
1737
1738   /* Is the client in the channel? */
1739   if ( !( (fwd &&
1740            ch->root == c)
1741          ||
1742           (!fwd &&
1743            ch->dest == c) ) )
1744   {
1745     GNUNET_break_op (0);
1746     return GNUNET_SYSERR;
1747   }
1748
1749   rel = fwd ? ch->root_rel : ch->dest_rel;
1750
1751   if (GNUNET_NO == rel->client_allowed)
1752   {
1753     GNUNET_break_op (0);
1754     return GNUNET_SYSERR;
1755   }
1756
1757   rel->client_allowed = GNUNET_NO;
1758
1759   /* Ok, everything is correct, send the message. */
1760   payload = (struct GNUNET_CADET_Data *) cbuf;
1761   payload->mid = htonl (rel->mid_send);
1762   rel->mid_send++;
1763   memcpy (&payload[1], message, size);
1764   payload->header.size = htons (p2p_size);
1765   payload->header.type = htons (GNUNET_MESSAGE_TYPE_CADET_DATA);
1766   payload->chid = htonl (ch->gid);
1767   LOG (GNUNET_ERROR_TYPE_DEBUG, "  sending on channel...\n");
1768   GCCH_send_prebuilt_message (&payload->header, ch, fwd, NULL);
1769
1770   if (is_loopback (ch))
1771     buffer = GCCH_get_buffer (ch, fwd);
1772   else
1773     buffer = GCT_get_connections_buffer (ch->t);
1774
1775   if (0 < buffer)
1776     GCCH_allow_client (ch, fwd);
1777
1778   return GNUNET_OK;
1779 }
1780
1781
1782 /**
1783  * Handle a channel destroy requested by a client.
1784  *
1785  * TODO: add "reason" field
1786  *
1787  * Destroy the channel and the tunnel in case this was the last channel.
1788  *
1789  * @param ch Channel.
1790  * @param c Client that requested the destruction (to avoid notifying him).
1791  * @param is_root Is the request coming from root?
1792  */
1793 void
1794 GCCH_handle_local_destroy (struct CadetChannel *ch,
1795                            struct CadetClient *c,
1796                            int is_root)
1797 {
1798   ch->destroy = GNUNET_YES;
1799   /* Cleanup after the tunnel */
1800   if (GNUNET_NO == is_root && c == ch->dest)
1801   {
1802     LOG (GNUNET_ERROR_TYPE_DEBUG, " Client %s is destination.\n", GML_2s (c));
1803     GML_client_delete_channel (c, ch, ch->lid_dest);
1804     ch->dest = NULL;
1805   }
1806   if (GNUNET_YES == is_root && c == ch->root)
1807   {
1808     LOG (GNUNET_ERROR_TYPE_DEBUG, " Client %s is owner.\n", GML_2s (c));
1809     GML_client_delete_channel (c, ch, ch->lid_root);
1810     ch->root = NULL;
1811   }
1812
1813   send_destroy (ch, GNUNET_NO);
1814   if (0 == ch->pending_messages)
1815     GCCH_destroy (ch);
1816 }
1817
1818
1819 /**
1820  * Handle a channel create requested by a client.
1821  *
1822  * Create the channel and the tunnel in case this was the first0 channel.
1823  *
1824  * @param c Client that requested the creation (will be the root).
1825  * @param msg Create Channel message.
1826  *
1827  * @return GNUNET_OK if everything went fine, GNUNET_SYSERR otherwise.
1828  */
1829 int
1830 GCCH_handle_local_create (struct CadetClient *c,
1831                           struct GNUNET_CADET_ChannelMessage *msg)
1832 {
1833   struct CadetChannel *ch;
1834   struct CadetTunnel *t;
1835   struct CadetPeer *peer;
1836   CADET_ChannelNumber chid;
1837
1838   LOG (GNUNET_ERROR_TYPE_DEBUG, "  towards %s:%u\n",
1839               GNUNET_i2s (&msg->peer), ntohl (msg->port));
1840   chid = ntohl (msg->channel_id);
1841
1842   /* Sanity check for duplicate channel IDs */
1843   if (NULL != GML_channel_get (c, chid))
1844   {
1845     GNUNET_break (0);
1846     return GNUNET_SYSERR;
1847   }
1848
1849   peer = GCP_get (&msg->peer);
1850   GCP_add_tunnel (peer);
1851   t = GCP_get_tunnel (peer);
1852
1853   if (GCP_get_short_id (peer) == myid)
1854   {
1855     GCT_change_cstate (t, CADET_TUNNEL_READY);
1856   }
1857   else
1858   {
1859     /* FIXME change to a tunnel API, eliminate ch <-> peer connection */
1860     GCP_connect (peer);
1861   }
1862
1863   /* Create channel */
1864   ch = channel_new (t, c, chid);
1865   if (NULL == ch)
1866   {
1867     GNUNET_break (0);
1868     return GNUNET_SYSERR;
1869   }
1870   ch->port = ntohl (msg->port);
1871   channel_set_options (ch, ntohl (msg->opt));
1872
1873   /* In unreliable channels, we'll use the DLL to buffer BCK data */
1874   ch->root_rel = GNUNET_new (struct CadetChannelReliability);
1875   ch->root_rel->ch = ch;
1876   ch->root_rel->retry_timer = CADET_RETRANSMIT_TIME;
1877   ch->root_rel->expected_delay.rel_value_us = 0;
1878
1879   LOG (GNUNET_ERROR_TYPE_DEBUG, "CREATED CHANNEL %s\n", GCCH_2s (ch));
1880
1881   send_create (ch);
1882
1883   return GNUNET_OK;
1884 }
1885
1886
1887 /**
1888  * Handler for cadet network payload traffic.
1889  *
1890  * @param ch Channel for the message.
1891  * @param msg Unencryted data message.
1892  * @param fwd Is this message fwd? This only is meaningful in loopback channels.
1893  *            #GNUNET_YES if message is FWD on the respective channel (loopback)
1894  *            #GNUNET_NO if message is BCK on the respective channel (loopback)
1895  *            #GNUNET_SYSERR if message on a one-ended channel (remote)
1896  */
1897 void
1898 GCCH_handle_data (struct CadetChannel *ch,
1899                   const struct GNUNET_CADET_Data *msg,
1900                   int fwd)
1901 {
1902   struct CadetChannelReliability *rel;
1903   struct CadetClient *c;
1904   uint32_t mid;
1905
1906   /* If this is a remote (non-loopback) channel, find 'fwd'. */
1907   if (GNUNET_SYSERR == fwd)
1908   {
1909     if (is_loopback (ch))
1910     {
1911       /* It is a loopback channel after all... */
1912       GNUNET_break (0);
1913       return;
1914     }
1915     fwd = (NULL != ch->dest) ? GNUNET_YES : GNUNET_NO;
1916   }
1917
1918   /*  Initialize FWD/BCK data */
1919   c   = fwd ? ch->dest     : ch->root;
1920   rel = fwd ? ch->dest_rel : ch->root_rel;
1921
1922   if (NULL == c)
1923   {
1924     GNUNET_break (GNUNET_NO != ch->destroy);
1925     return;
1926   }
1927
1928   if (CADET_CHANNEL_READY != ch->state)
1929   {
1930     if (GNUNET_NO == fwd)
1931     {
1932       /* If we are the root, this means the other peer has sent traffic before
1933        * receiving our ACK. Even if the SYNACK goes missing, no traffic should
1934        * be sent before the ACK.
1935        */
1936       GNUNET_break_op (0);
1937       return;
1938     }
1939     /* If we are the dest, this means that the SYNACK got to the root but
1940      * the ACK went missing. Treat this as an ACK.
1941      */
1942     channel_confirm (ch, GNUNET_NO);
1943   }
1944
1945   GNUNET_STATISTICS_update (stats, "# data received", 1, GNUNET_NO);
1946
1947   mid = ntohl (msg->mid);
1948   LOG (GNUNET_ERROR_TYPE_INFO, "<=== DATA %u %s on channel %s\n",
1949        mid, GC_f2s (fwd), GCCH_2s (ch));
1950
1951   if (GNUNET_NO == ch->reliable ||
1952       ( !GC_is_pid_bigger (rel->mid_recv, mid) &&
1953         GC_is_pid_bigger (rel->mid_recv + 64, mid) ) )
1954   {
1955     LOG (GNUNET_ERROR_TYPE_DEBUG, "RECV %u (%u)\n",
1956          mid, ntohs (msg->header.size));
1957     if (GNUNET_YES == ch->reliable)
1958     {
1959       /* Is this the exact next expected messasge? */
1960       if (mid == rel->mid_recv)
1961       {
1962         LOG (GNUNET_ERROR_TYPE_DEBUG, "as expected, sending to client\n");
1963         send_client_data (ch, msg, fwd);
1964       }
1965       else
1966       {
1967         LOG (GNUNET_ERROR_TYPE_DEBUG, "save for later\n");
1968         add_buffered_data (msg, rel);
1969       }
1970     }
1971     else
1972     {
1973       /* Tunnel is unreliable: send to clients directly */
1974       /* FIXME: accept Out Of Order traffic */
1975       rel->mid_recv = mid + 1;
1976       send_client_data (ch, msg, fwd);
1977     }
1978   }
1979   else
1980   {
1981     if (GC_is_pid_bigger (rel->mid_recv, mid))
1982     {
1983       GNUNET_break_op (0);
1984       LOG (GNUNET_ERROR_TYPE_WARNING,
1985           "MID %u on channel %s not expected (window: %u - %u). Dropping!\n",
1986           mid, GCCH_2s (ch), rel->mid_recv, rel->mid_recv + 63);
1987     }
1988     else
1989     {
1990       LOG (GNUNET_ERROR_TYPE_WARNING,
1991            "Duplicate MID %u, channel %s (expecting MID %u). Re-sending ACK!\n",
1992            mid, GCCH_2s (ch), rel->mid_recv);
1993       if (NULL != rel->uniq)
1994       {
1995         LOG (GNUNET_ERROR_TYPE_WARNING,
1996             "We are trying to send an ACK, but don't seem have the "
1997             "bandwidth. Try to increase your ats QUOTA in you config file\n");
1998       }
1999
2000     }
2001   }
2002
2003   GCCH_send_data_ack (ch, fwd);
2004 }
2005
2006
2007 /**
2008  * Handler for cadet network traffic end-to-end ACKs.
2009  *
2010  * @param ch Channel on which we got this message.
2011  * @param msg Data message.
2012  * @param fwd Is this message fwd? This only is meaningful in loopback channels.
2013  *            #GNUNET_YES if message is FWD on the respective channel (loopback)
2014  *            #GNUNET_NO if message is BCK on the respective channel (loopback)
2015  *            #GNUNET_SYSERR if message on a one-ended channel (remote)
2016  */
2017 void
2018 GCCH_handle_data_ack (struct CadetChannel *ch,
2019                       const struct GNUNET_CADET_DataACK *msg,
2020                       int fwd)
2021 {
2022   struct CadetChannelReliability *rel;
2023   struct CadetReliableMessage *copy;
2024   struct CadetReliableMessage *next;
2025   uint32_t ack;
2026   int work;
2027
2028   /* If this is a remote (non-loopback) channel, find 'fwd'. */
2029   if (GNUNET_SYSERR == fwd)
2030   {
2031     if (is_loopback (ch))
2032     {
2033       /* It is a loopback channel after all... */
2034       GNUNET_break (0);
2035       return;
2036     }
2037     /* Inverted: if message came 'FWD' is a 'BCK ACK'. */
2038     fwd = (NULL != ch->dest) ? GNUNET_NO : GNUNET_YES;
2039   }
2040
2041   ack = ntohl (msg->mid);
2042   LOG (GNUNET_ERROR_TYPE_INFO, "<=== %s ACK %u + %llX\n",
2043        GC_f2s (fwd), ack, msg->futures);
2044
2045   if (GNUNET_YES == fwd)
2046   {
2047     rel = ch->root_rel;
2048   }
2049   else
2050   {
2051     rel = ch->dest_rel;
2052   }
2053   if (NULL == rel)
2054   {
2055     GNUNET_break_op (GNUNET_NO != ch->destroy);
2056     return;
2057   }
2058
2059   /* Free ACK'd copies: no need to retransmit those anymore FIXME refactor */
2060   for (work = GNUNET_NO, copy = rel->head_sent; copy != NULL; copy = next)
2061   {
2062     if (GC_is_pid_bigger (copy->mid, ack))
2063     {
2064       LOG (GNUNET_ERROR_TYPE_DEBUG, "  head %u, out!\n", copy->mid);
2065       channel_rel_free_sent (rel, msg);
2066       break;
2067     }
2068     work = GNUNET_YES;
2069     LOG (GNUNET_ERROR_TYPE_DEBUG, "  id %u\n", copy->mid);
2070     next = copy->next;
2071     if (GNUNET_YES == rel_message_free (copy, GNUNET_YES))
2072       return;
2073   }
2074
2075   /* ACK client if needed and possible */
2076   GCCH_allow_client (ch, fwd);
2077
2078   /* If some message was free'd, update the retransmission delay */
2079   if (GNUNET_YES == work)
2080   {
2081     if (NULL != rel->retry_task)
2082     {
2083       GNUNET_SCHEDULER_cancel (rel->retry_task);
2084       rel->retry_task = NULL;
2085       if (NULL != rel->head_sent && NULL == rel->head_sent->chq)
2086       {
2087         struct GNUNET_TIME_Absolute new_target;
2088         struct GNUNET_TIME_Relative delay;
2089
2090         delay = GNUNET_TIME_relative_multiply (rel->retry_timer,
2091                                                CADET_RETRANSMIT_MARGIN);
2092         new_target = GNUNET_TIME_absolute_add (rel->head_sent->timestamp,
2093                                                delay);
2094         delay = GNUNET_TIME_absolute_get_remaining (new_target);
2095         rel->retry_task =
2096             GNUNET_SCHEDULER_add_delayed (delay,
2097                                           &channel_retransmit_message,
2098                                           rel);
2099       }
2100     }
2101     else
2102     {
2103       /* Work was done but no task was pending? Shouldn't happen! */
2104       GNUNET_break (0);
2105     }
2106   }
2107 }
2108
2109
2110 /**
2111  * Handler for channel create messages.
2112  *
2113  * Does not have fwd parameter because it's always 'FWD': channel is incoming.
2114  *
2115  * @param t Tunnel this channel will be in.
2116  * @param msg Channel crate message.
2117  */
2118 struct CadetChannel *
2119 GCCH_handle_create (struct CadetTunnel *t,
2120                     const struct GNUNET_CADET_ChannelCreate *msg)
2121 {
2122   CADET_ChannelNumber chid;
2123   struct CadetChannel *ch;
2124   struct CadetClient *c;
2125   int new_channel;
2126
2127   chid = ntohl (msg->chid);
2128   ch = GCT_get_channel (t, chid);
2129   if (NULL == ch)
2130   {
2131     /* Create channel */
2132     ch = channel_new (t, NULL, 0);
2133     ch->gid = chid;
2134     channel_set_options (ch, ntohl (msg->opt));
2135     new_channel = GNUNET_YES;
2136   }
2137   else
2138   {
2139     new_channel = GNUNET_NO;
2140   }
2141
2142   if (GNUNET_YES == new_channel || GCT_is_loopback (t))
2143   {
2144     /* Find a destination client */
2145     ch->port = ntohl (msg->port);
2146     LOG (GNUNET_ERROR_TYPE_DEBUG, "   port %u\n", ch->port);
2147     c = GML_client_get_by_port (ch->port);
2148     if (NULL == c)
2149     {
2150       LOG (GNUNET_ERROR_TYPE_DEBUG, "  no client has port registered\n");
2151       if (is_loopback (ch))
2152       {
2153         LOG (GNUNET_ERROR_TYPE_DEBUG, "  loopback: destroy on handler\n");
2154         send_nack (ch);
2155       }
2156       else
2157       {
2158         LOG (GNUNET_ERROR_TYPE_DEBUG, "  not loopback: destroy now\n");
2159         send_nack (ch);
2160         GCCH_destroy (ch);
2161       }
2162       return NULL;
2163     }
2164     else
2165     {
2166       LOG (GNUNET_ERROR_TYPE_DEBUG, "  client %p has port registered\n", c);
2167     }
2168
2169     add_destination (ch, c);
2170     if (GNUNET_YES == ch->reliable)
2171       LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! Reliable\n");
2172     else
2173       LOG (GNUNET_ERROR_TYPE_DEBUG, "!!! Not Reliable\n");
2174
2175     send_client_create (ch);
2176     ch->state =  CADET_CHANNEL_SENT;
2177   }
2178   else
2179   {
2180     LOG (GNUNET_ERROR_TYPE_DEBUG, "  duplicate create channel\n");
2181     if (NULL != ch->dest_rel->retry_task)
2182     {
2183       LOG (GNUNET_ERROR_TYPE_DEBUG, "  clearing retry task\n");
2184       /* we were waiting to re-send our 'SYNACK', wait no more! */
2185       GNUNET_SCHEDULER_cancel (ch->dest_rel->retry_task);
2186       ch->dest_rel->retry_task = NULL;
2187     }
2188     else if (NULL != ch->dest_rel->uniq)
2189     {
2190       /* we are waiting to for our 'SYNACK' to leave the queue, all done! */
2191       return ch;
2192     }
2193   }
2194   send_ack (ch, GNUNET_YES);
2195
2196   return ch;
2197 }
2198
2199
2200 /**
2201  * Handler for channel NACK messages.
2202  *
2203  * NACK messages always go dest -> root, no need for 'fwd' or 'msg' parameter.
2204  *
2205  * @param ch Channel.
2206  */
2207 void
2208 GCCH_handle_nack (struct CadetChannel *ch)
2209 {
2210   send_client_nack (ch);
2211   GCCH_destroy (ch);
2212 }
2213
2214
2215 /**
2216  * Handler for channel ack messages.
2217  *
2218  * @param ch Channel.
2219  * @param msg Message.
2220  * @param fwd Is this message fwd? This only is meaningful in loopback channels.
2221  *            #GNUNET_YES if message is FWD on the respective channel (loopback)
2222  *            #GNUNET_NO if message is BCK on the respective channel (loopback)
2223  *            #GNUNET_SYSERR if message on a one-ended channel (remote)
2224  */
2225 void
2226 GCCH_handle_ack (struct CadetChannel *ch,
2227                  const struct GNUNET_CADET_ChannelManage *msg,
2228                  int fwd)
2229 {
2230   /* If this is a remote (non-loopback) channel, find 'fwd'. */
2231   if (GNUNET_SYSERR == fwd)
2232   {
2233     if (is_loopback (ch))
2234     {
2235       /* It is a loopback channel after all... */
2236       GNUNET_break (0);
2237       return;
2238     }
2239     fwd = (NULL != ch->dest) ? GNUNET_YES : GNUNET_NO;
2240   }
2241
2242   channel_confirm (ch, !fwd);
2243 }
2244
2245
2246 /**
2247  * Handler for channel destroy messages.
2248  *
2249  * @param ch Channel to be destroyed of.
2250  * @param msg Message.
2251  * @param fwd Is this message fwd? This only is meaningful in loopback channels.
2252  *            #GNUNET_YES if message is FWD on the respective channel (loopback)
2253  *            #GNUNET_NO if message is BCK on the respective channel (loopback)
2254  *            #GNUNET_SYSERR if message on a one-ended channel (remote)
2255  */
2256 void
2257 GCCH_handle_destroy (struct CadetChannel *ch,
2258                      const struct GNUNET_CADET_ChannelManage *msg,
2259                      int fwd)
2260 {
2261   struct CadetChannelReliability *rel;
2262
2263   /* If this is a remote (non-loopback) channel, find 'fwd'. */
2264   if (GNUNET_SYSERR == fwd)
2265   {
2266     if (is_loopback (ch))
2267     {
2268       /* It is a loopback channel after all... */
2269       GNUNET_break (0);
2270       return;
2271     }
2272     fwd = (NULL != ch->dest) ? GNUNET_YES : GNUNET_NO;
2273   }
2274
2275   GCCH_debug (ch);
2276   if ( (fwd && NULL == ch->dest) || (!fwd && NULL == ch->root) )
2277   {
2278     /* Not for us (don't destroy twice a half-open loopback channel) */
2279     return;
2280   }
2281
2282   rel = fwd ? ch->dest_rel : ch->root_rel;
2283   if (0 == rel->n_recv)
2284   {
2285     send_destroy (ch, GNUNET_YES);
2286     GCCH_destroy (ch);
2287   }
2288   else
2289   {
2290     ch->destroy = GNUNET_YES;
2291   }
2292 }
2293
2294
2295 /**
2296  * Sends an already built message on a channel.
2297  *
2298  * If the channel is on a loopback tunnel, notifies the appropriate destination
2299  * client locally.
2300  *
2301  * On a normal channel passes the message to the tunnel for encryption and
2302  * sending on a connection.
2303  *
2304  * This function DOES NOT save the message for retransmission.
2305  *
2306  * @param message Message to send. Function makes a copy of it.
2307  * @param ch Channel on which this message is transmitted.
2308  * @param fwd Is this a fwd message?
2309  * @param existing_copy This is a retransmission, don't save a copy.
2310  */
2311 void
2312 GCCH_send_prebuilt_message (const struct GNUNET_MessageHeader *message,
2313                             struct CadetChannel *ch, int fwd,
2314                             void *existing_copy)
2315 {
2316   struct CadetChannelQueue *chq;
2317   uint16_t type;
2318
2319   type = ntohs (message->type);
2320   LOG (GNUNET_ERROR_TYPE_INFO, "===> %s %s on channel %s\n",
2321        GC_m2s (type), GC_f2s (fwd), GCCH_2s (ch));
2322
2323   if (GCT_is_loopback (ch->t))
2324   {
2325     handle_loopback (ch, message, fwd);
2326     return;
2327   }
2328
2329   switch (type)
2330   {
2331     struct GNUNET_CADET_Data *payload;
2332     case GNUNET_MESSAGE_TYPE_CADET_DATA:
2333
2334       payload = (struct GNUNET_CADET_Data *) message;
2335       LOG (GNUNET_ERROR_TYPE_INFO, "===> %s %u\n",
2336            GC_m2s (type), ntohl (payload->mid));
2337       if (GNUNET_YES == ch->reliable)
2338       {
2339         chq = GNUNET_new (struct CadetChannelQueue);
2340         chq->type = type;
2341         if (NULL == existing_copy)
2342           chq->copy = channel_save_copy (ch, message, fwd);
2343         else
2344         {
2345           chq->copy = (struct CadetReliableMessage *) existing_copy;
2346           if (NULL != chq->copy->chq)
2347           {
2348             /* Last retransmission was queued but not yet sent!
2349              * This retransmission was scheduled by a ch_message_sent which
2350              * followed a very fast RTT, so the tiny delay made the
2351              * retransmission function to execute before the previous
2352              * retransmitted message even had a chance to leave the peer.
2353              * Cancel this message and wait until the pending
2354              * retransmission leaves the peer and ch_message_sent starts
2355              * the timer for the next one.
2356              */
2357             GNUNET_free (chq);
2358             LOG (GNUNET_ERROR_TYPE_DEBUG,
2359                  "  exisitng copy not yet transmitted!\n");
2360             return;
2361           }
2362           LOG (GNUNET_ERROR_TYPE_DEBUG,
2363                "  using existing copy: %p {r:%p q:%p t:%u}\n",
2364                existing_copy,
2365                chq->copy->rel, chq->copy->chq, chq->copy->type);
2366         }
2367         LOG (GNUNET_ERROR_TYPE_DEBUG, "  new chq: %p\n", chq);
2368         chq->copy->chq = chq;
2369         chq->tq = GCT_send_prebuilt_message (message, ch->t, NULL,
2370                                              GNUNET_YES,
2371                                              &ch_message_sent, chq);
2372         /* q itself is stored in copy */
2373         GNUNET_assert (NULL != chq->tq || GNUNET_NO != ch->destroy);
2374       }
2375       else
2376       {
2377         fire_and_forget (message, ch, GNUNET_NO);
2378       }
2379       break;
2380
2381
2382     case GNUNET_MESSAGE_TYPE_CADET_DATA_ACK:
2383     case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_CREATE:
2384     case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_ACK:
2385       chq = GNUNET_new (struct CadetChannelQueue);
2386       chq->type = type;
2387       chq->rel = fwd ? ch->root_rel : ch->dest_rel;
2388       if (NULL != chq->rel->uniq)
2389       {
2390         if (NULL != chq->rel->uniq->tq)
2391         {
2392           GCT_cancel (chq->rel->uniq->tq);
2393           /* ch_message_sent is called, freeing and NULLing uniq */
2394           GNUNET_break (NULL == chq->rel->uniq);
2395         }
2396         else
2397         {
2398           GNUNET_break (0);
2399           GNUNET_free (chq->rel->uniq);
2400         }
2401       }
2402
2403       chq->tq = GCT_send_prebuilt_message (message, ch->t, NULL, GNUNET_YES,
2404                                            &ch_message_sent, chq);
2405       if (NULL == chq->tq)
2406       {
2407         GNUNET_break (0);
2408         GCT_debug (ch->t, GNUNET_ERROR_TYPE_ERROR);
2409         GNUNET_free (chq);
2410         chq = NULL;
2411         return;
2412       }
2413       chq->rel->uniq = chq;
2414       break;
2415
2416
2417     case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_DESTROY:
2418     case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_NACK:
2419       fire_and_forget (message, ch, GNUNET_YES);
2420       break;
2421
2422
2423     default:
2424       GNUNET_break (0);
2425       LOG (GNUNET_ERROR_TYPE_DEBUG, "type %s unknown!\n", GC_m2s (type));
2426       fire_and_forget (message, ch, GNUNET_YES);
2427   }
2428 }
2429
2430
2431 /**
2432  * Get the static string for identification of the channel.
2433  *
2434  * @param ch Channel.
2435  *
2436  * @return Static string with the channel IDs.
2437  */
2438 const char *
2439 GCCH_2s (const struct CadetChannel *ch)
2440 {
2441   static char buf[64];
2442
2443   if (NULL == ch)
2444     return "(NULL Channel)";
2445
2446   SPRINTF (buf, "%s:%u gid:%X (%X / %X)",
2447            GCT_2s (ch->t), ch->port, ch->gid, ch->lid_root, ch->lid_dest);
2448
2449   return buf;
2450 }