b07f9911e98f66c0d57ee7352cb4c6d515750066
[oweals/gnunet.git] / src / cadet / gnunet-service-cadet_connection.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2001-2015 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20 /**
21  * @file cadet/gnunet-service-cadet_connection.c
22  * @brief GNUnet CADET service connection handling
23  * @author Bartlomiej Polot
24  */
25 #include "platform.h"
26 #include "gnunet_util_lib.h"
27 #include "gnunet_statistics_service.h"
28 #include "cadet_path.h"
29 #include "cadet_protocol.h"
30 #include "cadet.h"
31 #include "gnunet-service-cadet_connection.h"
32 #include "gnunet-service-cadet_peer.h"
33 #include "gnunet-service-cadet_tunnel.h"
34
35
36 /**
37  * Should we run somewhat expensive checks on our invariants?
38  */
39 #define CHECK_INVARIANTS 0
40
41
42 #define LOG(level, ...) GNUNET_log_from (level,"cadet-con",__VA_ARGS__)
43 #define LOG2(level, ...) GNUNET_log_from_nocheck(level,"cadet-con",__VA_ARGS__)
44
45
46 #define CADET_MAX_POLL_TIME      GNUNET_TIME_relative_multiply (\
47                                   GNUNET_TIME_UNIT_MINUTES,\
48                                   10)
49 #define AVG_MSGS                32
50
51
52 /******************************************************************************/
53 /********************************   STRUCTS  **********************************/
54 /******************************************************************************/
55
56 /**
57  * Struct to encapsulate all the Flow Control information to a peer to which
58  * we are directly connected (on a core level).
59  */
60 struct CadetFlowControl
61 {
62   /**
63    * Connection this controls.
64    */
65   struct CadetConnection *c;
66
67   /**
68    * How many messages are in the queue on this connection.
69    */
70   unsigned int queue_n;
71
72   /**
73    * How many messages do we accept in the queue.
74    */
75   unsigned int queue_max;
76
77   /**
78    * ID of the last packet sent towards the peer.
79    */
80   uint32_t last_pid_sent;
81
82   /**
83    * ID of the last packet received from the peer.
84    */
85   uint32_t last_pid_recv;
86
87   /**
88    * Bitmap of past 32 messages received:
89    * - LSB being @c last_pid_recv.
90    * - MSB being @c last_pid_recv - 31 (mod UINTMAX).
91    */
92   uint32_t recv_bitmap;
93
94   /**
95    * Last ACK sent to the peer (peer can't send more than this PID).
96    */
97   uint32_t last_ack_sent;
98
99   /**
100    * Last ACK sent towards the origin (for traffic towards leaf node).
101    */
102   uint32_t last_ack_recv;
103
104   /**
105    * Task to poll the peer in case of a lost ACK causes stall.
106    */
107   struct GNUNET_SCHEDULER_Task *poll_task;
108
109   /**
110    * How frequently to poll for ACKs.
111    */
112   struct GNUNET_TIME_Relative poll_time;
113
114   /**
115    * Queued poll message, to cancel if not necessary anymore (got ACK).
116    */
117   struct CadetConnectionQueue *poll_msg;
118
119   /**
120    * Queued poll message, to cancel if not necessary anymore (got ACK).
121    */
122   struct CadetConnectionQueue *ack_msg;
123 };
124
125 /**
126  * Keep a record of the last messages sent on this connection.
127  */
128 struct CadetConnectionPerformance
129 {
130   /**
131    * Circular buffer for storing measurements.
132    */
133   double usecsperbyte[AVG_MSGS];
134
135   /**
136    * Running average of @c usecsperbyte.
137    */
138   double avg;
139
140   /**
141    * How many values of @c usecsperbyte are valid.
142    */
143   uint16_t size;
144
145   /**
146    * Index of the next "free" position in @c usecsperbyte.
147    */
148   uint16_t idx;
149 };
150
151
152 /**
153  * Struct containing all information regarding a connection to a peer.
154  */
155 struct CadetConnection
156 {
157   /**
158    * Tunnel this connection is part of.
159    */
160   struct CadetTunnel *t;
161
162   /**
163    * Flow control information for traffic fwd.
164    */
165   struct CadetFlowControl fwd_fc;
166
167   /**
168    * Flow control information for traffic bck.
169    */
170   struct CadetFlowControl bck_fc;
171
172   /**
173    * Measure connection performance on the endpoint.
174    */
175   struct CadetConnectionPerformance *perf;
176
177   /**
178    * ID of the connection.
179    */
180   struct GNUNET_CADET_Hash id;
181
182   /**
183    * Path being used for the tunnel. At the origin of the connection
184    * it's a pointer to the destination's path pool, otherwise just a copy.
185    */
186   struct CadetPeerPath *path;
187
188   /**
189    * Task to keep the used paths alive at the owner,
190    * time tunnel out on all the other peers.
191    */
192   struct GNUNET_SCHEDULER_Task *fwd_maintenance_task;
193
194   /**
195    * Task to keep the used paths alive at the destination,
196    * time tunnel out on all the other peers.
197    */
198   struct GNUNET_SCHEDULER_Task *bck_maintenance_task;
199
200   /**
201    * Queue handle for maintainance traffic. One handle for FWD and BCK since
202    * one peer never needs to maintain both directions (no loopback connections).
203    */
204   struct CadetPeerQueue *maintenance_q;
205
206   /**
207    * Should equal #get_next_hop(), or NULL if that peer disconnected.
208    */
209   struct CadetPeer *next_peer;
210
211   /**
212    * Should equal #get_prev_hop(), or NULL if that peer disconnected.
213    */
214   struct CadetPeer *prev_peer;
215
216   /**
217    * State of the connection.
218    */
219   enum CadetConnectionState state;
220
221   /**
222    * Position of the local peer in the path.
223    */
224   unsigned int own_pos;
225
226   /**
227    * Pending message count.
228    */
229   unsigned int pending_messages;
230
231   /**
232    * Destroy flag:
233    * - if 0, connection in use.
234    * - if 1, destroy on last message.
235    * - if 2, connection is being destroyed don't re-enter.
236    */
237   int destroy;
238
239   /**
240    * In-connection-map flag. Sometimes, when @e destroy is set but
241    * actual destruction is delayed to enable us to finish processing
242    * queues (i.e. in the direction that is still working), we remove
243    * the connection from the map to prevent it from still being
244    * found (and used) by accident. This flag is set to #GNUNET_YES
245    * for a connection that is not in the #connections map.  Should
246    * only be #GNUNET_YES if #destroy is also non-zero.
247    */
248   int was_removed;
249
250   /**
251    * Counter to do exponential backoff when creating a connection (max 64).
252    */
253   unsigned short create_retry;
254
255   /**
256    * Task to check if connection has duplicates.
257    */
258   struct GNUNET_SCHEDULER_Task *check_duplicates_task;
259 };
260
261
262 /**
263  * Handle for messages queued but not yet sent.
264  */
265 struct CadetConnectionQueue
266 {
267   /**
268    * Peer queue handle, to cancel if necessary.
269    */
270   struct CadetPeerQueue *q;
271
272   /**
273    * Continuation to call once sent.
274    */
275   GCC_sent cont;
276
277   /**
278    * Closure for @e cont.
279    */
280   void *cont_cls;
281
282   /**
283    * Was this a forced message? (Do not account for it)
284    */
285   int forced;
286 };
287
288 /******************************************************************************/
289 /*******************************   GLOBALS  ***********************************/
290 /******************************************************************************/
291
292 /**
293  * Global handle to the statistics service.
294  */
295 extern struct GNUNET_STATISTICS_Handle *stats;
296
297 /**
298  * Local peer own ID (memory efficient handle).
299  */
300 extern GNUNET_PEER_Id myid;
301
302 /**
303  * Local peer own ID (full value).
304  */
305 extern struct GNUNET_PeerIdentity my_full_id;
306
307 /**
308  * Connections known, indexed by cid (CadetConnection).
309  */
310 static struct GNUNET_CONTAINER_MultiHashMap *connections;
311
312 /**
313  * How many connections are we willing to maintain.
314  * Local connections are always allowed, even if there are more connections than max.
315  */
316 static unsigned long long max_connections;
317
318 /**
319  * How many messages *in total* are we willing to queue, divide by number of
320  * connections to get connection queue size.
321  */
322 static unsigned long long max_msgs_queue;
323
324 /**
325  * How often to send path keepalives. Paths timeout after 4 missed.
326  */
327 static struct GNUNET_TIME_Relative refresh_connection_time;
328
329 /**
330  * How often to send path create / ACKs.
331  */
332 static struct GNUNET_TIME_Relative create_connection_time;
333
334
335 /******************************************************************************/
336 /********************************   STATIC  ***********************************/
337 /******************************************************************************/
338
339
340
341 #if 0 // avoid compiler warning for unused static function
342 static void
343 fc_debug (struct CadetFlowControl *fc)
344 {
345   LOG (GNUNET_ERROR_TYPE_DEBUG, "    IN: %u/%u\n",
346               fc->last_pid_recv, fc->last_ack_sent);
347   LOG (GNUNET_ERROR_TYPE_DEBUG, "    OUT: %u/%u\n",
348               fc->last_pid_sent, fc->last_ack_recv);
349   LOG (GNUNET_ERROR_TYPE_DEBUG, "    QUEUE: %u/%u\n",
350               fc->queue_n, fc->queue_max);
351 }
352
353 static void
354 connection_debug (struct CadetConnection *c)
355 {
356   if (NULL == c)
357   {
358     LOG (GNUNET_ERROR_TYPE_INFO, "DEBUG NULL CONNECTION\n");
359     return;
360   }
361   LOG (GNUNET_ERROR_TYPE_DEBUG, "Connection %s:%X\n",
362               peer2s (c->t->peer), GCC_2s (c));
363   LOG (GNUNET_ERROR_TYPE_DEBUG, "  state: %u, pending msgs: %u\n",
364               c->state, c->pending_messages);
365   LOG (GNUNET_ERROR_TYPE_DEBUG, "  FWD FC\n");
366   fc_debug (&c->fwd_fc);
367   LOG (GNUNET_ERROR_TYPE_DEBUG, "  BCK FC\n");
368   fc_debug (&c->bck_fc);
369 }
370 #endif
371
372
373 /**
374  * Schedule next keepalive task, taking in consideration
375  * the connection state and number of retries.
376  *
377  * @param c Connection for which to schedule the next keepalive.
378  * @param fwd Direction for the next keepalive.
379  */
380 static void
381 schedule_next_keepalive (struct CadetConnection *c, int fwd);
382
383
384 /**
385  * Resets the connection timeout task, some other message has done the
386  * task's job.
387  * - For the first peer on the direction this means to send
388  *   a keepalive or a path confirmation message (either create or ACK).
389  * - For all other peers, this means to destroy the connection,
390  *   due to lack of activity.
391  * Starts the timeout if no timeout was running (connection just created).
392  *
393  * @param c Connection whose timeout to reset.
394  * @param fwd Is this forward?
395  */
396 static void
397 connection_reset_timeout (struct CadetConnection *c, int fwd);
398
399
400 /**
401  * Get string description for tunnel state. Reentrant.
402  *
403  * @param s Tunnel state.
404  *
405  * @return String representation.
406  */
407 static const char *
408 GCC_state2s (enum CadetConnectionState s)
409 {
410   switch (s)
411   {
412     case CADET_CONNECTION_NEW:
413       return "CADET_CONNECTION_NEW";
414     case CADET_CONNECTION_SENT:
415       return "CADET_CONNECTION_SENT";
416     case CADET_CONNECTION_ACK:
417       return "CADET_CONNECTION_ACK";
418     case CADET_CONNECTION_READY:
419       return "CADET_CONNECTION_READY";
420     case CADET_CONNECTION_DESTROYED:
421       return "CADET_CONNECTION_DESTROYED";
422     case CADET_CONNECTION_BROKEN:
423       return "CADET_CONNECTION_BROKEN";
424     default:
425       GNUNET_break (0);
426       LOG (GNUNET_ERROR_TYPE_ERROR, " conn state %u unknown!\n", s);
427       return "CADET_CONNECTION_STATE_ERROR";
428   }
429 }
430
431
432 /**
433  * Initialize a Flow Control structure to the initial state.
434  *
435  * @param fc Flow Control structure to initialize.
436  */
437 static void
438 fc_init (struct CadetFlowControl *fc)
439 {
440   fc->last_pid_sent = (uint32_t) -1; /* Next (expected) = 0 */
441   fc->last_pid_recv = (uint32_t) -1;
442   fc->last_ack_sent = (uint32_t) 0;
443   fc->last_ack_recv = (uint32_t) 0;
444   fc->poll_task = NULL;
445   fc->poll_time = GNUNET_TIME_UNIT_SECONDS;
446   fc->queue_n = 0;
447   fc->queue_max = (max_msgs_queue / max_connections) + 1;
448 }
449
450
451 /**
452  * Find a connection.
453  *
454  * @param cid Connection ID.
455  */
456 static struct CadetConnection *
457 connection_get (const struct GNUNET_CADET_Hash *cid)
458 {
459   return GNUNET_CONTAINER_multihashmap_get (connections, GC_h2hc (cid));
460 }
461
462
463 static void
464 connection_change_state (struct CadetConnection* c,
465                          enum CadetConnectionState state)
466 {
467   LOG (GNUNET_ERROR_TYPE_DEBUG,
468        "Connection %s state %s -> %s\n",
469        GCC_2s (c), GCC_state2s (c->state), GCC_state2s (state));
470   if (CADET_CONNECTION_DESTROYED <= c->state) /* Destroyed or broken. */
471   {
472     LOG (GNUNET_ERROR_TYPE_DEBUG, "state not changing anymore\n");
473     return;
474   }
475   c->state = state;
476   if (CADET_CONNECTION_READY == state)
477     c->create_retry = 1;
478 }
479
480
481 /**
482  * Callback called when a queued ACK message is sent.
483  *
484  * @param cls Closure (FC).
485  * @param c Connection this message was on.
486  * @param q Queue handler this call invalidates.
487  * @param type Type of message sent.
488  * @param fwd Was this a FWD going message?
489  * @param size Size of the message.
490  */
491 static void
492 ack_sent (void *cls,
493           struct CadetConnection *c,
494           struct CadetConnectionQueue *q,
495           uint16_t type, int fwd, size_t size)
496 {
497   struct CadetFlowControl *fc = cls;
498
499   fc->ack_msg = NULL;
500 }
501
502
503 /**
504  * Send an ACK on the connection, informing the predecessor about
505  * the available buffer space. Should not be called in case the peer
506  * is origin (no predecessor) in the @c fwd direction.
507  *
508  * Note that for fwd ack, the FWD mean forward *traffic* (root->dest),
509  * the ACK itself goes "back" (dest->root).
510  *
511  * @param c Connection on which to send the ACK.
512  * @param buffer How much space free to advertise?
513  * @param fwd Is this FWD ACK? (Going dest -> root)
514  * @param force Don't optimize out.
515  */
516 static void
517 send_ack (struct CadetConnection *c, unsigned int buffer, int fwd, int force)
518 {
519   struct CadetFlowControl *next_fc;
520   struct CadetFlowControl *prev_fc;
521   struct GNUNET_CADET_ACK msg;
522   uint32_t ack;
523   int delta;
524
525   /* If origin, there is no connection to send ACKs. Wrong function! */
526   GCC_check_connections ();
527   if (GCC_is_origin (c, fwd))
528   {
529     LOG (GNUNET_ERROR_TYPE_DEBUG, "connection %s is origin in %s\n",
530          GCC_2s (c), GC_f2s (fwd));
531     GNUNET_break (0);
532     return;
533   }
534
535   next_fc = fwd ? &c->fwd_fc : &c->bck_fc;
536   prev_fc = fwd ? &c->bck_fc : &c->fwd_fc;
537
538   LOG (GNUNET_ERROR_TYPE_DEBUG, "connection send %s ack on %s\n",
539        GC_f2s (fwd), GCC_2s (c));
540
541   /* Check if we need to transmit the ACK. */
542   delta = prev_fc->last_ack_sent - prev_fc->last_pid_recv;
543   if (3 < delta && buffer < delta && GNUNET_NO == force)
544   {
545     LOG (GNUNET_ERROR_TYPE_DEBUG, "Not sending ACK, buffer > 3\n");
546     LOG (GNUNET_ERROR_TYPE_DEBUG,
547          "  last pid recv: %u, last ack sent: %u\n",
548          prev_fc->last_pid_recv, prev_fc->last_ack_sent);
549     GCC_check_connections ();
550     return;
551   }
552
553   /* Ok, ACK might be necessary, what PID to ACK? */
554   ack = prev_fc->last_pid_recv + buffer;
555   LOG (GNUNET_ERROR_TYPE_DEBUG, " ACK %u\n", ack);
556   LOG (GNUNET_ERROR_TYPE_DEBUG,
557        " last pid %u, last ack %u, qmax %u, q %u\n",
558        prev_fc->last_pid_recv, prev_fc->last_ack_sent,
559        next_fc->queue_max, next_fc->queue_n);
560   if (ack == prev_fc->last_ack_sent && GNUNET_NO == force)
561   {
562     LOG (GNUNET_ERROR_TYPE_DEBUG, "Not sending FWD ACK, not needed\n");
563     GCC_check_connections ();
564     return;
565   }
566
567   /* Check if message is already in queue */
568   if (NULL != prev_fc->ack_msg)
569   {
570     if (GC_is_pid_bigger (ack, prev_fc->last_ack_sent))
571     {
572       LOG (GNUNET_ERROR_TYPE_DEBUG, " canceling old ACK\n");
573       GCC_cancel (prev_fc->ack_msg);
574       /* GCC_cancel triggers ack_sent(), which clears fc->ack_msg */
575     }
576     else
577     {
578       LOG (GNUNET_ERROR_TYPE_DEBUG, " same ACK already in queue\n");
579       GCC_check_connections ();
580       return;
581     }
582   }
583
584   prev_fc->last_ack_sent = ack;
585
586   /* Build ACK message and send on conn */
587   msg.header.size = htons (sizeof (msg));
588   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_ACK);
589   msg.ack = htonl (ack);
590   msg.cid = c->id;
591
592   prev_fc->ack_msg = GCC_send_prebuilt_message (&msg.header, 0, ack, c,
593                                                 !fwd, GNUNET_YES,
594                                                 &ack_sent, prev_fc);
595   GNUNET_assert (NULL != prev_fc->ack_msg);
596   GCC_check_connections ();
597 }
598
599
600 /**
601  * Callback called when a connection queued message is sent.
602  *
603  * Calculates the average time and connection packet tracking.
604  *
605  * @param cls Closure (ConnectionQueue Handle).
606  * @param c Connection this message was on.
607  * @param sent Was it really sent? (Could have been canceled)
608  * @param type Type of message sent.
609  * @param pid Packet ID, or 0 if not applicable (create, destroy, etc).
610  * @param fwd Was this a FWD going message?
611  * @param size Size of the message.
612  * @param wait Time spent waiting for core (only the time for THIS message)
613  * @return #GNUNET_YES if connection was destroyed, #GNUNET_NO otherwise.
614  */
615 static int
616 conn_message_sent (void *cls,
617                    struct CadetConnection *c, int sent,
618                    uint16_t type, uint32_t pid, int fwd, size_t size,
619                    struct GNUNET_TIME_Relative wait)
620 {
621   struct CadetConnectionPerformance *p;
622   struct CadetFlowControl *fc;
623   struct CadetConnectionQueue *q = cls;
624   double usecsperbyte;
625   int forced;
626
627   GCC_check_connections ();
628   LOG (GNUNET_ERROR_TYPE_DEBUG, "connection message_sent\n");
629
630   GCC_debug (c, GNUNET_ERROR_TYPE_DEBUG);
631
632   fc = fwd ? &c->fwd_fc : &c->bck_fc;
633   LOG (GNUNET_ERROR_TYPE_DEBUG, " %ssent %s %s pid %u\n",
634        sent ? "" : "not ", GC_f2s (fwd), GC_m2s (type), pid);
635   if (NULL != q)
636   {
637     forced = q->forced;
638     if (NULL != q->cont)
639     {
640       LOG (GNUNET_ERROR_TYPE_DEBUG, " calling cont\n");
641       q->cont (q->cont_cls, c, q, type, fwd, size);
642     }
643     GNUNET_free (q);
644   }
645   else if (type == GNUNET_MESSAGE_TYPE_CADET_ENCRYPTED
646            || type == GNUNET_MESSAGE_TYPE_CADET_AX)
647   {
648     /* If NULL == q and ENCRYPTED == type, message must have been ch_mngmnt */
649     forced = GNUNET_YES;
650   }
651   else
652   {
653     forced = GNUNET_NO;
654   }
655   if (NULL == c)
656   {
657     if (type != GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN
658         && type != GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY)
659     {
660       LOG (GNUNET_ERROR_TYPE_ERROR, "Message %s sent on NULL connection!\n",
661            GC_m2s (type));
662     }
663     GCC_check_connections ();
664     return GNUNET_NO;
665   }
666   LOG (GNUNET_ERROR_TYPE_DEBUG, " C_P- %p %u\n", c, c->pending_messages);
667   c->pending_messages--;
668   if ( (GNUNET_YES == c->destroy) &&
669        (0 == c->pending_messages) )
670   {
671     LOG (GNUNET_ERROR_TYPE_DEBUG,
672          "!  destroying connection!\n");
673     GCC_destroy (c);
674     GCC_check_connections ();
675     return GNUNET_YES;
676   }
677   /* Send ACK if needed, after accounting for sent ID in fc->queue_n */
678   switch (type)
679   {
680     case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE:
681     case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK:
682       c->maintenance_q = NULL;
683       /* Don't trigger a keepalive for sent ACKs, only SYN and SYNACKs */
684       if (GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE == type || !fwd)
685         schedule_next_keepalive (c, fwd);
686       break;
687
688     case GNUNET_MESSAGE_TYPE_CADET_ENCRYPTED:
689     case GNUNET_MESSAGE_TYPE_CADET_AX:
690       if (GNUNET_YES == sent)
691       {
692         GNUNET_assert (NULL != q);
693         fc->last_pid_sent = pid;
694         if (GC_is_pid_bigger (fc->last_pid_sent + 1, fc->last_ack_recv))
695           GCC_start_poll (c, fwd);
696         GCC_send_ack (c, fwd, GNUNET_NO);
697         connection_reset_timeout (c, fwd);
698       }
699
700       LOG (GNUNET_ERROR_TYPE_DEBUG, "!  Q_N- %p %u\n", fc, fc->queue_n);
701       if (GNUNET_NO == forced)
702       {
703         fc->queue_n--;
704         LOG (GNUNET_ERROR_TYPE_DEBUG,
705             "!   accounting pid %u\n",
706             fc->last_pid_sent);
707       }
708       else
709       {
710         LOG (GNUNET_ERROR_TYPE_DEBUG,
711              "!   forced, Q_N not accounting pid %u\n",
712              fc->last_pid_sent);
713       }
714       break;
715
716     case GNUNET_MESSAGE_TYPE_CADET_KX:
717       if (GNUNET_YES == sent)
718         connection_reset_timeout (c, fwd);
719       break;
720
721     case GNUNET_MESSAGE_TYPE_CADET_POLL:
722       fc->poll_msg = NULL;
723       break;
724
725     case GNUNET_MESSAGE_TYPE_CADET_ACK:
726       fc->ack_msg = NULL;
727       break;
728
729     case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN:
730     case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY:
731       break;
732
733     default:
734       LOG (GNUNET_ERROR_TYPE_ERROR, "%s unknown\n", GC_m2s (type));
735       GNUNET_break (0);
736       break;
737   }
738   LOG (GNUNET_ERROR_TYPE_DEBUG, "!  message sent!\n");
739
740   if (NULL == c->perf)
741     return GNUNET_NO; /* Only endpoints are interested in timing. */
742
743   p = c->perf;
744   usecsperbyte = ((double) wait.rel_value_us) / size;
745   if (p->size == AVG_MSGS)
746   {
747     /* Array is full. Substract oldest value, add new one and store. */
748     p->avg -= (p->usecsperbyte[p->idx] / AVG_MSGS);
749     p->usecsperbyte[p->idx] = usecsperbyte;
750     p->avg += (p->usecsperbyte[p->idx] / AVG_MSGS);
751   }
752   else
753   {
754     /* Array not yet full. Add current value to avg and store. */
755     p->usecsperbyte[p->idx] = usecsperbyte;
756     p->avg *= p->size;
757     p->avg += p->usecsperbyte[p->idx];
758     p->size++;
759     p->avg /= p->size;
760   }
761   p->idx = (p->idx + 1) % AVG_MSGS;
762   GCC_check_connections ();
763   return GNUNET_NO;
764 }
765
766
767 /**
768  * Get the previous hop in a connection
769  *
770  * @param c Connection.
771  *
772  * @return Previous peer in the connection.
773  */
774 static struct CadetPeer *
775 get_prev_hop (const struct CadetConnection *c)
776 {
777   GNUNET_PEER_Id id;
778
779   if (NULL == c->path)
780     return NULL;
781   LOG (GNUNET_ERROR_TYPE_DEBUG,
782        " get prev hop %s [%u/%u]\n",
783        GCC_2s (c), c->own_pos, c->path->length);
784   if (0 == c->own_pos || c->path->length < 2)
785     id = c->path->peers[0];
786   else
787     id = c->path->peers[c->own_pos - 1];
788
789   LOG (GNUNET_ERROR_TYPE_DEBUG, "  ID: %s (%u)\n",
790        GNUNET_i2s (GNUNET_PEER_resolve2 (id)), id);
791
792   return GCP_get_short (id, GNUNET_YES);
793 }
794
795
796 /**
797  * Get the next hop in a connection
798  *
799  * @param c Connection.
800  *
801  * @return Next peer in the connection.
802  */
803 static struct CadetPeer *
804 get_next_hop (const struct CadetConnection *c)
805 {
806   GNUNET_PEER_Id id;
807
808   if (NULL == c->path)
809     return NULL;
810
811   LOG (GNUNET_ERROR_TYPE_DEBUG, " get next hop %s [%u/%u]\n",
812        GCC_2s (c), c->own_pos, c->path->length);
813   if ((c->path->length - 1) == c->own_pos || c->path->length < 2)
814     id = c->path->peers[c->path->length - 1];
815   else
816     id = c->path->peers[c->own_pos + 1];
817
818   LOG (GNUNET_ERROR_TYPE_DEBUG, "  ID: %s (%u)\n",
819        GNUNET_i2s (GNUNET_PEER_resolve2 (id)), id);
820
821   return GCP_get_short (id, GNUNET_YES);
822 }
823
824
825 /**
826  * Check that the direct neighbours (previous and next hop)
827  * are properly associated with this connection.
828  *
829  * @param c connection to check
830  */
831 static void
832 check_neighbours (const struct CadetConnection *c)
833 {
834   if (NULL == c->path)
835     return; /* nothing to check */
836   GCP_check_connection (get_next_hop (c), c);
837   GCP_check_connection (get_prev_hop (c), c);
838 }
839
840
841 /**
842  * Helper for #check_connections().  Calls #check_neighbours().
843  *
844  * @param cls NULL
845  * @param key ignored
846  * @param value the `struct CadetConnection` to check
847  * @return #GNUNET_OK (continue to iterate)
848  */
849 static int
850 check_connection (void *cls,
851                   const struct GNUNET_HashCode *key,
852                   void *value)
853 {
854   struct CadetConnection *c = value;
855
856   check_neighbours (c);
857   return GNUNET_OK;
858 }
859
860
861 /**
862  * Check invariants for all connections using #check_neighbours().
863  */
864 void
865 GCC_check_connections ()
866 {
867   if (0 == CHECK_INVARIANTS)
868     return;
869   if (NULL == connections)
870     return;
871   GNUNET_CONTAINER_multihashmap_iterate (connections,
872                                          &check_connection,
873                                          NULL);
874 }
875
876
877 /**
878  * Get the hop in a connection.
879  *
880  * @param c Connection.
881  * @param fwd Next in the FWD direction?
882  *
883  * @return Next peer in the connection.
884  */
885 static struct CadetPeer *
886 get_hop (struct CadetConnection *c, int fwd)
887 {
888   return (fwd) ? get_next_hop (c) : get_prev_hop (c);
889 }
890
891
892 /**
893  * Get a bit mask for a message received out-of-order.
894  *
895  * @param last_pid_recv Last PID we received prior to the out-of-order.
896  * @param ooo_pid PID of the out-of-order message.
897  */
898 static uint32_t
899 get_recv_bitmask (uint32_t last_pid_recv, uint32_t ooo_pid)
900 {
901   return 1 << (last_pid_recv - ooo_pid);
902 }
903
904
905 /**
906  * Check is an out-of-order message is ok:
907  * - at most 31 messages behind.
908  * - not duplicate.
909  *
910  * @param last_pid_recv Last in-order PID received.
911  */
912 static int
913 is_ooo_ok (uint32_t last_pid_recv, uint32_t ooo_pid, uint32_t ooo_bitmap)
914 {
915   uint32_t mask;
916
917   if (GC_is_pid_bigger (last_pid_recv - 31, ooo_pid))
918     return GNUNET_NO;
919
920   mask = get_recv_bitmask (last_pid_recv, ooo_pid);
921   if (0 != (ooo_bitmap & mask))
922     return GNUNET_NO;
923
924   return GNUNET_YES;
925 }
926
927
928 /**
929  * Is traffic coming from this sender 'FWD' traffic?
930  *
931  * @param c Connection to check.
932  * @param sender Peer identity of neighbor.
933  *
934  * @return #GNUNET_YES in case the sender is the 'prev' hop and therefore
935  *         the traffic is 'FWD'.
936  *         #GNUNET_NO for BCK.
937  *         #GNUNET_SYSERR for errors.
938  */
939 static int
940 is_fwd (const struct CadetConnection *c,
941         const struct GNUNET_PeerIdentity *sender)
942 {
943   GNUNET_PEER_Id id;
944
945   id = GNUNET_PEER_search (sender);
946   if (GCP_get_short_id (get_prev_hop (c)) == id)
947     return GNUNET_YES;
948
949   if (GCP_get_short_id (get_next_hop (c)) == id)
950     return GNUNET_NO;
951
952   GNUNET_break (0);
953   return GNUNET_SYSERR;
954 }
955
956
957 /**
958  * Sends a CONNECTION ACK message in reponse to a received CONNECTION_CREATE
959  * or a first CONNECTION_ACK directed to us.
960  *
961  * @param connection Connection to confirm.
962  * @param fwd Should we send it FWD? (root->dest)
963  *            (First (~SYNACK) goes BCK, second (~ACK) goes FWD)
964  */
965 static void
966 send_connection_ack (struct CadetConnection *connection, int fwd)
967 {
968   struct CadetTunnel *t;
969
970   GCC_check_connections ();
971   t = connection->t;
972   LOG (GNUNET_ERROR_TYPE_INFO, "--> {%14s ACK} on conn %s\n",
973        GC_f2s (!fwd), GCC_2s (connection));
974   GCP_queue_add (get_hop (connection, fwd), NULL,
975                  GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK, 0, 0,
976                  sizeof (struct GNUNET_CADET_ConnectionACK),
977                  connection, fwd, &conn_message_sent, NULL);
978   connection->pending_messages++;
979   if (CADET_TUNNEL_NEW == GCT_get_cstate (t))
980     GCT_change_cstate (t, CADET_TUNNEL_WAITING);
981   if (CADET_CONNECTION_READY != connection->state)
982     connection_change_state (connection, CADET_CONNECTION_SENT);
983   GCC_check_connections ();
984 }
985
986
987 /**
988  * Send a notification that a connection is broken.
989  *
990  * @param c Connection that is broken.
991  * @param id1 Peer that has disconnected.
992  * @param id2 Peer that has disconnected.
993  * @param fwd Direction towards which to send it.
994  */
995 static void
996 send_broken (struct CadetConnection *c,
997              const struct GNUNET_PeerIdentity *id1,
998              const struct GNUNET_PeerIdentity *id2,
999              int fwd)
1000 {
1001   struct GNUNET_CADET_ConnectionBroken msg;
1002
1003   GCC_check_connections ();
1004   msg.header.size = htons (sizeof (struct GNUNET_CADET_ConnectionBroken));
1005   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN);
1006   msg.cid = c->id;
1007   msg.peer1 = *id1;
1008   msg.peer2 = *id2;
1009   GNUNET_assert (NULL ==
1010                  GCC_send_prebuilt_message (&msg.header, 0, 0, c, fwd,
1011                                             GNUNET_YES, NULL, NULL));
1012   GCC_check_connections ();
1013 }
1014
1015
1016 /**
1017  * Send a notification that a connection is broken, when a connection
1018  * isn't even known to the local peer or soon to be destroyed.
1019  *
1020  * @param connection_id Connection ID.
1021  * @param id1 Peer that has disconnected, probably local peer.
1022  * @param id2 Peer that has disconnected can be NULL if unknown.
1023  * @param peer Peer to notify (neighbor who sent the connection).
1024  */
1025 static void
1026 send_broken_unknown (const struct GNUNET_CADET_Hash *connection_id,
1027                      const struct GNUNET_PeerIdentity *id1,
1028                      const struct GNUNET_PeerIdentity *id2,
1029                      const struct GNUNET_PeerIdentity *peer_id)
1030 {
1031   struct GNUNET_CADET_ConnectionBroken *msg;
1032   struct CadetPeer *neighbor;
1033
1034   GCC_check_connections ();
1035   LOG (GNUNET_ERROR_TYPE_INFO, "--> BROKEN on unknown connection %s\n",
1036        GNUNET_h2s (GC_h2hc (connection_id)));
1037
1038   msg = GNUNET_new (struct GNUNET_CADET_ConnectionBroken);
1039   msg->header.size = htons (sizeof (struct GNUNET_CADET_ConnectionBroken));
1040   msg->header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN);
1041   msg->cid = *connection_id;
1042   msg->peer1 = *id1;
1043   if (NULL != id2)
1044     msg->peer2 = *id2;
1045   else
1046     memset (&msg->peer2, 0, sizeof (msg->peer2));
1047   neighbor = GCP_get (peer_id, GNUNET_NO); /* We MUST know neighbor. */
1048   GNUNET_assert (NULL != neighbor);
1049   GCP_queue_add (neighbor, msg, GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN,
1050                  0, 2, sizeof (struct GNUNET_CADET_ConnectionBroken),
1051                  NULL, GNUNET_SYSERR, /* connection, fwd */
1052                  NULL, NULL); /* continuation */
1053   GCC_check_connections ();
1054 }
1055
1056
1057 /**
1058  * Send keepalive packets for a connection.
1059  *
1060  * @param c Connection to keep alive..
1061  * @param fwd Is this a FWD keepalive? (owner -> dest).
1062  */
1063 static void
1064 send_connection_keepalive (struct CadetConnection *c, int fwd)
1065 {
1066   struct GNUNET_MessageHeader msg;
1067   struct CadetFlowControl *fc;
1068
1069   GCC_check_connections ();
1070   LOG (GNUNET_ERROR_TYPE_INFO,
1071        "keepalive %s for connection %s\n",
1072        GC_f2s (fwd), GCC_2s (c));
1073
1074   GNUNET_assert (NULL != c->t);
1075   fc = fwd ? &c->fwd_fc : &c->bck_fc;
1076   if (0 < fc->queue_n || GNUNET_YES == GCT_has_queued_traffic (c->t))
1077   {
1078     LOG (GNUNET_ERROR_TYPE_INFO, "not sending keepalive, traffic in queue\n");
1079     return;
1080   }
1081
1082   GNUNET_STATISTICS_update (stats, "# keepalives sent", 1, GNUNET_NO);
1083
1084   GNUNET_assert (NULL != c->t);
1085   msg.size = htons (sizeof (msg));
1086   msg.type = htons (GNUNET_MESSAGE_TYPE_CADET_KEEPALIVE);
1087
1088   GNUNET_assert (NULL ==
1089                  GCT_send_prebuilt_message (&msg, c->t, c,
1090                                             GNUNET_NO, NULL, NULL));
1091   GCC_check_connections ();
1092 }
1093
1094
1095 /**
1096  * Send CONNECTION_{CREATE/ACK} packets for a connection.
1097  *
1098  * @param c Connection for which to send the message.
1099  * @param fwd If #GNUNET_YES, send CREATE, otherwise send ACK.
1100  */
1101 static void
1102 connection_recreate (struct CadetConnection *c, int fwd)
1103 {
1104   LOG (GNUNET_ERROR_TYPE_DEBUG,
1105        "sending connection recreate\n");
1106   if (fwd)
1107     GCC_send_create (c);
1108   else
1109     send_connection_ack (c, GNUNET_NO);
1110 }
1111
1112
1113 /**
1114  * Generic connection timer management.
1115  * Depending on the role of the peer in the connection will send the
1116  * appropriate message (build or keepalive)
1117  *
1118  * @param c Conncetion to maintain.
1119  * @param fwd Is FWD?
1120  */
1121 static void
1122 connection_maintain (struct CadetConnection *c, int fwd)
1123 {
1124   if (GNUNET_NO != c->destroy)
1125   {
1126     LOG (GNUNET_ERROR_TYPE_INFO, "not sending keepalive, being destroyed\n");
1127     return;
1128   }
1129
1130   if (NULL == c->t)
1131   {
1132     GNUNET_break (0);
1133     GCC_debug (c, GNUNET_ERROR_TYPE_ERROR);
1134     return;
1135   }
1136
1137   if (CADET_TUNNEL_SEARCHING == GCT_get_cstate (c->t))
1138   {
1139     /* If status is SEARCHING, why is there a connection? Should be WAITING */
1140     GNUNET_break (0);
1141     GCT_debug (c->t, GNUNET_ERROR_TYPE_ERROR);
1142     LOG (GNUNET_ERROR_TYPE_INFO, "not sending keepalive, tunnel SEARCHING\n");
1143     schedule_next_keepalive (c, fwd);
1144     return;
1145   }
1146   switch (c->state)
1147   {
1148     case CADET_CONNECTION_NEW:
1149       GNUNET_break (0);
1150       /* fall-through */
1151     case CADET_CONNECTION_SENT:
1152       connection_recreate (c, fwd);
1153       break;
1154     case CADET_CONNECTION_READY:
1155       send_connection_keepalive (c, fwd);
1156       break;
1157     default:
1158       break;
1159   }
1160 }
1161
1162
1163 /**
1164  * Keep the connection alive.
1165  *
1166  * @param c Connection to keep alive.
1167  * @param fwd Direction.
1168  * @param shutdown Are we shutting down? (Don't send traffic)
1169  *                 Non-zero value for true, not necessarily GNUNET_YES.
1170  */
1171 static void
1172 connection_keepalive (struct CadetConnection *c, int fwd, int shutdown)
1173 {
1174   GCC_check_connections ();
1175   LOG (GNUNET_ERROR_TYPE_DEBUG,
1176        "%s keepalive for %s\n",
1177        GC_f2s (fwd), GCC_2s (c));
1178
1179   if (fwd)
1180     c->fwd_maintenance_task = NULL;
1181   else
1182     c->bck_maintenance_task = NULL;
1183
1184   if (GNUNET_NO != shutdown)
1185     return;
1186
1187   connection_maintain (c, fwd);
1188   GCC_check_connections ();
1189   /* Next execution will be scheduled by message_sent or _maintain*/
1190 }
1191
1192
1193 /**
1194  * Keep the connection alive in the FWD direction.
1195  *
1196  * @param cls Closure (connection to keepalive).
1197  * @param tc TaskContext.
1198  */
1199 static void
1200 connection_fwd_keepalive (void *cls,
1201                           const struct GNUNET_SCHEDULER_TaskContext *tc)
1202 {
1203   GCC_check_connections ();
1204   connection_keepalive ((struct CadetConnection *) cls,
1205                         GNUNET_YES,
1206                         tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN);
1207   GCC_check_connections ();
1208 }
1209
1210
1211 /**
1212  * Keep the connection alive in the BCK direction.
1213  *
1214  * @param cls Closure (connection to keepalive).
1215  * @param tc TaskContext.
1216  */
1217 static void
1218 connection_bck_keepalive (void *cls,
1219                           const struct GNUNET_SCHEDULER_TaskContext *tc)
1220 {
1221   GCC_check_connections ();
1222   connection_keepalive ((struct CadetConnection *) cls,
1223                         GNUNET_NO,
1224                         tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN);
1225   GCC_check_connections ();
1226 }
1227
1228
1229 /**
1230  * Schedule next keepalive task, taking in consideration
1231  * the connection state and number of retries.
1232  *
1233  * If the peer is not the origin, do nothing.
1234  *
1235  * @param c Connection for which to schedule the next keepalive.
1236  * @param fwd Direction for the next keepalive.
1237  */
1238 static void
1239 schedule_next_keepalive (struct CadetConnection *c, int fwd)
1240 {
1241   struct GNUNET_TIME_Relative delay;
1242   struct GNUNET_SCHEDULER_Task * *task_id;
1243   GNUNET_SCHEDULER_TaskCallback keepalive_task;
1244
1245   GCC_check_connections ();
1246   if (GNUNET_NO == GCC_is_origin (c, fwd))
1247     return;
1248
1249   /* Calculate delay to use, depending on the state of the connection */
1250   if (CADET_CONNECTION_READY == c->state)
1251   {
1252     delay = refresh_connection_time;
1253   }
1254   else
1255   {
1256     if (1 > c->create_retry)
1257       c->create_retry = 1;
1258     delay = GNUNET_TIME_relative_multiply (create_connection_time,
1259                                            c->create_retry);
1260     if (c->create_retry < 64)
1261       c->create_retry *= 2;
1262   }
1263
1264   /* Select direction-dependent parameters */
1265   if (GNUNET_YES == fwd)
1266   {
1267     task_id = &c->fwd_maintenance_task;
1268     keepalive_task = &connection_fwd_keepalive;
1269   }
1270   else
1271   {
1272     task_id = &c->bck_maintenance_task;
1273     keepalive_task = &connection_bck_keepalive;
1274   }
1275
1276   /* Check that no one scheduled it before us */
1277   if (NULL != *task_id)
1278   {
1279     /* No need for a _break. It can happen for instance when sending a SYNACK
1280      * for a duplicate SYN: the first SYNACK scheduled the task. */
1281     GNUNET_SCHEDULER_cancel (*task_id);
1282   }
1283
1284   /* Schedule the task */
1285   *task_id = GNUNET_SCHEDULER_add_delayed (delay,
1286                                            keepalive_task,
1287                                            c);
1288   LOG (GNUNET_ERROR_TYPE_DEBUG, "next keepalive in %s\n",
1289        GNUNET_STRINGS_relative_time_to_string (delay, GNUNET_YES));
1290   GCC_check_connections ();
1291 }
1292
1293
1294 /**
1295  * @brief Re-initiate traffic on this connection if necessary.
1296  *
1297  * Check if there is traffic queued towards this peer
1298  * and the core transmit handle is NULL (traffic was stalled).
1299  * If so, call core tmt rdy.
1300  *
1301  * @param c Connection on which initiate traffic.
1302  * @param fwd Is this about fwd traffic?
1303  */
1304 static void
1305 connection_unlock_queue (struct CadetConnection *c, int fwd)
1306 {
1307   struct CadetPeer *peer;
1308
1309   GCC_check_connections ();
1310   LOG (GNUNET_ERROR_TYPE_DEBUG,
1311        "connection_unlock_queue %s on %s\n",
1312        GC_f2s (fwd), GCC_2s (c));
1313
1314   if (GCC_is_terminal (c, fwd))
1315   {
1316     LOG (GNUNET_ERROR_TYPE_DEBUG, " is terminal, can unlock!\n");
1317     return;
1318   }
1319
1320   peer = get_hop (c, fwd);
1321   GCP_queue_unlock (peer, c);
1322   GCC_check_connections ();
1323 }
1324
1325
1326 /**
1327  * Cancel all transmissions that belong to a certain connection.
1328  *
1329  * If the connection is scheduled for destruction and no more messages are left,
1330  * the connection will be destroyed by the continuation call.
1331  *
1332  * @param c Connection which to cancel. Might be destroyed during this call.
1333  * @param fwd Cancel fwd traffic?
1334  */
1335 static void
1336 connection_cancel_queues (struct CadetConnection *c,
1337                           int fwd)
1338 {
1339   struct CadetFlowControl *fc;
1340   struct CadetPeer *peer;
1341
1342   GCC_check_connections ();
1343   LOG (GNUNET_ERROR_TYPE_DEBUG,
1344        "Cancel %s queues for connection %s\n",
1345        GC_f2s (fwd), GCC_2s (c));
1346   if (NULL == c)
1347   {
1348     GNUNET_break (0);
1349     return;
1350   }
1351
1352   fc = fwd ? &c->fwd_fc : &c->bck_fc;
1353   if (NULL != fc->poll_task)
1354   {
1355     GNUNET_SCHEDULER_cancel (fc->poll_task);
1356     fc->poll_task = NULL;
1357     LOG (GNUNET_ERROR_TYPE_DEBUG, "Cancel POLL in ccq for fc %p\n", fc);
1358   }
1359   peer = get_hop (c, fwd);
1360   GCP_queue_cancel (peer, c);
1361   GCC_check_connections ();
1362 }
1363
1364
1365 /**
1366  * Function called if a connection has been stalled for a while,
1367  * possibly due to a missed ACK. Poll the neighbor about its ACK status.
1368  *
1369  * @param cls Closure (poll ctx).
1370  * @param tc TaskContext.
1371  */
1372 static void
1373 connection_poll (void *cls,
1374                  const struct GNUNET_SCHEDULER_TaskContext *tc);
1375
1376
1377 /**
1378  * Callback called when a queued POLL message is sent.
1379  *
1380  * @param cls Closure (FC).
1381  * @param c Connection this message was on.
1382  * @param q Queue handler this call invalidates.
1383  * @param type Type of message sent.
1384  * @param fwd Was this a FWD going message?
1385  * @param size Size of the message.
1386  */
1387 static void
1388 poll_sent (void *cls,
1389            struct CadetConnection *c,
1390            struct CadetConnectionQueue *q,
1391            uint16_t type, int fwd, size_t size)
1392 {
1393   struct CadetFlowControl *fc = cls;
1394
1395   if (2 == c->destroy)
1396   {
1397     LOG (GNUNET_ERROR_TYPE_DEBUG, "POLL canceled on shutdown\n");
1398     return;
1399   }
1400   LOG (GNUNET_ERROR_TYPE_DEBUG, "POLL sent for %s, scheduling new one!\n",
1401        GCC_2s (c));
1402   fc->poll_msg = NULL;
1403   fc->poll_time = GNUNET_TIME_STD_BACKOFF (fc->poll_time);
1404   fc->poll_task = GNUNET_SCHEDULER_add_delayed (fc->poll_time,
1405                                                 &connection_poll,
1406                                                 fc);
1407   LOG (GNUNET_ERROR_TYPE_DEBUG, " task %u\n", fc->poll_task);
1408 }
1409
1410
1411 /**
1412  * Function called if a connection has been stalled for a while,
1413  * possibly due to a missed ACK. Poll the neighbor about its ACK status.
1414  *
1415  * @param cls Closure (poll ctx).
1416  * @param tc TaskContext.
1417  */
1418 static void
1419 connection_poll (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1420 {
1421   struct CadetFlowControl *fc = cls;
1422   struct GNUNET_CADET_Poll msg;
1423   struct CadetConnection *c;
1424   int fwd;
1425
1426   fc->poll_task = NULL;
1427   GCC_check_connections ();
1428   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1429   {
1430     return;
1431   }
1432
1433   c = fc->c;
1434   fwd = fc == &c->fwd_fc;
1435   LOG (GNUNET_ERROR_TYPE_DEBUG, "Polling connection %s %s\n",
1436        GCC_2s (c),  GC_f2s (fwd));
1437
1438   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_POLL);
1439   msg.header.size = htons (sizeof (msg));
1440   msg.pid = htonl (fc->last_pid_sent);
1441   LOG (GNUNET_ERROR_TYPE_DEBUG, " last pid sent: %u\n", fc->last_pid_sent);
1442   fc->poll_msg =
1443       GCC_send_prebuilt_message (&msg.header, 0, fc->last_pid_sent, c,
1444                                  fc == &c->fwd_fc, GNUNET_YES, &poll_sent, fc);
1445   GNUNET_assert (NULL != fc->poll_msg);
1446   GCC_check_connections ();
1447 }
1448
1449
1450 /**
1451  * Resend all queued messages for a connection on other connections of the
1452  * same tunnel, if possible. The connection WILL BE DESTROYED by this function.
1453  *
1454  * @param c Connection whose messages to resend.
1455  * @param fwd Resend fwd messages?
1456  */
1457 static void
1458 resend_messages_and_destroy (struct CadetConnection *c, int fwd)
1459 {
1460   struct GNUNET_MessageHeader *out_msg;
1461   struct CadetTunnel *t = c->t;
1462   struct CadetPeer *neighbor;
1463   unsigned int pending;
1464   int destroyed;
1465
1466   GCC_check_connections ();
1467   c->state = CADET_CONNECTION_DESTROYED;
1468   c->destroy = GNUNET_YES;
1469
1470   destroyed = GNUNET_NO;
1471   neighbor = get_hop (c, fwd);
1472   pending = c->pending_messages;
1473
1474   while (NULL != (out_msg = GCP_connection_pop (neighbor, c, &destroyed)))
1475   {
1476     if (NULL != t)
1477       GCT_resend_message (out_msg, t);
1478     GNUNET_free (out_msg);
1479   }
1480
1481   /* All pending messages should have been popped,
1482    * and the connection destroyed by the continuation.
1483    */
1484   if (GNUNET_YES != destroyed)
1485   {
1486     if (0 != pending)
1487     {
1488       GNUNET_break (0);
1489       GCC_debug (c, GNUNET_ERROR_TYPE_ERROR);
1490       if (NULL != t) GCT_debug (t, GNUNET_ERROR_TYPE_ERROR);
1491     }
1492     GCC_destroy (c);
1493   }
1494   GCC_check_connections ();
1495 }
1496
1497
1498 /**
1499  * Generic connection timeout implementation.
1500  *
1501  * Timeout function due to lack of keepalive/traffic from an endpoint.
1502  * Destroys connection if called.
1503  *
1504  * @param c Connection to destroy.
1505  * @param fwd Was the timeout from the origin? (FWD timeout)
1506  */
1507 static void
1508 connection_timeout (struct CadetConnection *c, int fwd)
1509 {
1510   struct CadetFlowControl *reverse_fc;
1511
1512   GCC_check_connections ();
1513   reverse_fc = fwd ? &c->bck_fc : &c->fwd_fc;
1514
1515   LOG (GNUNET_ERROR_TYPE_INFO,
1516        "Connection %s %s timed out. Destroying.\n",
1517        GCC_2s (c),
1518        GC_f2s (fwd));
1519   GCC_debug (c, GNUNET_ERROR_TYPE_DEBUG);
1520
1521   if (GCC_is_origin (c, fwd)) /* Loopback? Something is wrong! */
1522   {
1523     GNUNET_break (0);
1524     return;
1525   }
1526
1527   /* If dest, salvage queued traffic. */
1528   if (GCC_is_origin (c, !fwd))
1529   {
1530     const struct GNUNET_PeerIdentity *next_hop;
1531
1532     next_hop = GCP_get_id (fwd ? get_prev_hop (c) : get_next_hop (c));
1533     send_broken_unknown (&c->id, &my_full_id, NULL, next_hop);
1534     if (0 < reverse_fc->queue_n)
1535       resend_messages_and_destroy (c, !fwd);
1536     GCC_check_connections ();
1537     return;
1538   }
1539
1540   GCC_destroy (c);
1541   GCC_check_connections ();
1542 }
1543
1544
1545 /**
1546  * Timeout function due to lack of keepalive/traffic from the owner.
1547  * Destroys connection if called.
1548  *
1549  * @param cls Closure (connection to destroy).
1550  * @param tc TaskContext.
1551  */
1552 static void
1553 connection_fwd_timeout (void *cls,
1554                         const struct GNUNET_SCHEDULER_TaskContext *tc)
1555 {
1556   struct CadetConnection *c = cls;
1557
1558   c->fwd_maintenance_task = NULL;
1559   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1560     return;
1561   GCC_check_connections ();
1562   connection_timeout (c, GNUNET_YES);
1563   GCC_check_connections ();
1564 }
1565
1566
1567 /**
1568  * Timeout function due to lack of keepalive/traffic from the destination.
1569  * Destroys connection if called.
1570  *
1571  * @param cls Closure (connection to destroy).
1572  * @param tc TaskContext
1573  */
1574 static void
1575 connection_bck_timeout (void *cls,
1576                         const struct GNUNET_SCHEDULER_TaskContext *tc)
1577 {
1578   struct CadetConnection *c = cls;
1579
1580   c->bck_maintenance_task = NULL;
1581   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1582     return;
1583   GCC_check_connections ();
1584   connection_timeout (c, GNUNET_NO);
1585   GCC_check_connections ();
1586 }
1587
1588
1589 /**
1590  * Resets the connection timeout task, some other message has done the
1591  * task's job.
1592  * - For the first peer on the direction this means to send
1593  *   a keepalive or a path confirmation message (either create or ACK).
1594  * - For all other peers, this means to destroy the connection,
1595  *   due to lack of activity.
1596  * Starts the timeout if no timeout was running (connection just created).
1597  *
1598  * @param c Connection whose timeout to reset.
1599  * @param fwd Is this forward?
1600  *
1601  * TODO use heap to improve efficiency of scheduler.
1602  */
1603 static void
1604 connection_reset_timeout (struct CadetConnection *c, int fwd)
1605 {
1606   LOG (GNUNET_ERROR_TYPE_DEBUG, "Connection %s reset timeout\n", GC_f2s (fwd));
1607   if (GCC_is_origin (c, fwd)) /* Startpoint */
1608   {
1609     schedule_next_keepalive (c, fwd);
1610   }
1611   else /* Relay, endpoint. */
1612   {
1613     struct GNUNET_TIME_Relative delay;
1614     struct GNUNET_SCHEDULER_Task * *ti;
1615     GNUNET_SCHEDULER_TaskCallback f;
1616
1617     ti = fwd ? &c->fwd_maintenance_task : &c->bck_maintenance_task;
1618
1619     if (NULL != *ti)
1620       GNUNET_SCHEDULER_cancel (*ti);
1621     delay = GNUNET_TIME_relative_multiply (refresh_connection_time, 4);
1622     LOG (GNUNET_ERROR_TYPE_DEBUG, "  timing out in %s\n",
1623          GNUNET_STRINGS_relative_time_to_string (delay, GNUNET_NO));
1624     f = fwd ? &connection_fwd_timeout : &connection_bck_timeout;
1625     *ti = GNUNET_SCHEDULER_add_delayed (delay, f, c);
1626   }
1627 }
1628
1629
1630 /**
1631  * Iterator to compare each connection's path with the path of a new connection.
1632  *
1633  * If the connection conincides, the c member of path is set to the connection
1634  * and the destroy flag of the connection is set.
1635  *
1636  * @param cls Closure (new path).
1637  * @param c Connection in the tunnel to check.
1638  */
1639 static void
1640 check_path (void *cls, struct CadetConnection *c)
1641 {
1642   struct CadetConnection *new_conn = cls;
1643   struct CadetPeerPath *path = new_conn->path;
1644
1645   LOG (GNUNET_ERROR_TYPE_DEBUG, "  checking %s (%p), length %u\n",
1646        GCC_2s (c), c, c->path->length);
1647
1648   if (c != new_conn
1649       && c->destroy == GNUNET_NO
1650       && c->state != CADET_CONNECTION_BROKEN
1651       && c->state != CADET_CONNECTION_DESTROYED
1652       && path_equivalent (path, c->path))
1653   {
1654     new_conn->destroy = GNUNET_YES;
1655     new_conn->path->c = c;
1656     LOG (GNUNET_ERROR_TYPE_DEBUG, "  MATCH!\n");
1657   }
1658 }
1659
1660
1661 /**
1662  * Finds out if this path is already being used by an existing connection.
1663  *
1664  * Checks the tunnel towards the destination to see if it contains
1665  * any connection with the same path.
1666  *
1667  * If the existing connection is ready, it is kept.
1668  * Otherwise if the sender has a smaller ID that ours, we accept it (and
1669  * the peer will eventually reject our attempt).
1670  *
1671  * @param path Path to check.
1672  * @return #GNUNET_YES if the tunnel has a connection with the same path,
1673  *         #GNUNET_NO otherwise.
1674  */
1675 static int
1676 does_connection_exist (struct CadetConnection *conn)
1677 {
1678   struct CadetPeer *p;
1679   struct CadetTunnel *t;
1680   struct CadetConnection *c;
1681
1682   p = GCP_get_short (conn->path->peers[0], GNUNET_NO);
1683   if (NULL == p)
1684     return GNUNET_NO;
1685   t = GCP_get_tunnel (p);
1686   if (NULL == t)
1687     return GNUNET_NO;
1688
1689   LOG (GNUNET_ERROR_TYPE_DEBUG, "Checking for duplicates\n");
1690
1691   GCT_iterate_connections (t, &check_path, conn);
1692
1693   if (GNUNET_YES == conn->destroy)
1694   {
1695     c = conn->path->c;
1696     conn->destroy = GNUNET_NO;
1697     conn->path->c = conn;
1698     LOG (GNUNET_ERROR_TYPE_DEBUG, " found duplicate of %s\n", GCC_2s (conn));
1699     LOG (GNUNET_ERROR_TYPE_DEBUG, " duplicate: %s\n", GCC_2s (c));
1700     GCC_debug (c, GNUNET_ERROR_TYPE_DEBUG);
1701     if (CADET_CONNECTION_READY == c->state)
1702     {
1703       /* The other peer confirmed a live connection with this path,
1704        * why is it trying to duplicate it. */
1705       GNUNET_STATISTICS_update (stats, "# duplicate connections", 1, GNUNET_NO);
1706       return GNUNET_YES;
1707     }
1708     LOG (GNUNET_ERROR_TYPE_DEBUG, " duplicate not valid, connection unique\n");
1709     return GNUNET_NO;
1710   }
1711   else
1712   {
1713     LOG (GNUNET_ERROR_TYPE_DEBUG, " %s has no duplicates\n", GCC_2s (conn));
1714     return GNUNET_NO;
1715   }
1716 }
1717
1718
1719 /**
1720  * @brief Check if the tunnel this connection belongs to has any other
1721  * connection with the same path, and destroy one if so.
1722  *
1723  * @param cls Closure (connection to check).
1724  * @param tc Task context.
1725  */
1726 static void
1727 check_duplicates (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1728 {
1729   struct CadetConnection *c = cls;
1730
1731   c->check_duplicates_task = NULL;
1732   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1733     return;
1734
1735   if (GNUNET_YES == does_connection_exist (c))
1736   {
1737     GCT_debug (c->t, GNUNET_ERROR_TYPE_DEBUG);
1738     send_broken (c, &my_full_id, &my_full_id, GCC_is_origin (c, GNUNET_YES));
1739     GCC_destroy (c);
1740   }
1741 }
1742
1743
1744
1745 /**
1746  * Wait for enough time to let any dead connections time out and check for
1747  * any remaining duplicates.
1748  *
1749  * @param c Connection that is a potential duplicate.
1750  */
1751 static void
1752 schedule_check_duplicates (struct CadetConnection *c)
1753 {
1754   struct GNUNET_TIME_Relative delay;
1755
1756   if (NULL != c->check_duplicates_task)
1757     return;
1758
1759   delay = GNUNET_TIME_relative_multiply (refresh_connection_time, 5);
1760   c->check_duplicates_task = GNUNET_SCHEDULER_add_delayed (delay,
1761                                                            &check_duplicates,
1762                                                            c);
1763 }
1764
1765
1766
1767 /**
1768  * Add the connection to the list of both neighbors.
1769  *
1770  * @param c Connection.
1771  *
1772  * @return #GNUNET_OK if everything went fine
1773  *         #GNUNET_SYSERR if the was an error and @c c is malformed.
1774  */
1775 static int
1776 register_neighbors (struct CadetConnection *c)
1777 {
1778   c->next_peer = get_next_hop (c);
1779   c->prev_peer = get_prev_hop (c);
1780   GNUNET_assert (c->next_peer != c->prev_peer);
1781   LOG (GNUNET_ERROR_TYPE_DEBUG,
1782        "register neighbors for connection %s\n",
1783        GCC_2s (c));
1784   path_debug (c->path);
1785   LOG (GNUNET_ERROR_TYPE_DEBUG,
1786        "own pos %u\n", c->own_pos);
1787   LOG (GNUNET_ERROR_TYPE_DEBUG,
1788        "putting connection %s to next peer %p\n",
1789        GCC_2s (c),
1790        c->next_peer);
1791   LOG (GNUNET_ERROR_TYPE_DEBUG, "next peer %p %s\n",
1792        c->next_peer,
1793        GCP_2s (c->next_peer));
1794   LOG (GNUNET_ERROR_TYPE_DEBUG,
1795        "putting connection %s to prev peer %p\n",
1796        GCC_2s (c),
1797        c->prev_peer);
1798   LOG (GNUNET_ERROR_TYPE_DEBUG,
1799        "prev peer %p %s\n",
1800        c->prev_peer,
1801        GCP_2s (c->prev_peer));
1802
1803   if ( (GNUNET_NO == GCP_is_neighbor (c->next_peer)) ||
1804        (GNUNET_NO == GCP_is_neighbor (c->prev_peer)) )
1805   {
1806     if (GCC_is_origin (c, GNUNET_YES))
1807       GNUNET_STATISTICS_update (stats, "# local bad paths", 1, GNUNET_NO);
1808     GNUNET_STATISTICS_update (stats, "# bad paths", 1, GNUNET_NO);
1809
1810     LOG (GNUNET_ERROR_TYPE_DEBUG,
1811          "  register neighbors failed\n");
1812     LOG (GNUNET_ERROR_TYPE_DEBUG,
1813          "  prev: %s, neighbor?: %d\n",
1814          GCP_2s (c->prev_peer),
1815          GCP_is_neighbor (c->prev_peer));
1816     LOG (GNUNET_ERROR_TYPE_DEBUG,
1817          "  next: %s, neighbor?: %d\n",
1818          GCP_2s (c->next_peer),
1819          GCP_is_neighbor (c->next_peer));
1820     return GNUNET_SYSERR;
1821   }
1822   GCP_add_connection (c->next_peer, c, GNUNET_NO);
1823   GCP_add_connection (c->prev_peer, c, GNUNET_YES);
1824
1825   return GNUNET_OK;
1826 }
1827
1828
1829 /**
1830  * Remove the connection from the list of both neighbors.
1831  *
1832  * @param c Connection.
1833  */
1834 static void
1835 unregister_neighbors (struct CadetConnection *c)
1836 {
1837 //  struct CadetPeer *peer; FIXME dont use next_peer, prev_peer
1838   /* Either already unregistered or never got registered, it's ok either way. */
1839   if (NULL == c->path)
1840     return;
1841   if (NULL != c->next_peer)
1842   {
1843     GCP_remove_connection (c->next_peer, c);
1844     c->next_peer = NULL;
1845   }
1846   if (NULL != c->prev_peer)
1847   {
1848     GCP_remove_connection (c->prev_peer, c);
1849     c->prev_peer = NULL;
1850   }
1851 }
1852
1853
1854 /**
1855  * Bind the connection to the peer and the tunnel to that peer.
1856  *
1857  * If the peer has no tunnel, create one. Update tunnel and connection
1858  * data structres to reflect new status.
1859  *
1860  * @param c Connection.
1861  * @param peer Peer.
1862  */
1863 static void
1864 add_to_peer (struct CadetConnection *c,
1865              struct CadetPeer *peer)
1866 {
1867   GCP_add_tunnel (peer);
1868   c->t = GCP_get_tunnel (peer);
1869   GCT_add_connection (c->t, c);
1870 }
1871
1872
1873 /**
1874  * Log receipt of message on stderr (INFO level).
1875  *
1876  * @param message Message received.
1877  * @param peer Peer who sent the message.
1878  * @param hash Connection ID.
1879  */
1880 static void
1881 log_message (const struct GNUNET_MessageHeader *message,
1882              const struct GNUNET_PeerIdentity *peer,
1883              const struct GNUNET_CADET_Hash *hash)
1884 {
1885   uint16_t size;
1886
1887   size = ntohs (message->size);
1888   LOG (GNUNET_ERROR_TYPE_INFO, "\n");
1889   LOG (GNUNET_ERROR_TYPE_INFO, "\n");
1890   LOG (GNUNET_ERROR_TYPE_INFO, "<-- %s on conn %s from %s, %6u bytes\n",
1891        GC_m2s (ntohs (message->type)), GNUNET_h2s (GC_h2hc (hash)),
1892        GNUNET_i2s (peer), (unsigned int) size);
1893 }
1894
1895 /******************************************************************************/
1896 /********************************    API    ***********************************/
1897 /******************************************************************************/
1898
1899 /**
1900  * Core handler for connection creation.
1901  *
1902  * @param cls Closure (unused).
1903  * @param peer Sender (neighbor).
1904  * @param message Message.
1905  * @return #GNUNET_OK to keep the connection open,
1906  *         #GNUNET_SYSERR to close it (signal serious error)
1907  */
1908 int
1909 GCC_handle_create (void *cls,
1910                    const struct GNUNET_PeerIdentity *peer,
1911                    const struct GNUNET_MessageHeader *message)
1912 {
1913   struct GNUNET_CADET_ConnectionCreate *msg;
1914   struct GNUNET_PeerIdentity *id;
1915   struct GNUNET_CADET_Hash *cid;
1916   struct CadetPeerPath *path;
1917   struct CadetPeer *dest_peer;
1918   struct CadetPeer *orig_peer;
1919   struct CadetConnection *c;
1920   unsigned int own_pos;
1921   uint16_t size;
1922
1923   GCC_check_connections ();
1924   /* Check size */
1925   size = ntohs (message->size);
1926   if (size < sizeof (struct GNUNET_CADET_ConnectionCreate))
1927   {
1928     GNUNET_break_op (0);
1929     return GNUNET_OK;
1930   }
1931
1932   /* Calculate hops */
1933   size -= sizeof (struct GNUNET_CADET_ConnectionCreate);
1934   if (size % sizeof (struct GNUNET_PeerIdentity))
1935   {
1936     GNUNET_break_op (0);
1937     return GNUNET_OK;
1938   }
1939   if (0 != size % sizeof (struct GNUNET_PeerIdentity))
1940   {
1941     GNUNET_break_op (0);
1942     return GNUNET_OK;
1943   }
1944   size /= sizeof (struct GNUNET_PeerIdentity);
1945   if (1 > size)
1946   {
1947     GNUNET_break_op (0);
1948     return GNUNET_OK;
1949   }
1950   LOG (GNUNET_ERROR_TYPE_DEBUG, "    path has %u hops.\n", size);
1951
1952   /* Get parameters */
1953   msg = (struct GNUNET_CADET_ConnectionCreate *) message;
1954   cid = &msg->cid;
1955   log_message (message, peer, cid);
1956   id = (struct GNUNET_PeerIdentity *) &msg[1];
1957   LOG (GNUNET_ERROR_TYPE_DEBUG, "    origin: %s\n", GNUNET_i2s (id));
1958
1959   /* Create connection */
1960   c = connection_get (cid);
1961   if (NULL == c)
1962   {
1963     path = path_build_from_peer_ids ((struct GNUNET_PeerIdentity *) &msg[1],
1964                                      size, myid, &own_pos);
1965     if (NULL == path)
1966     {
1967       /* Path was malformed, probably our own ID was not in it. */
1968       GNUNET_STATISTICS_update (stats, "# malformed paths", 1, GNUNET_NO);
1969       GNUNET_break_op (0);
1970       return GNUNET_OK;
1971     }
1972
1973     if (0 == own_pos)
1974     {
1975       /* We received this request from a neighbor, we cannot be origin */
1976       GNUNET_STATISTICS_update (stats, "# fake paths", 1, GNUNET_NO);
1977       GNUNET_break_op (0);
1978       path_destroy (path);
1979       return GNUNET_OK;
1980     }
1981
1982     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Own position: %u\n", own_pos);
1983     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Creating connection\n");
1984     c = GCC_new (cid, NULL, path, own_pos);
1985     if (NULL == c)
1986     {
1987       if (path->length - 1 == own_pos)
1988       {
1989         /* If we are destination, why did the creation fail? */
1990         GNUNET_break (0);
1991         path_destroy (path);
1992         GCC_check_connections ();
1993         return GNUNET_OK;
1994       }
1995       send_broken_unknown (cid, &my_full_id,
1996                            GNUNET_PEER_resolve2 (path->peers[own_pos + 1]),
1997                            peer);
1998       path_destroy (path);
1999       GCC_check_connections ();
2000       return GNUNET_OK;
2001     }
2002     GCP_add_path_to_all (path, GNUNET_NO);
2003     connection_reset_timeout (c, GNUNET_YES);
2004   }
2005   else
2006   {
2007     path = path_duplicate (c->path);
2008   }
2009   if (CADET_CONNECTION_NEW == c->state)
2010     connection_change_state (c, CADET_CONNECTION_SENT);
2011
2012   /* Remember peers */
2013   dest_peer = GCP_get (&id[size - 1], GNUNET_YES);
2014   orig_peer = GCP_get (&id[0], GNUNET_YES);
2015
2016   /* Is it a connection to us? */
2017   if (c->own_pos == path->length - 1)
2018   {
2019     LOG (GNUNET_ERROR_TYPE_DEBUG, "  It's for us!\n");
2020     GCP_add_path_to_origin (orig_peer, path_duplicate (path), GNUNET_YES);
2021
2022     add_to_peer (c, orig_peer);
2023     if (GNUNET_YES == does_connection_exist (c))
2024     {
2025       /* Peer created a connection equal to one we think exists
2026        * and is fine.
2027        * Solution: Keep both and postpone disambiguation. In the meantime
2028        * the connection will time out or peer will inform us it is broken.
2029        *
2030        * Other options:
2031        * - Use explicit duplicate.
2032        * - Accept new conn and destroy the old. (interruption in higher level)
2033        * - Keep the one with higher ID / created by peer with higher ID. */
2034        schedule_check_duplicates (c);
2035     }
2036
2037     if (CADET_TUNNEL_NEW == GCT_get_cstate (c->t))
2038       GCT_change_cstate (c->t,  CADET_TUNNEL_WAITING);
2039
2040     send_connection_ack (c, GNUNET_NO);
2041     if (CADET_CONNECTION_SENT == c->state)
2042       connection_change_state (c, CADET_CONNECTION_ACK);
2043   }
2044   else
2045   {
2046     /* It's for somebody else! Retransmit. */
2047     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Retransmitting.\n");
2048     GCP_add_path (dest_peer, path_duplicate (path), GNUNET_NO);
2049     GCP_add_path_to_origin (orig_peer, path_duplicate (path), GNUNET_NO);
2050     GNUNET_assert (NULL == GCC_send_prebuilt_message (message, 0, 0, c,
2051                                                       GNUNET_YES, GNUNET_YES,
2052                                                       NULL, NULL));
2053   }
2054   path_destroy (path);
2055   GCC_check_connections ();
2056   return GNUNET_OK;
2057 }
2058
2059
2060 /**
2061  * Core handler for path confirmations.
2062  *
2063  * @param cls closure
2064  * @param message message
2065  * @param peer peer identity this notification is about
2066  * @return #GNUNET_OK to keep the connection open,
2067  *         #GNUNET_SYSERR to close it (signal serious error)
2068  */
2069 int
2070 GCC_handle_confirm (void *cls,
2071                     const struct GNUNET_PeerIdentity *peer,
2072                     const struct GNUNET_MessageHeader *message)
2073 {
2074   struct GNUNET_CADET_ConnectionACK *msg;
2075   struct CadetConnection *c;
2076   struct CadetPeerPath *p;
2077   struct CadetPeer *pi;
2078   enum CadetConnectionState oldstate;
2079   int fwd;
2080
2081   GCC_check_connections ();
2082   msg = (struct GNUNET_CADET_ConnectionACK *) message;
2083   log_message (message, peer, &msg->cid);
2084   c = connection_get (&msg->cid);
2085   if (NULL == c)
2086   {
2087     GNUNET_STATISTICS_update (stats, "# control on unknown connection",
2088                               1, GNUNET_NO);
2089     LOG (GNUNET_ERROR_TYPE_DEBUG,
2090          "  don't know the connection!\n");
2091     send_broken_unknown (&msg->cid, &my_full_id, NULL, peer);
2092     GCC_check_connections ();
2093     return GNUNET_OK;
2094   }
2095
2096   oldstate = c->state;
2097   LOG (GNUNET_ERROR_TYPE_DEBUG, "  via peer %s\n", GNUNET_i2s (peer));
2098   pi = GCP_get (peer, GNUNET_YES);
2099   if (get_next_hop (c) == pi)
2100   {
2101     LOG (GNUNET_ERROR_TYPE_DEBUG, "  SYNACK\n");
2102     fwd = GNUNET_NO;
2103     if (CADET_CONNECTION_SENT == oldstate)
2104       connection_change_state (c, CADET_CONNECTION_ACK);
2105   }
2106   else if (get_prev_hop (c) == pi)
2107   {
2108     LOG (GNUNET_ERROR_TYPE_DEBUG, "  FINAL ACK\n");
2109     fwd = GNUNET_YES;
2110     connection_change_state (c, CADET_CONNECTION_READY);
2111   }
2112   else
2113   {
2114     GNUNET_break_op (0);
2115     return GNUNET_OK;
2116   }
2117
2118   connection_reset_timeout (c, fwd);
2119
2120   /* Add path to peers? */
2121   p = c->path;
2122   if (NULL != p)
2123   {
2124     GCP_add_path_to_all (p, GNUNET_YES);
2125   }
2126   else
2127   {
2128     GNUNET_break (0);
2129   }
2130
2131   /* Message for us as creator? */
2132   if (GCC_is_origin (c, GNUNET_YES))
2133   {
2134     if (GNUNET_NO != fwd)
2135     {
2136       GNUNET_break_op (0);
2137       return GNUNET_OK;
2138     }
2139     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Connection (SYN)ACK for us!\n");
2140
2141     /* If just created, cancel the short timeout and start a long one */
2142     if (CADET_CONNECTION_SENT == oldstate)
2143       connection_reset_timeout (c, GNUNET_YES);
2144
2145     /* Change connection state */
2146     connection_change_state (c, CADET_CONNECTION_READY);
2147     send_connection_ack (c, GNUNET_YES);
2148
2149     /* Change tunnel state, trigger KX */
2150     if (CADET_TUNNEL_WAITING == GCT_get_cstate (c->t))
2151       GCT_change_cstate (c->t, CADET_TUNNEL_READY);
2152     GCC_check_connections ();
2153     return GNUNET_OK;
2154   }
2155
2156   /* Message for us as destination? */
2157   if (GCC_is_terminal (c, GNUNET_YES))
2158   {
2159     if (GNUNET_YES != fwd)
2160     {
2161       GNUNET_break_op (0);
2162       return GNUNET_OK;
2163     }
2164     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Connection ACK for us!\n");
2165
2166     /* If just created, cancel the short timeout and start a long one */
2167     if (CADET_CONNECTION_ACK == oldstate)
2168       connection_reset_timeout (c, GNUNET_NO);
2169
2170     /* Change tunnel state */
2171     if (CADET_TUNNEL_WAITING == GCT_get_cstate (c->t))
2172       GCT_change_cstate (c->t, CADET_TUNNEL_READY);
2173     GCC_check_connections ();
2174     return GNUNET_OK;
2175   }
2176
2177   LOG (GNUNET_ERROR_TYPE_DEBUG, "  not for us, retransmitting...\n");
2178   GNUNET_assert (NULL ==
2179                  GCC_send_prebuilt_message (message, 0, 0, c, fwd,
2180                                             GNUNET_YES, NULL, NULL));
2181   GCC_check_connections ();
2182   return GNUNET_OK;
2183 }
2184
2185
2186 /**
2187  * Core handler for notifications of broken connections.
2188  *
2189  * @param cls Closure (unused).
2190  * @param id Peer identity of sending neighbor.
2191  * @param message Message.
2192  * @return #GNUNET_OK to keep the connection open,
2193  *         #GNUNET_SYSERR to close it (signal serious error)
2194  */
2195 int
2196 GCC_handle_broken (void* cls,
2197                    const struct GNUNET_PeerIdentity* id,
2198                    const struct GNUNET_MessageHeader* message)
2199 {
2200   struct GNUNET_CADET_ConnectionBroken *msg;
2201   struct CadetConnection *c;
2202   struct CadetTunnel *t;
2203   int pending;
2204   int fwd;
2205
2206   GCC_check_connections ();
2207   msg = (struct GNUNET_CADET_ConnectionBroken *) message;
2208   log_message (message, id, &msg->cid);
2209   LOG (GNUNET_ERROR_TYPE_DEBUG, "  regarding %s\n",
2210               GNUNET_i2s (&msg->peer1));
2211   LOG (GNUNET_ERROR_TYPE_DEBUG, "  regarding %s\n",
2212               GNUNET_i2s (&msg->peer2));
2213   c = connection_get (&msg->cid);
2214   if (NULL == c)
2215   {
2216     LOG (GNUNET_ERROR_TYPE_DEBUG, "  duplicate CONNECTION_BROKEN\n");
2217     GCC_check_connections ();
2218     return GNUNET_OK;
2219   }
2220
2221   t = c->t;
2222
2223   fwd = is_fwd (c, id);
2224   c->destroy = GNUNET_YES;
2225   if (GCC_is_terminal (c, fwd))
2226   {
2227     struct CadetPeer *endpoint;
2228
2229     if (NULL == t)
2230     {
2231       /* A terminal connection should not have 't' set to NULL. */
2232       GNUNET_break (0);
2233       GCC_debug (c, GNUNET_ERROR_TYPE_ERROR);
2234       return GNUNET_OK;
2235     }
2236     endpoint = GCP_get_short (c->path->peers[c->path->length - 1], GNUNET_YES);
2237     if (2 < c->path->length)
2238       path_invalidate (c->path);
2239     GCP_notify_broken_link (endpoint, &msg->peer1, &msg->peer2);
2240
2241     c->state = CADET_CONNECTION_BROKEN;
2242     GCT_remove_connection (t, c);
2243     c->t = NULL;
2244
2245     pending = c->pending_messages;
2246     if (0 < pending)
2247       resend_messages_and_destroy (c, !fwd);
2248     else
2249       GCC_destroy (c);
2250   }
2251   else
2252   {
2253     GNUNET_assert (NULL == GCC_send_prebuilt_message (message, 0, 0, c, fwd,
2254                                                       GNUNET_YES, NULL, NULL));
2255     connection_cancel_queues (c, !fwd);
2256   }
2257   GCC_check_connections ();
2258   return GNUNET_OK;
2259 }
2260
2261
2262 /**
2263  * Core handler for tunnel destruction
2264  *
2265  * @param cls Closure (unused).
2266  * @param peer Peer identity of sending neighbor.
2267  * @param message Message.
2268  * @return #GNUNET_OK to keep the connection open,
2269  *         #GNUNET_SYSERR to close it (signal serious error)
2270  */
2271 int
2272 GCC_handle_destroy (void *cls,
2273                     const struct GNUNET_PeerIdentity *peer,
2274                     const struct GNUNET_MessageHeader *message)
2275 {
2276   const struct GNUNET_CADET_ConnectionDestroy *msg;
2277   struct CadetConnection *c;
2278   int fwd;
2279
2280   GCC_check_connections ();
2281   msg = (const struct GNUNET_CADET_ConnectionDestroy *) message;
2282   log_message (message, peer, &msg->cid);
2283   c = connection_get (&msg->cid);
2284   if (NULL == c)
2285   {
2286     /* Probably already got the message from another path,
2287      * destroyed the tunnel and retransmitted to children.
2288      * Safe to ignore.
2289      */
2290     GNUNET_STATISTICS_update (stats,
2291                               "# control on unknown connection",
2292                               1, GNUNET_NO);
2293     LOG (GNUNET_ERROR_TYPE_DEBUG,
2294          "  connection unknown: already destroyed?\n");
2295     GCC_check_connections ();
2296     return GNUNET_OK;
2297   }
2298   fwd = is_fwd (c, peer);
2299   if (GNUNET_SYSERR == fwd)
2300   {
2301     GNUNET_break_op (0); /* FIXME */
2302     return GNUNET_OK;
2303   }
2304   if (GNUNET_NO == GCC_is_terminal (c, fwd))
2305   {
2306     GNUNET_assert (NULL ==
2307                    GCC_send_prebuilt_message (message, 0, 0, c, fwd,
2308                                               GNUNET_YES, NULL, NULL));
2309   }
2310   else if (0 == c->pending_messages)
2311   {
2312     LOG (GNUNET_ERROR_TYPE_DEBUG,
2313          "  directly destroying connection!\n");
2314     GCC_destroy (c);
2315     GCC_check_connections ();
2316     return GNUNET_OK;
2317   }
2318   c->destroy = GNUNET_YES;
2319   c->state = CADET_CONNECTION_DESTROYED;
2320   if (NULL != c->t)
2321   {
2322     GCT_remove_connection (c->t, c);
2323     c->t = NULL;
2324   }
2325   GCC_check_connections ();
2326   return GNUNET_OK;
2327 }
2328
2329
2330 /**
2331  * Check the message against internal state and test if it goes FWD or BCK.
2332  *
2333  * Updates the PID, state and timeout values for the connection.
2334  *
2335  * @param message Message to check. It must belong to an existing connection.
2336  * @param minimum_size The message cannot be smaller than this value.
2337  * @param cid Connection ID (even if @a c is NULL, the ID is still needed).
2338  * @param c Connection this message should belong. If NULL, check fails.
2339  * @param neighbor Neighbor that sent the message.
2340  */
2341 static int
2342 check_message (const struct GNUNET_MessageHeader *message,
2343                size_t minimum_size,
2344                const struct GNUNET_CADET_Hash* cid,
2345                struct CadetConnection *c,
2346                const struct GNUNET_PeerIdentity *neighbor,
2347                uint32_t pid)
2348 {
2349   GNUNET_PEER_Id neighbor_id;
2350   struct CadetFlowControl *fc;
2351   struct CadetPeer *hop;
2352   int fwd;
2353   uint16_t type;
2354
2355   /* Check size */
2356   if (ntohs (message->size) < minimum_size)
2357   {
2358     GNUNET_break_op (0);
2359     LOG (GNUNET_ERROR_TYPE_WARNING, "Size %u < %u\n",
2360          ntohs (message->size), minimum_size);
2361     return GNUNET_SYSERR;
2362   }
2363
2364   /* Check connection */
2365   if (NULL == c)
2366   {
2367     GNUNET_STATISTICS_update (stats,
2368                               "# unknown connection",
2369                               1, GNUNET_NO);
2370     LOG (GNUNET_ERROR_TYPE_DEBUG,
2371          "%s on unknown connection %s\n",
2372          GC_m2s (ntohs (message->type)),
2373          GNUNET_h2s (GC_h2hc (cid)));
2374     send_broken_unknown (cid,
2375                          &my_full_id,
2376                          NULL,
2377                          neighbor);
2378     return GNUNET_SYSERR;
2379   }
2380
2381   /* Check if origin is as expected */
2382   neighbor_id = GNUNET_PEER_search (neighbor);
2383   hop = get_prev_hop (c);
2384   if (neighbor_id == GCP_get_short_id (hop))
2385   {
2386     fwd = GNUNET_YES;
2387   }
2388   else
2389   {
2390     hop = get_next_hop (c);
2391     GNUNET_break (hop == c->next_peer);
2392     if (neighbor_id == GCP_get_short_id (hop))
2393     {
2394       fwd = GNUNET_NO;
2395     }
2396     else
2397     {
2398       /* Unexpected peer sending traffic on a connection. */
2399       GNUNET_break_op (0);
2400       return GNUNET_SYSERR;
2401     }
2402   }
2403
2404   /* Check PID for payload messages */
2405   type = ntohs (message->type);
2406   if (GNUNET_MESSAGE_TYPE_CADET_ENCRYPTED == type
2407       || GNUNET_MESSAGE_TYPE_CADET_AX == type)
2408   {
2409     fc = fwd ? &c->bck_fc : &c->fwd_fc;
2410     LOG (GNUNET_ERROR_TYPE_DEBUG, " PID %u (expected %u - %u)\n",
2411          pid, fc->last_pid_recv + 1, fc->last_ack_sent);
2412     if (GC_is_pid_bigger (pid, fc->last_ack_sent))
2413     {
2414       GNUNET_break_op (0);
2415       GNUNET_STATISTICS_update (stats, "# unsolicited message", 1, GNUNET_NO);
2416       LOG (GNUNET_ERROR_TYPE_WARNING, "Received PID %u, (prev %u), ACK %u\n",
2417           pid, fc->last_pid_recv, fc->last_ack_sent);
2418       return GNUNET_SYSERR;
2419     }
2420     if (GC_is_pid_bigger (pid, fc->last_pid_recv))
2421     {
2422       unsigned int delta;
2423
2424       delta = pid - fc->last_pid_recv;
2425       fc->last_pid_recv = pid;
2426       fc->recv_bitmap <<= delta;
2427       fc->recv_bitmap |= 1;
2428     }
2429     else
2430     {
2431       GNUNET_STATISTICS_update (stats, "# out of order PID", 1, GNUNET_NO);
2432       if (GNUNET_NO == is_ooo_ok (fc->last_pid_recv, pid, fc->recv_bitmap))
2433       {
2434         LOG (GNUNET_ERROR_TYPE_WARNING, "PID %u unexpected (%u+), dropping!\n",
2435              pid, fc->last_pid_recv - 31);
2436         return GNUNET_SYSERR;
2437       }
2438       fc->recv_bitmap |= get_recv_bitmask (fc->last_pid_recv, pid);
2439     }
2440   }
2441
2442   /* Count as connection confirmation. */
2443   if (CADET_CONNECTION_SENT == c->state || CADET_CONNECTION_ACK == c->state)
2444   {
2445     connection_change_state (c, CADET_CONNECTION_READY);
2446     if (NULL != c->t)
2447     {
2448       if (CADET_TUNNEL_WAITING == GCT_get_cstate (c->t))
2449         GCT_change_cstate (c->t, CADET_TUNNEL_READY);
2450     }
2451   }
2452   connection_reset_timeout (c, fwd);
2453
2454   return fwd;
2455 }
2456
2457
2458 /**
2459  * Generic handler for cadet network encrypted traffic.
2460  *
2461  * @param peer Peer identity this notification is about.
2462  * @param msg Encrypted message.
2463  * @return #GNUNET_OK to keep the connection open,
2464  *         #GNUNET_SYSERR to close it (signal serious error)
2465  */
2466 static int
2467 handle_cadet_encrypted (const struct GNUNET_PeerIdentity *peer,
2468                         const struct GNUNET_MessageHeader *message)
2469 {
2470   const struct GNUNET_CADET_Encrypted *otr_msg;
2471   const struct GNUNET_CADET_AX *ax_msg;
2472   const struct GNUNET_CADET_Hash* cid;
2473   struct CadetConnection *c;
2474   size_t minumum_size;
2475   size_t overhead;
2476   uint32_t pid;
2477   uint32_t ttl;
2478   int fwd;
2479
2480   GCC_check_connections ();
2481   if (GNUNET_MESSAGE_TYPE_CADET_AX == ntohs (message->type))
2482   {
2483     overhead = sizeof (struct GNUNET_CADET_AX);
2484     ax_msg = (const struct GNUNET_CADET_AX *) message;
2485     cid = &ax_msg->cid;
2486     pid = ntohl (ax_msg->pid);
2487     otr_msg = NULL;
2488   }
2489   else
2490   {
2491     overhead = sizeof (struct GNUNET_CADET_Encrypted);
2492     otr_msg = (const struct GNUNET_CADET_Encrypted *) message;
2493     cid = &otr_msg->cid;
2494     pid = ntohl (otr_msg->pid);
2495   }
2496
2497   log_message (message, peer, cid);
2498
2499   minumum_size = sizeof (struct GNUNET_MessageHeader) + overhead;
2500   c = connection_get (cid);
2501   fwd = check_message (message,
2502                        minumum_size,
2503                        cid,
2504                        c,
2505                        peer,
2506                        pid);
2507
2508   /* If something went wrong, discard message. */
2509   if (GNUNET_SYSERR == fwd)
2510   {
2511     GCC_check_connections ();
2512     return GNUNET_OK;
2513   }
2514
2515   /* Is this message for us? */
2516   if (GCC_is_terminal (c, fwd))
2517   {
2518     GNUNET_STATISTICS_update (stats, "# received encrypted", 1, GNUNET_NO);
2519
2520     if (NULL == c->t)
2521     {
2522       GNUNET_break (GNUNET_NO != c->destroy);
2523       return GNUNET_OK;
2524     }
2525     GCT_handle_encrypted (c->t, message);
2526     GCC_send_ack (c, fwd, GNUNET_NO);
2527     GCC_check_connections ();
2528     return GNUNET_OK;
2529   }
2530
2531   /* Message not for us: forward to next hop */
2532   LOG (GNUNET_ERROR_TYPE_DEBUG, "  not for us, retransmitting...\n");
2533   if (NULL != otr_msg) /* only otr has ttl */
2534   {
2535     ttl = ntohl (otr_msg->ttl);
2536     LOG (GNUNET_ERROR_TYPE_DEBUG, "   ttl: %u\n", ttl);
2537     if (ttl == 0)
2538     {
2539       GNUNET_STATISTICS_update (stats, "# TTL drops", 1, GNUNET_NO);
2540       LOG (GNUNET_ERROR_TYPE_WARNING, " TTL is 0, DROPPING!\n");
2541       GCC_send_ack (c, fwd, GNUNET_NO);
2542       GCC_check_connections ();
2543       return GNUNET_OK;
2544     }
2545   }
2546
2547   GNUNET_STATISTICS_update (stats, "# messages forwarded", 1, GNUNET_NO);
2548   GNUNET_assert (NULL == GCC_send_prebuilt_message (message, 0, 0, c, fwd,
2549                                                     GNUNET_NO, NULL, NULL));
2550   GCC_check_connections ();
2551   return GNUNET_OK;
2552 }
2553
2554
2555 /**
2556  * Generic handler for cadet network encrypted traffic.
2557  *
2558  * @param peer Peer identity this notification is about.
2559  * @param msg Encrypted message.
2560  * @return #GNUNET_OK to keep the connection open,
2561  *         #GNUNET_SYSERR to close it (signal serious error)
2562  */
2563 static int
2564 handle_cadet_kx (const struct GNUNET_PeerIdentity *peer,
2565                  const struct GNUNET_CADET_KX *msg)
2566 {
2567   const struct GNUNET_CADET_Hash* cid;
2568   struct CadetConnection *c;
2569   size_t expected_size;
2570   int fwd;
2571
2572   GCC_check_connections ();
2573   cid = &msg->cid;
2574   log_message (&msg->header, peer, cid);
2575
2576   expected_size = sizeof (struct GNUNET_CADET_KX)
2577                   + sizeof (struct GNUNET_MessageHeader);
2578   c = connection_get (cid);
2579   fwd = check_message (&msg->header,
2580                        expected_size,
2581                        cid,
2582                        c,
2583                        peer,
2584                        0);
2585
2586   /* If something went wrong, discard message. */
2587   if (GNUNET_SYSERR == fwd)
2588     return GNUNET_OK;
2589
2590   /* Is this message for us? */
2591   if (GCC_is_terminal (c, fwd))
2592   {
2593     LOG (GNUNET_ERROR_TYPE_DEBUG, "  message for us!\n");
2594     GNUNET_STATISTICS_update (stats, "# received KX", 1, GNUNET_NO);
2595     if (NULL == c->t)
2596     {
2597       GNUNET_break (0);
2598       return GNUNET_OK;
2599     }
2600     GCT_handle_kx (c->t, &msg[1].header);
2601     GCC_check_connections ();
2602     return GNUNET_OK;
2603   }
2604
2605   /* Message not for us: forward to next hop */
2606   LOG (GNUNET_ERROR_TYPE_DEBUG, "  not for us, retransmitting...\n");
2607   GNUNET_STATISTICS_update (stats, "# messages forwarded", 1, GNUNET_NO);
2608   GNUNET_assert (NULL == GCC_send_prebuilt_message (&msg->header, 0, 0, c, fwd,
2609                                                     GNUNET_NO, NULL, NULL));
2610   GCC_check_connections ();
2611   return GNUNET_OK;
2612 }
2613
2614
2615 /**
2616  * Core handler for key exchange traffic (ephemeral key, ping, pong).
2617  *
2618  * @param cls Closure (unused).
2619  * @param message Message received.
2620  * @param peer Peer who sent the message.
2621  * @return #GNUNET_OK to keep the connection open,
2622  *         #GNUNET_SYSERR to close it (signal serious error)
2623  */
2624 int
2625 GCC_handle_kx (void *cls,
2626                const struct GNUNET_PeerIdentity *peer,
2627                const struct GNUNET_MessageHeader *message)
2628 {
2629   GCC_check_connections ();
2630   return handle_cadet_kx (peer, (struct GNUNET_CADET_KX *) message);
2631 }
2632
2633
2634 /**
2635  * Core handler for encrypted cadet network traffic (channel mgmt, data).
2636  *
2637  * @param cls Closure (unused).
2638  * @param message Message received.
2639  * @param peer Peer who sent the message.
2640  * @return #GNUNET_OK to keep the connection open,
2641  *         #GNUNET_SYSERR to close it (signal serious error)
2642  */
2643 int
2644 GCC_handle_encrypted (void *cls, const struct GNUNET_PeerIdentity *peer,
2645                       const struct GNUNET_MessageHeader *message)
2646 {
2647   GCC_check_connections ();
2648   return handle_cadet_encrypted (peer, message);
2649 }
2650
2651
2652 /**
2653  * Core handler for cadet network traffic point-to-point acks.
2654  *
2655  * @param cls closure
2656  * @param message message
2657  * @param peer peer identity this notification is about
2658  * @return #GNUNET_OK to keep the connection open,
2659  *         #GNUNET_SYSERR to close it (signal serious error)
2660  */
2661 int
2662 GCC_handle_ack (void *cls, const struct GNUNET_PeerIdentity *peer,
2663                 const struct GNUNET_MessageHeader *message)
2664 {
2665   struct GNUNET_CADET_ACK *msg;
2666   struct CadetConnection *c;
2667   struct CadetFlowControl *fc;
2668   GNUNET_PEER_Id id;
2669   uint32_t ack;
2670   int fwd;
2671
2672   GCC_check_connections ();
2673   msg = (struct GNUNET_CADET_ACK *) message;
2674   log_message (message, peer, &msg->cid);
2675   c = connection_get (&msg->cid);
2676   if (NULL == c)
2677   {
2678     GNUNET_STATISTICS_update (stats,
2679                               "# ack on unknown connection",
2680                               1,
2681                               GNUNET_NO);
2682     send_broken_unknown (&msg->cid,
2683                          &my_full_id,
2684                          NULL,
2685                          peer);
2686     GCC_check_connections ();
2687     return GNUNET_OK;
2688   }
2689
2690   /* Is this a forward or backward ACK? */
2691   id = GNUNET_PEER_search (peer);
2692   if (GCP_get_short_id (get_next_hop (c)) == id)
2693   {
2694     fc = &c->fwd_fc;
2695     fwd = GNUNET_YES;
2696   }
2697   else if (GCP_get_short_id (get_prev_hop (c)) == id)
2698   {
2699     fc = &c->bck_fc;
2700     fwd = GNUNET_NO;
2701   }
2702   else
2703   {
2704     GNUNET_break_op (0);
2705     return GNUNET_OK;
2706   }
2707
2708   ack = ntohl (msg->ack);
2709   LOG (GNUNET_ERROR_TYPE_DEBUG, " %s ACK %u (was %u)\n",
2710        GC_f2s (fwd), ack, fc->last_ack_recv);
2711   if (GC_is_pid_bigger (ack, fc->last_ack_recv))
2712     fc->last_ack_recv = ack;
2713
2714   /* Cancel polling if the ACK is big enough. */
2715   if (NULL != fc->poll_task &&
2716       GC_is_pid_bigger (fc->last_ack_recv, fc->last_pid_sent))
2717   {
2718     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Cancel poll\n");
2719     GNUNET_SCHEDULER_cancel (fc->poll_task);
2720     fc->poll_task = NULL;
2721     fc->poll_time = GNUNET_TIME_UNIT_SECONDS;
2722   }
2723
2724   connection_unlock_queue (c, fwd);
2725   GCC_check_connections ();
2726   return GNUNET_OK;
2727 }
2728
2729
2730 /**
2731  * Core handler for cadet network traffic point-to-point ack polls.
2732  *
2733  * @param cls closure
2734  * @param message message
2735  * @param peer peer identity this notification is about
2736  * @return #GNUNET_OK to keep the connection open,
2737  *         #GNUNET_SYSERR to close it (signal serious error)
2738  */
2739 int
2740 GCC_handle_poll (void *cls,
2741                  const struct GNUNET_PeerIdentity *peer,
2742                  const struct GNUNET_MessageHeader *message)
2743 {
2744   struct GNUNET_CADET_Poll *msg;
2745   struct CadetConnection *c;
2746   struct CadetFlowControl *fc;
2747   GNUNET_PEER_Id id;
2748   uint32_t pid;
2749   int fwd;
2750
2751   GCC_check_connections ();
2752   msg = (struct GNUNET_CADET_Poll *) message;
2753   log_message (message, peer, &msg->cid);
2754   c = connection_get (&msg->cid);
2755   if (NULL == c)
2756   {
2757     GNUNET_STATISTICS_update (stats, "# poll on unknown connection", 1,
2758                               GNUNET_NO);
2759     LOG (GNUNET_ERROR_TYPE_DEBUG,
2760          "POLL message on unknown connection %s!\n",
2761          GNUNET_h2s (GC_h2hc (&msg->cid)));
2762     send_broken_unknown (&msg->cid,
2763                          &my_full_id,
2764                          NULL,
2765                          peer);
2766     GCC_check_connections ();
2767     return GNUNET_OK;
2768   }
2769
2770   /* Is this a forward or backward ACK?
2771    * Note: a poll should never be needed in a loopback case,
2772    * since there is no possiblility of packet loss there, so
2773    * this way of discerining FWD/BCK should not be a problem.
2774    */
2775   id = GNUNET_PEER_search (peer);
2776   if (GCP_get_short_id (get_next_hop (c)) == id)
2777   {
2778     LOG (GNUNET_ERROR_TYPE_DEBUG, "  FWD FC\n");
2779     fc = &c->fwd_fc;
2780   }
2781   else if (GCP_get_short_id (get_prev_hop (c)) == id)
2782   {
2783     LOG (GNUNET_ERROR_TYPE_DEBUG, "  BCK FC\n");
2784     fc = &c->bck_fc;
2785   }
2786   else
2787   {
2788     GNUNET_break_op (0);
2789     return GNUNET_OK;
2790   }
2791
2792   pid = ntohl (msg->pid);
2793   LOG (GNUNET_ERROR_TYPE_DEBUG, "  PID %u, OLD %u\n", pid, fc->last_pid_recv);
2794   fc->last_pid_recv = pid;
2795   fwd = fc == &c->bck_fc;
2796   GCC_send_ack (c, fwd, GNUNET_YES);
2797   GCC_check_connections ();
2798
2799   return GNUNET_OK;
2800 }
2801
2802
2803 /**
2804  * Send an ACK on the appropriate connection/channel, depending on
2805  * the direction and the position of the peer.
2806  *
2807  * @param c Which connection to send the hop-by-hop ACK.
2808  * @param fwd Is this a fwd ACK? (will go dest->root).
2809  * @param force Send the ACK even if suboptimal (e.g. requested by POLL).
2810  */
2811 void
2812 GCC_send_ack (struct CadetConnection *c, int fwd, int force)
2813 {
2814   unsigned int buffer;
2815
2816   GCC_check_connections ();
2817   LOG (GNUNET_ERROR_TYPE_DEBUG, "GCC send %s ACK on %s\n",
2818        GC_f2s (fwd), GCC_2s (c));
2819
2820   if (NULL == c)
2821   {
2822     GNUNET_break (0);
2823     return;
2824   }
2825
2826   if (GNUNET_NO != c->destroy)
2827   {
2828     LOG (GNUNET_ERROR_TYPE_DEBUG, "  being destroyed, why bother...\n");
2829     GCC_check_connections ();
2830     return;
2831   }
2832
2833   /* Get available buffer space */
2834   if (GCC_is_terminal (c, fwd))
2835   {
2836     LOG (GNUNET_ERROR_TYPE_DEBUG, "  getting from all channels\n");
2837     buffer = GCT_get_channels_buffer (c->t);
2838   }
2839   else
2840   {
2841     LOG (GNUNET_ERROR_TYPE_DEBUG, "  getting from one connection\n");
2842     buffer = GCC_get_buffer (c, fwd);
2843   }
2844   LOG (GNUNET_ERROR_TYPE_DEBUG, "  buffer available: %u\n", buffer);
2845   if (0 == buffer && GNUNET_NO == force)
2846   {
2847     GCC_check_connections ();
2848     return;
2849   }
2850
2851   /* Send available buffer space */
2852   if (GCC_is_origin (c, fwd))
2853   {
2854     GNUNET_assert (NULL != c->t);
2855     LOG (GNUNET_ERROR_TYPE_DEBUG, "  sending on channels...\n");
2856     GCT_unchoke_channels (c->t);
2857   }
2858   else
2859   {
2860     LOG (GNUNET_ERROR_TYPE_DEBUG, "  sending on connection\n");
2861     send_ack (c, buffer, fwd, force);
2862   }
2863   GCC_check_connections ();
2864 }
2865
2866
2867 /**
2868  * Initialize the connections subsystem
2869  *
2870  * @param c Configuration handle.
2871  */
2872 void
2873 GCC_init (const struct GNUNET_CONFIGURATION_Handle *c)
2874 {
2875   LOG (GNUNET_ERROR_TYPE_DEBUG, "init\n");
2876   if (GNUNET_OK !=
2877       GNUNET_CONFIGURATION_get_value_number (c, "CADET", "MAX_MSGS_QUEUE",
2878                                              &max_msgs_queue))
2879   {
2880     GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
2881                                "CADET", "MAX_MSGS_QUEUE", "MISSING");
2882     GNUNET_SCHEDULER_shutdown ();
2883     return;
2884   }
2885
2886   if (GNUNET_OK !=
2887       GNUNET_CONFIGURATION_get_value_number (c, "CADET", "MAX_CONNECTIONS",
2888                                              &max_connections))
2889   {
2890     GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
2891                                "CADET", "MAX_CONNECTIONS", "MISSING");
2892     GNUNET_SCHEDULER_shutdown ();
2893     return;
2894   }
2895
2896   if (GNUNET_OK !=
2897       GNUNET_CONFIGURATION_get_value_time (c, "CADET", "REFRESH_CONNECTION_TIME",
2898                                            &refresh_connection_time))
2899   {
2900     GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
2901                                "CADET", "REFRESH_CONNECTION_TIME", "MISSING");
2902     GNUNET_SCHEDULER_shutdown ();
2903     return;
2904   }
2905   create_connection_time = GNUNET_TIME_UNIT_SECONDS;
2906   connections = GNUNET_CONTAINER_multihashmap_create (1024, GNUNET_NO);
2907 }
2908
2909
2910 /**
2911  * Destroy each connection on shutdown.
2912  *
2913  * @param cls Closure (unused).
2914  * @param key Current key code (CID, unused).
2915  * @param value Value in the hash map (`struct CadetConnection`)
2916  *
2917  * @return #GNUNET_YES, because we should continue to iterate
2918  */
2919 static int
2920 shutdown_iterator (void *cls,
2921                    const struct GNUNET_HashCode *key,
2922                    void *value)
2923 {
2924   struct CadetConnection *c = value;
2925
2926   c->state = CADET_CONNECTION_DESTROYED;
2927   GCC_destroy (c);
2928   return GNUNET_YES;
2929 }
2930
2931
2932 /**
2933  * Shut down the connections subsystem.
2934  */
2935 void
2936 GCC_shutdown (void)
2937 {
2938   GCC_check_connections ();
2939   GNUNET_CONTAINER_multihashmap_iterate (connections,
2940                                          &shutdown_iterator,
2941                                          NULL);
2942   GNUNET_CONTAINER_multihashmap_destroy (connections);
2943   connections = NULL;
2944 }
2945
2946
2947 /**
2948  * Create a connection.
2949  *
2950  * @param cid Connection ID (either created locally or imposed remotely).
2951  * @param t Tunnel this connection belongs to (or NULL);
2952  * @param path Path this connection has to use (copy is made).
2953  * @param own_pos Own position in the @c path path.
2954  *
2955  * @return Newly created connection, NULL in case of error (own id not in path).
2956  */
2957 struct CadetConnection *
2958 GCC_new (const struct GNUNET_CADET_Hash *cid,
2959          struct CadetTunnel *t,
2960          struct CadetPeerPath *path,
2961          unsigned int own_pos)
2962 {
2963   struct CadetConnection *c;
2964   struct CadetPeerPath *p;
2965
2966   GCC_check_connections ();
2967   p = path_duplicate (path);
2968   GNUNET_assert (NULL != p);
2969   c = GNUNET_new (struct CadetConnection);
2970   c->id = *cid;
2971   GNUNET_assert (GNUNET_OK ==
2972                  GNUNET_CONTAINER_multihashmap_put (connections,
2973                                                     GCC_get_h (c), c,
2974                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
2975   fc_init (&c->fwd_fc);
2976   fc_init (&c->bck_fc);
2977   c->fwd_fc.c = c;
2978   c->bck_fc.c = c;
2979
2980   c->t = t;
2981   GNUNET_assert (own_pos <= p->length - 1);
2982   c->own_pos = own_pos;
2983   c->path = p;
2984   p->c = c;
2985   if (GNUNET_OK != register_neighbors (c))
2986   {
2987     if (0 == own_pos)
2988     {
2989       /* We were the origin of this request, this means we have invalid
2990        * info about the paths to reach the destination. We must invalidate
2991        * the *original* path to avoid trying it again in the next minute.
2992        */
2993       path_invalidate (path);
2994       c->t = NULL;
2995     }
2996     path_destroy (c->path);
2997     c->path = NULL;
2998     GCC_destroy (c);
2999     return NULL;
3000   }
3001   LOG (GNUNET_ERROR_TYPE_INFO, "New connection %s\n", GCC_2s (c));
3002   GCC_check_connections ();
3003   return c;
3004 }
3005
3006
3007 void
3008 GCC_destroy (struct CadetConnection *c)
3009 {
3010   GCC_check_connections ();
3011   if (NULL == c)
3012   {
3013     GNUNET_break (0);
3014     return;
3015   }
3016
3017   if (2 == c->destroy) /* cancel queues -> GCP_queue_cancel -> q_destroy -> */
3018     return;            /* -> message_sent -> GCC_destroy. Don't loop. */
3019   c->destroy = 2;
3020
3021   LOG (GNUNET_ERROR_TYPE_DEBUG,
3022        "destroying connection %s\n",
3023        GCC_2s (c));
3024   LOG (GNUNET_ERROR_TYPE_DEBUG,
3025        " fc's f: %p, b: %p\n",
3026        &c->fwd_fc, &c->bck_fc);
3027   LOG (GNUNET_ERROR_TYPE_DEBUG,
3028        " fc tasks f: %u, b: %u\n",
3029        c->fwd_fc.poll_task,
3030        c->bck_fc.poll_task);
3031
3032   /* Cancel all traffic */
3033   if (NULL != c->path)
3034   {
3035     connection_cancel_queues (c, GNUNET_YES);
3036     connection_cancel_queues (c, GNUNET_NO);
3037   }
3038   unregister_neighbors (c);
3039   path_destroy (c->path);
3040   c->path = NULL;
3041
3042   /* Cancel maintainance task (keepalive/timeout) */
3043   if (NULL != c->fwd_fc.poll_msg)
3044   {
3045     GCC_cancel (c->fwd_fc.poll_msg);
3046     LOG (GNUNET_ERROR_TYPE_DEBUG,
3047          " POLL msg FWD canceled\n");
3048   }
3049   if (NULL != c->bck_fc.poll_msg)
3050   {
3051     GCC_cancel (c->bck_fc.poll_msg);
3052     LOG (GNUNET_ERROR_TYPE_DEBUG,
3053          " POLL msg BCK canceled\n");
3054   }
3055
3056   /* Delete from tunnel */
3057   if (NULL != c->t)
3058     GCT_remove_connection (c->t, c);
3059
3060   if (NULL != c->check_duplicates_task)
3061     GNUNET_SCHEDULER_cancel (c->check_duplicates_task);
3062   if (NULL != c->fwd_maintenance_task)
3063     GNUNET_SCHEDULER_cancel (c->fwd_maintenance_task);
3064   if (NULL != c->bck_maintenance_task)
3065     GNUNET_SCHEDULER_cancel (c->bck_maintenance_task);
3066   if (NULL != c->fwd_fc.poll_task)
3067   {
3068     GNUNET_SCHEDULER_cancel (c->fwd_fc.poll_task);
3069     LOG (GNUNET_ERROR_TYPE_DEBUG, " POLL task FWD canceled\n");
3070   }
3071   if (NULL != c->bck_fc.poll_task)
3072   {
3073     GNUNET_SCHEDULER_cancel (c->bck_fc.poll_task);
3074     LOG (GNUNET_ERROR_TYPE_DEBUG, " POLL task BCK canceled\n");
3075   }
3076   if (GNUNET_NO == c->was_removed)
3077   {
3078     GNUNET_break (GNUNET_YES ==
3079                   GNUNET_CONTAINER_multihashmap_remove (connections,
3080                                                         GCC_get_h (c),
3081                                                         c));
3082   }
3083   GNUNET_STATISTICS_update (stats,
3084                             "# connections",
3085                             -1,
3086                             GNUNET_NO);
3087   GNUNET_free (c);
3088   GCC_check_connections ();
3089 }
3090
3091
3092 /**
3093  * Get the connection ID.
3094  *
3095  * @param c Connection to get the ID from.
3096  *
3097  * @return ID of the connection.
3098  */
3099 const struct GNUNET_CADET_Hash *
3100 GCC_get_id (const struct CadetConnection *c)
3101 {
3102   return &c->id;
3103 }
3104
3105
3106 /**
3107  * Get the connection ID.
3108  *
3109  * @param c Connection to get the ID from.
3110  *
3111  * @return ID of the connection.
3112  */
3113 const struct GNUNET_HashCode *
3114 GCC_get_h (const struct CadetConnection *c)
3115 {
3116   return GC_h2hc (&c->id);
3117 }
3118
3119
3120 /**
3121  * Get the connection path.
3122  *
3123  * @param c Connection to get the path from.
3124  *
3125  * @return path used by the connection.
3126  */
3127 const struct CadetPeerPath *
3128 GCC_get_path (const struct CadetConnection *c)
3129 {
3130   if (GNUNET_NO == c->destroy)
3131     return c->path;
3132   return NULL;
3133 }
3134
3135
3136 /**
3137  * Get the connection state.
3138  *
3139  * @param c Connection to get the state from.
3140  *
3141  * @return state of the connection.
3142  */
3143 enum CadetConnectionState
3144 GCC_get_state (const struct CadetConnection *c)
3145 {
3146   return c->state;
3147 }
3148
3149 /**
3150  * Get the connection tunnel.
3151  *
3152  * @param c Connection to get the tunnel from.
3153  *
3154  * @return tunnel of the connection.
3155  */
3156 struct CadetTunnel *
3157 GCC_get_tunnel (const struct CadetConnection *c)
3158 {
3159   return c->t;
3160 }
3161
3162
3163 /**
3164  * Get free buffer space in a connection.
3165  *
3166  * @param c Connection.
3167  * @param fwd Is query about FWD traffic?
3168  *
3169  * @return Free buffer space [0 - max_msgs_queue/max_connections]
3170  */
3171 unsigned int
3172 GCC_get_buffer (struct CadetConnection *c, int fwd)
3173 {
3174   struct CadetFlowControl *fc;
3175
3176   fc = fwd ? &c->fwd_fc : &c->bck_fc;
3177
3178   LOG (GNUNET_ERROR_TYPE_DEBUG, "  Get %s buffer on %s: %u - %u\n",
3179        GC_f2s (fwd), GCC_2s (c), fc->queue_max, fc->queue_n);
3180   GCC_debug (c, GNUNET_ERROR_TYPE_DEBUG);
3181
3182   return (fc->queue_max - fc->queue_n);
3183 }
3184
3185
3186 /**
3187  * Get how many messages have we allowed to send to us from a direction.
3188  *
3189  * @param c Connection.
3190  * @param fwd Are we asking about traffic from FWD (BCK messages)?
3191  *
3192  * @return last_ack_sent - last_pid_recv
3193  */
3194 unsigned int
3195 GCC_get_allowed (struct CadetConnection *c, int fwd)
3196 {
3197   struct CadetFlowControl *fc;
3198
3199   fc = fwd ? &c->fwd_fc : &c->bck_fc;
3200   if (CADET_CONNECTION_READY != c->state
3201       || GC_is_pid_bigger (fc->last_pid_recv, fc->last_ack_sent))
3202   {
3203     return 0;
3204   }
3205   return (fc->last_ack_sent - fc->last_pid_recv);
3206 }
3207
3208
3209 /**
3210  * Get messages queued in a connection.
3211  *
3212  * @param c Connection.
3213  * @param fwd Is query about FWD traffic?
3214  *
3215  * @return Number of messages queued.
3216  */
3217 unsigned int
3218 GCC_get_qn (struct CadetConnection *c, int fwd)
3219 {
3220   struct CadetFlowControl *fc;
3221
3222   fc = fwd ? &c->fwd_fc : &c->bck_fc;
3223
3224   return fc->queue_n;
3225 }
3226
3227
3228 /**
3229  * Get next PID to use.
3230  *
3231  * @param c Connection.
3232  * @param fwd Is query about FWD traffic?
3233  *
3234  * @return Last PID used + 1.
3235  */
3236 unsigned int
3237 GCC_get_pid (struct CadetConnection *c, int fwd)
3238 {
3239   struct CadetFlowControl *fc;
3240
3241   fc = fwd ? &c->fwd_fc : &c->bck_fc;
3242
3243   return fc->last_pid_sent + 1;
3244 }
3245
3246
3247 /**
3248  * Allow the connection to advertise a buffer of the given size.
3249  *
3250  * The connection will send an @c fwd ACK message (so: in direction !fwd)
3251  * allowing up to last_pid_recv + buffer.
3252  *
3253  * @param c Connection.
3254  * @param buffer How many more messages the connection can accept.
3255  * @param fwd Is this about FWD traffic? (The ack will go dest->root).
3256  */
3257 void
3258 GCC_allow (struct CadetConnection *c, unsigned int buffer, int fwd)
3259 {
3260   LOG (GNUNET_ERROR_TYPE_DEBUG, "  allowing %s %u messages %s\n",
3261        GCC_2s (c), buffer, GC_f2s (fwd));
3262   send_ack (c, buffer, fwd, GNUNET_NO);
3263 }
3264
3265
3266 /**
3267  * Notify other peers on a connection of a broken link. Mark connections
3268  * to destroy after all traffic has been sent.
3269  *
3270  * @param c Connection on which there has been a disconnection.
3271  * @param peer Peer that disconnected.
3272  */
3273 void
3274 GCC_neighbor_disconnected (struct CadetConnection *c, struct CadetPeer *peer)
3275 {
3276   struct CadetPeer *hop;
3277   char peer_name[16];
3278   int fwd;
3279
3280   GCC_check_connections ();
3281   strncpy (peer_name, GCP_2s (peer), 16);
3282   peer_name[15] = '\0';
3283   LOG (GNUNET_ERROR_TYPE_DEBUG,
3284        "shutting down %s, %s disconnected\n",
3285        GCC_2s (c), peer_name);
3286   hop = get_prev_hop (c);
3287   if (NULL == hop)
3288   {
3289     /* Path was NULL, we should have deleted the connection. */
3290     GNUNET_break (0);
3291     return;
3292   }
3293   fwd = (peer == hop);
3294   if ( (GNUNET_YES == GCC_is_terminal (c, fwd)) ||
3295        (GNUNET_NO != c->destroy) )
3296   {
3297     /* Local shutdown, or other peer already down (hence 'c->destroy');
3298        so there is no one to notify about this, just clean up. */
3299     GCC_destroy (c);
3300     GCC_check_connections ();
3301     return;
3302   }
3303   send_broken (c, &my_full_id, GCP_get_id (peer), fwd);
3304   /* Connection will have at least one pending message
3305    * (the one we just scheduled), so delay destruction
3306    * and remove from map so we don't use accidentally. */
3307   c->destroy = GNUNET_YES;
3308   c->state = CADET_CONNECTION_DESTROYED;
3309   GNUNET_assert (GNUNET_NO == c->was_removed);
3310   c->was_removed = GNUNET_YES;
3311   GNUNET_break (GNUNET_YES ==
3312                 GNUNET_CONTAINER_multihashmap_remove (connections,
3313                                                       GCC_get_h (c),
3314                                                       c));
3315   /* Cancel queue in the direction that just died. */
3316   connection_cancel_queues (c, ! fwd);
3317   GCC_stop_poll (c, ! fwd);
3318   unregister_neighbors (c);
3319   GCC_check_connections ();
3320 }
3321
3322
3323 /**
3324  * Is this peer the first one on the connection?
3325  *
3326  * @param c Connection.
3327  * @param fwd Is this about fwd traffic?
3328  *
3329  * @return #GNUNET_YES if origin, #GNUNET_NO if relay/terminal.
3330  */
3331 int
3332 GCC_is_origin (struct CadetConnection *c, int fwd)
3333 {
3334   if (!fwd && c->path->length - 1 == c->own_pos )
3335     return GNUNET_YES;
3336   if (fwd && 0 == c->own_pos)
3337     return GNUNET_YES;
3338   return GNUNET_NO;
3339 }
3340
3341
3342 /**
3343  * Is this peer the last one on the connection?
3344  *
3345  * @param c Connection.
3346  * @param fwd Is this about fwd traffic?
3347  *            Note that the ROOT is the terminal for BCK traffic!
3348  *
3349  * @return #GNUNET_YES if terminal, #GNUNET_NO if relay/origin.
3350  */
3351 int
3352 GCC_is_terminal (struct CadetConnection *c, int fwd)
3353 {
3354   return GCC_is_origin (c, ! fwd);
3355 }
3356
3357
3358 /**
3359  * See if we are allowed to send by the next hop in the given direction.
3360  *
3361  * @param c Connection.
3362  * @param fwd Is this about fwd traffic?
3363  *
3364  * @return #GNUNET_YES in case it's OK to send.
3365  */
3366 int
3367 GCC_is_sendable (struct CadetConnection *c, int fwd)
3368 {
3369   struct CadetFlowControl *fc;
3370
3371   LOG (GNUNET_ERROR_TYPE_DEBUG,
3372        " checking sendability of %s traffic on %s\n",
3373        GC_f2s (fwd), GCC_2s (c));
3374   if (NULL == c)
3375   {
3376     GNUNET_break (0);
3377     return GNUNET_YES;
3378   }
3379   fc = fwd ? &c->fwd_fc : &c->bck_fc;
3380   LOG (GNUNET_ERROR_TYPE_DEBUG,
3381        " last ack recv: %u, last pid sent: %u\n",
3382        fc->last_ack_recv, fc->last_pid_sent);
3383   if (GC_is_pid_bigger (fc->last_ack_recv, fc->last_pid_sent))
3384   {
3385     LOG (GNUNET_ERROR_TYPE_DEBUG, " sendable\n");
3386     return GNUNET_YES;
3387   }
3388   LOG (GNUNET_ERROR_TYPE_DEBUG, " not sendable\n");
3389   return GNUNET_NO;
3390 }
3391
3392
3393 /**
3394  * Check if this connection is a direct one (never trim a direct connection).
3395  *
3396  * @param c Connection.
3397  *
3398  * @return #GNUNET_YES in case it's a direct connection, #GNUNET_NO otherwise.
3399  */
3400 int
3401 GCC_is_direct (struct CadetConnection *c)
3402 {
3403   return (c->path->length == 2) ? GNUNET_YES : GNUNET_NO;
3404 }
3405
3406
3407 /**
3408  * Sends an already built message on a connection, properly registering
3409  * all used resources.
3410  *
3411  * @param message Message to send. Function makes a copy of it.
3412  *                If message is not hop-by-hop, decrements TTL of copy.
3413  * @param payload_type Type of payload, in case the message is encrypted.
3414  * @param c Connection on which this message is transmitted.
3415  * @param fwd Is this a fwd message?
3416  * @param force Force the connection to accept the message (buffer overfill).
3417  * @param cont Continuation called once message is sent. Can be NULL.
3418  * @param cont_cls Closure for @c cont.
3419  *
3420  * @return Handle to cancel the message before it's sent.
3421  *         NULL on error or if @c cont is NULL.
3422  *         Invalid on @c cont call.
3423  */
3424 struct CadetConnectionQueue *
3425 GCC_send_prebuilt_message (const struct GNUNET_MessageHeader *message,
3426                            uint16_t payload_type, uint32_t payload_id,
3427                            struct CadetConnection *c, int fwd, int force,
3428                            GCC_sent cont, void *cont_cls)
3429 {
3430   struct CadetFlowControl *fc;
3431   struct CadetConnectionQueue *q;
3432   void *data;
3433   size_t size;
3434   uint16_t type;
3435   int droppable;
3436
3437   GCC_check_connections ();
3438   size = ntohs (message->size);
3439   data = GNUNET_malloc (size);
3440   memcpy (data, message, size);
3441   type = ntohs (message->type);
3442   LOG (GNUNET_ERROR_TYPE_INFO,
3443        "--> %s (%s %4u) on conn %s (%p) %s (%u bytes)\n",
3444        GC_m2s (type), GC_m2s (payload_type), payload_id, GCC_2s (c), c,
3445        GC_f2s(fwd), size);
3446
3447   fc = fwd ? &c->fwd_fc : &c->bck_fc;
3448   droppable = GNUNET_NO == force;
3449   switch (type)
3450   {
3451     struct GNUNET_CADET_AX        *axmsg;
3452     struct GNUNET_CADET_Encrypted *emsg;
3453     struct GNUNET_CADET_KX        *kmsg;
3454     struct GNUNET_CADET_ACK       *amsg;
3455     struct GNUNET_CADET_Poll      *pmsg;
3456     struct GNUNET_CADET_ConnectionDestroy *dmsg;
3457     struct GNUNET_CADET_ConnectionBroken  *bmsg;
3458     uint32_t ttl;
3459
3460     case GNUNET_MESSAGE_TYPE_CADET_AX:
3461     case GNUNET_MESSAGE_TYPE_CADET_ENCRYPTED:
3462       if (GNUNET_MESSAGE_TYPE_CADET_ENCRYPTED == type)
3463       {
3464         emsg = (struct GNUNET_CADET_Encrypted *) data;
3465         ttl = ntohl (emsg->ttl);
3466         if (0 == ttl)
3467         {
3468           GNUNET_break_op (0);
3469           GNUNET_free (data);
3470           return NULL;
3471         }
3472         emsg->cid = c->id;
3473         emsg->ttl = htonl (ttl - 1);
3474       }
3475       else
3476       {
3477         axmsg = (struct GNUNET_CADET_AX *) data;
3478         axmsg->cid = c->id;
3479       }
3480       LOG (GNUNET_ERROR_TYPE_DEBUG, "  Q_N+ %p %u\n", fc, fc->queue_n);
3481       LOG (GNUNET_ERROR_TYPE_DEBUG, "last pid sent %u\n", fc->last_pid_sent);
3482       LOG (GNUNET_ERROR_TYPE_DEBUG, "     ack recv %u\n", fc->last_ack_recv);
3483       if (GNUNET_YES == droppable)
3484       {
3485         fc->queue_n++;
3486       }
3487       else
3488       {
3489         LOG (GNUNET_ERROR_TYPE_DEBUG, "  not droppable, Q_N stays the same\n");
3490       }
3491       break;
3492
3493     case GNUNET_MESSAGE_TYPE_CADET_KX:
3494       kmsg = (struct GNUNET_CADET_KX *) data;
3495       kmsg->cid = c->id;
3496       break;
3497
3498     case GNUNET_MESSAGE_TYPE_CADET_ACK:
3499       amsg = (struct GNUNET_CADET_ACK *) data;
3500       amsg->cid = c->id;
3501       LOG (GNUNET_ERROR_TYPE_DEBUG, " ack %u\n", ntohl (amsg->ack));
3502       droppable = GNUNET_NO;
3503       break;
3504
3505     case GNUNET_MESSAGE_TYPE_CADET_POLL:
3506       pmsg = (struct GNUNET_CADET_Poll *) data;
3507       pmsg->cid = c->id;
3508       LOG (GNUNET_ERROR_TYPE_DEBUG, " POLL %u\n", ntohl (pmsg->pid));
3509       droppable = GNUNET_NO;
3510       break;
3511
3512     case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY:
3513       dmsg = (struct GNUNET_CADET_ConnectionDestroy *) data;
3514       dmsg->cid = c->id;
3515       break;
3516
3517     case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN:
3518       bmsg = (struct GNUNET_CADET_ConnectionBroken *) data;
3519       bmsg->cid = c->id;
3520       break;
3521
3522     case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE:
3523     case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK:
3524       break;
3525
3526     default:
3527       GNUNET_break (0);
3528       GNUNET_free (data);
3529       return NULL;
3530   }
3531
3532   if (fc->queue_n > fc->queue_max && droppable)
3533   {
3534     GNUNET_STATISTICS_update (stats, "# messages dropped (buffer full)",
3535                               1, GNUNET_NO);
3536     GNUNET_break (0);
3537     LOG (GNUNET_ERROR_TYPE_DEBUG, "queue full: %u/%u\n",
3538          fc->queue_n, fc->queue_max);
3539     if (GNUNET_MESSAGE_TYPE_CADET_ENCRYPTED == type
3540         || GNUNET_MESSAGE_TYPE_CADET_AX == type)
3541     {
3542       fc->queue_n--;
3543     }
3544     GNUNET_free (data);
3545     return NULL; /* Drop this message */
3546   }
3547
3548   LOG (GNUNET_ERROR_TYPE_DEBUG, "  C_P+ %s %u\n",
3549        GCC_2s (c), c->pending_messages);
3550   c->pending_messages++;
3551
3552   q = GNUNET_new (struct CadetConnectionQueue);
3553   q->forced = !droppable;
3554   q->q = GCP_queue_add (get_hop (c, fwd), data, type, payload_type, payload_id,
3555                         size, c, fwd, &conn_message_sent, q);
3556   if (NULL == q->q)
3557   {
3558     LOG (GNUNET_ERROR_TYPE_DEBUG, "dropping msg on %s, NULL q\n", GCC_2s (c));
3559     GNUNET_free (data);
3560     GNUNET_free (q);
3561     GCC_check_connections ();
3562     return NULL;
3563   }
3564   q->cont = cont;
3565   q->cont_cls = cont_cls;
3566   GCC_check_connections ();
3567   return (NULL == cont) ? NULL : q;
3568 }
3569
3570
3571 /**
3572  * Cancel a previously sent message while it's in the queue.
3573  *
3574  * ONLY can be called before the continuation given to the send function
3575  * is called. Once the continuation is called, the message is no longer in the
3576  * queue.
3577  *
3578  * @param q Handle to the queue.
3579  */
3580 void
3581 GCC_cancel (struct CadetConnectionQueue *q)
3582 {
3583   LOG (GNUNET_ERROR_TYPE_DEBUG, "!  GCC cancel message\n");
3584
3585   /* queue destroy calls message_sent, which calls q->cont and frees q */
3586   GCP_queue_destroy (q->q, GNUNET_YES, GNUNET_NO, 0);
3587   GCC_check_connections ();
3588 }
3589
3590
3591 /**
3592  * Sends a CREATE CONNECTION message for a path to a peer.
3593  * Changes the connection and tunnel states if necessary.
3594  *
3595  * @param connection Connection to create.
3596  */
3597 void
3598 GCC_send_create (struct CadetConnection *connection)
3599 {
3600   enum CadetTunnelCState state;
3601   size_t size;
3602
3603   GCC_check_connections ();
3604   size = sizeof (struct GNUNET_CADET_ConnectionCreate);
3605   size += connection->path->length * sizeof (struct GNUNET_PeerIdentity);
3606
3607   LOG (GNUNET_ERROR_TYPE_INFO, "--> %s on conn %s  (%u bytes)\n",
3608        GC_m2s (GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE),
3609        GCC_2s (connection), size);
3610   LOG (GNUNET_ERROR_TYPE_DEBUG, "  C_P+ %p %u (create)\n",
3611        connection, connection->pending_messages);
3612   connection->pending_messages++;
3613
3614   connection->maintenance_q =
3615     GCP_queue_add (get_next_hop (connection), NULL,
3616                    GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE, 0, 0,
3617                    size, connection, GNUNET_YES, &conn_message_sent, NULL);
3618
3619   state = GCT_get_cstate (connection->t);
3620   if (CADET_TUNNEL_SEARCHING == state || CADET_TUNNEL_NEW == state)
3621     GCT_change_cstate (connection->t, CADET_TUNNEL_WAITING);
3622   if (CADET_CONNECTION_NEW == connection->state)
3623     connection_change_state (connection, CADET_CONNECTION_SENT);
3624   GCC_check_connections ();
3625 }
3626
3627
3628 /**
3629  * Send a message to all peers in this connection that the connection
3630  * is no longer valid.
3631  *
3632  * If some peer should not receive the message, it should be zero'ed out
3633  * before calling this function.
3634  *
3635  * @param c The connection whose peers to notify.
3636  */
3637 void
3638 GCC_send_destroy (struct CadetConnection *c)
3639 {
3640   struct GNUNET_CADET_ConnectionDestroy msg;
3641
3642   if (GNUNET_YES == c->destroy)
3643     return;
3644   GCC_check_connections ();
3645   msg.header.size = htons (sizeof (msg));
3646   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY);
3647   msg.cid = c->id;
3648   LOG (GNUNET_ERROR_TYPE_DEBUG,
3649               "  sending connection destroy for connection %s\n",
3650               GCC_2s (c));
3651
3652   if (GNUNET_NO == GCC_is_terminal (c, GNUNET_YES))
3653     GNUNET_assert (NULL == GCC_send_prebuilt_message (&msg.header, 0, 0, c,
3654                                                       GNUNET_YES, GNUNET_YES,
3655                                                       NULL, NULL));
3656   if (GNUNET_NO == GCC_is_terminal (c, GNUNET_NO))
3657     GNUNET_assert (NULL == GCC_send_prebuilt_message (&msg.header, 0, 0, c,
3658                                                       GNUNET_NO, GNUNET_YES,
3659                                                       NULL, NULL));
3660   c->destroy = GNUNET_YES;
3661   c->state = CADET_CONNECTION_DESTROYED;
3662   GCC_check_connections ();
3663 }
3664
3665
3666 /**
3667  * @brief Start a polling timer for the connection.
3668  *
3669  * When a neighbor does not accept more traffic on the connection it could be
3670  * caused by a simple congestion or by a lost ACK. Polling enables to check
3671  * for the lastest ACK status for a connection.
3672  *
3673  * @param c Connection.
3674  * @param fwd Should we poll in the FWD direction?
3675  */
3676 void
3677 GCC_start_poll (struct CadetConnection *c, int fwd)
3678 {
3679   struct CadetFlowControl *fc;
3680
3681   fc = fwd ? &c->fwd_fc : &c->bck_fc;
3682   LOG (GNUNET_ERROR_TYPE_DEBUG, "POLL %s requested\n",
3683        GC_f2s (fwd));
3684   if (NULL != fc->poll_task || NULL != fc->poll_msg)
3685   {
3686     LOG (GNUNET_ERROR_TYPE_DEBUG, "  POLL not needed (%p, %p)\n",
3687          fc->poll_task, fc->poll_msg);
3688     return;
3689   }
3690   LOG (GNUNET_ERROR_TYPE_DEBUG, "POLL started on request\n");
3691   fc->poll_task = GNUNET_SCHEDULER_add_delayed (fc->poll_time,
3692                                                 &connection_poll,
3693                                                 fc);
3694 }
3695
3696
3697 /**
3698  * @brief Stop polling a connection for ACKs.
3699  *
3700  * Once we have enough ACKs for future traffic, polls are no longer necessary.
3701  *
3702  * @param c Connection.
3703  * @param fwd Should we stop the poll in the FWD direction?
3704  */
3705 void
3706 GCC_stop_poll (struct CadetConnection *c, int fwd)
3707 {
3708   struct CadetFlowControl *fc;
3709
3710   fc = fwd ? &c->fwd_fc : &c->bck_fc;
3711   if (NULL != fc->poll_task)
3712   {
3713     GNUNET_SCHEDULER_cancel (fc->poll_task);
3714     fc->poll_task = NULL;
3715   }
3716 }
3717
3718
3719 /**
3720  * Get a (static) string for a connection.
3721  *
3722  * @param c Connection.
3723  */
3724 const char *
3725 GCC_2s (const struct CadetConnection *c)
3726 {
3727   if (NULL == c)
3728     return "NULL";
3729
3730   if (NULL != c->t)
3731   {
3732     static char buf[128];
3733
3734     SPRINTF (buf, "%s (->%s)",
3735              GNUNET_h2s (GC_h2hc (GCC_get_id (c))), GCT_2s (c->t));
3736     return buf;
3737   }
3738   return GNUNET_h2s (GC_h2hc (&c->id));
3739 }
3740
3741
3742 /**
3743  * Log all possible info about the connection state.
3744  *
3745  * @param c Connection to debug.
3746  * @param level Debug level to use.
3747  */
3748 void
3749 GCC_debug (const struct CadetConnection *c, enum GNUNET_ErrorType level)
3750 {
3751   int do_log;
3752   char *s;
3753
3754   do_log = GNUNET_get_log_call_status (level & (~GNUNET_ERROR_TYPE_BULK),
3755                                        "cadet-con",
3756                                        __FILE__, __FUNCTION__, __LINE__);
3757   if (0 == do_log)
3758     return;
3759
3760   if (NULL == c)
3761   {
3762     LOG2 (level, "CCC DEBUG NULL CONNECTION\n");
3763     return;
3764   }
3765
3766   LOG2 (level, "CCC DEBUG CONNECTION %s\n", GCC_2s (c));
3767   s = path_2s (c->path);
3768   LOG2 (level, "CCC  path %s, own pos: %u\n", s, c->own_pos);
3769   GNUNET_free (s);
3770   LOG2 (level, "CCC  state: %s, destroy: %u\n",
3771         GCC_state2s (c->state), c->destroy);
3772   LOG2 (level, "CCC  pending messages: %u\n", c->pending_messages);
3773   if (NULL != c->perf)
3774     LOG2 (level, "CCC  us/byte: %f\n", c->perf->avg);
3775
3776   LOG2 (level, "CCC  FWD flow control:\n");
3777   LOG2 (level, "CCC   queue: %u/%u\n", c->fwd_fc.queue_n, c->fwd_fc.queue_max);
3778   LOG2 (level, "CCC   last PID sent: %5u, recv: %5u\n",
3779         c->fwd_fc.last_pid_sent, c->fwd_fc.last_pid_recv);
3780   LOG2 (level, "CCC   last ACK sent: %5u, recv: %5u\n",
3781         c->fwd_fc.last_ack_sent, c->fwd_fc.last_ack_recv);
3782   LOG2 (level, "CCC   recv PID bitmap: %X\n", c->fwd_fc.recv_bitmap);
3783   LOG2 (level, "CCC   poll: task %d, msg  %p, msg_ack %p)\n",
3784         c->fwd_fc.poll_task, c->fwd_fc.poll_msg, c->fwd_fc.ack_msg);
3785
3786   LOG2 (level, "CCC  BCK flow control:\n");
3787   LOG2 (level, "CCC   queue: %u/%u\n", c->bck_fc.queue_n, c->bck_fc.queue_max);
3788   LOG2 (level, "CCC   last PID sent: %5u, recv: %5u\n",
3789         c->bck_fc.last_pid_sent, c->bck_fc.last_pid_recv);
3790   LOG2 (level, "CCC   last ACK sent: %5u, recv: %5u\n",
3791         c->bck_fc.last_ack_sent, c->bck_fc.last_ack_recv);
3792   LOG2 (level, "CCC   recv PID bitmap: %X\n", c->bck_fc.recv_bitmap);
3793   LOG2 (level, "CCC   poll: task %d, msg  %p, msg_ack %p)\n",
3794         c->bck_fc.poll_task, c->bck_fc.poll_msg, c->bck_fc.ack_msg);
3795
3796   LOG2 (level, "CCC DEBUG CONNECTION END\n");
3797 }