- On a new EPHM, do a immediate rekey
[oweals/gnunet.git] / src / cadet / gnunet-service-cadet_connection.c
1 /*
2      This file is part of GNUnet.
3      (C) 2001-2013 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file cadet/gnunet-service-cadet_connection.c
23  * @brief GNUnet CADET service connection handling
24  * @author Bartlomiej Polot
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29
30 #include "gnunet_statistics_service.h"
31
32 #include "cadet_path.h"
33 #include "cadet_protocol.h"
34 #include "cadet.h"
35 #include "gnunet-service-cadet_connection.h"
36 #include "gnunet-service-cadet_peer.h"
37 #include "gnunet-service-cadet_tunnel.h"
38
39
40 #define LOG(level, ...) GNUNET_log_from (level,"cadet-con",__VA_ARGS__)
41 #define LOG2(level, ...) GNUNET_log_from_nocheck(level,"cadet-con",__VA_ARGS__)
42
43
44 #define CADET_MAX_POLL_TIME      GNUNET_TIME_relative_multiply (\
45                                   GNUNET_TIME_UNIT_MINUTES,\
46                                   10)
47 #define AVG_MSGS                32
48
49
50 /******************************************************************************/
51 /********************************   STRUCTS  **********************************/
52 /******************************************************************************/
53
54 /**
55  * Struct to encapsulate all the Flow Control information to a peer to which
56  * we are directly connected (on a core level).
57  */
58 struct CadetFlowControl
59 {
60   /**
61    * Connection this controls.
62    */
63   struct CadetConnection *c;
64
65   /**
66    * How many messages are in the queue on this connection.
67    */
68   unsigned int queue_n;
69
70   /**
71    * How many messages do we accept in the queue.
72    */
73   unsigned int queue_max;
74
75   /**
76    * ID of the last packet sent towards the peer.
77    */
78   uint32_t last_pid_sent;
79
80   /**
81    * ID of the last packet received from the peer.
82    */
83   uint32_t last_pid_recv;
84
85   /**
86    * Last ACK sent to the peer (peer can't send more than this PID).
87    */
88   uint32_t last_ack_sent;
89
90   /**
91    * Last ACK sent towards the origin (for traffic towards leaf node).
92    */
93   uint32_t last_ack_recv;
94
95   /**
96    * Task to poll the peer in case of a lost ACK causes stall.
97    */
98   GNUNET_SCHEDULER_TaskIdentifier poll_task;
99
100   /**
101    * How frequently to poll for ACKs.
102    */
103   struct GNUNET_TIME_Relative poll_time;
104
105   /**
106    * Queued poll message, to cancel if not necessary anymore (got ACK).
107    */
108   struct CadetConnectionQueue *poll_msg;
109
110   /**
111    * Queued poll message, to cancel if not necessary anymore (got ACK).
112    */
113   struct CadetConnectionQueue *ack_msg;
114 };
115
116 /**
117  * Keep a record of the last messages sent on this connection.
118  */
119 struct CadetConnectionPerformance
120 {
121   /**
122    * Circular buffer for storing measurements.
123    */
124   double usecsperbyte[AVG_MSGS];
125
126   /**
127    * Running average of @c usecsperbyte.
128    */
129   double avg;
130
131   /**
132    * How many values of @c usecsperbyte are valid.
133    */
134   uint16_t size;
135
136   /**
137    * Index of the next "free" position in @c usecsperbyte.
138    */
139   uint16_t idx;
140 };
141
142
143 /**
144  * Struct containing all information regarding a connection to a peer.
145  */
146 struct CadetConnection
147 {
148   /**
149    * Tunnel this connection is part of.
150    */
151   struct CadetTunnel *t;
152
153   /**
154    * Flow control information for traffic fwd.
155    */
156   struct CadetFlowControl fwd_fc;
157
158   /**
159    * Flow control information for traffic bck.
160    */
161   struct CadetFlowControl bck_fc;
162
163   /**
164    * Measure connection performance on the endpoint.
165    */
166   struct CadetConnectionPerformance *perf;
167
168   /**
169    * ID of the connection.
170    */
171   struct GNUNET_CADET_Hash id;
172
173   /**
174    * State of the connection.
175    */
176   enum CadetConnectionState state;
177
178   /**
179    * Path being used for the tunnel. At the origin of the connection
180    * it's a pointer to the destination's path pool, otherwise just a copy.
181    */
182   struct CadetPeerPath *path;
183
184   /**
185    * Position of the local peer in the path.
186    */
187   unsigned int own_pos;
188
189   /**
190    * Task to keep the used paths alive at the owner,
191    * time tunnel out on all the other peers.
192    */
193   GNUNET_SCHEDULER_TaskIdentifier fwd_maintenance_task;
194
195   /**
196    * Task to keep the used paths alive at the destination,
197    * time tunnel out on all the other peers.
198    */
199   GNUNET_SCHEDULER_TaskIdentifier bck_maintenance_task;
200
201   /**
202    * Queue handle for maintainance traffic. One handle for FWD and BCK since
203    * one peer never needs to maintain both directions (no loopback connections).
204    */
205   struct CadetPeerQueue *maintenance_q;
206
207   /**
208    * Counter to do exponential backoff when creating a connection (max 64).
209    */
210   unsigned short create_retry;
211
212   /**
213    * Pending message count.
214    */
215   int pending_messages;
216
217   /**
218    * Destroy flag: if true, destroy on last message.
219    */
220   int destroy;
221 };
222
223 /**
224  * Handle for messages queued but not yet sent.
225  */
226 struct CadetConnectionQueue
227 {
228   /**
229    * Peer queue handle, to cancel if necessary.
230    */
231   struct CadetPeerQueue *q;
232
233   /**
234    * Was this a forced message? (Do not account for it)
235    */
236   int forced;
237
238   /**
239    * Continuation to call once sent.
240    */
241   GCC_sent cont;
242
243   /**
244    * Closure for @c cont.
245    */
246   void *cont_cls;
247 };
248
249 /******************************************************************************/
250 /*******************************   GLOBALS  ***********************************/
251 /******************************************************************************/
252
253 /**
254  * Global handle to the statistics service.
255  */
256 extern struct GNUNET_STATISTICS_Handle *stats;
257
258 /**
259  * Local peer own ID (memory efficient handle).
260  */
261 extern GNUNET_PEER_Id myid;
262
263 /**
264  * Local peer own ID (full value).
265  */
266 extern struct GNUNET_PeerIdentity my_full_id;
267
268 /**
269  * Connections known, indexed by cid (CadetConnection).
270  */
271 static struct GNUNET_CONTAINER_MultiHashMap *connections;
272
273 /**
274  * How many connections are we willing to maintain.
275  * Local connections are always allowed, even if there are more connections than max.
276  */
277 static unsigned long long max_connections;
278
279 /**
280  * How many messages *in total* are we willing to queue, divide by number of
281  * connections to get connection queue size.
282  */
283 static unsigned long long max_msgs_queue;
284
285 /**
286  * How often to send path keepalives. Paths timeout after 4 missed.
287  */
288 static struct GNUNET_TIME_Relative refresh_connection_time;
289
290 /**
291  * How often to send path create / ACKs.
292  */
293 static struct GNUNET_TIME_Relative create_connection_time;
294
295
296 /******************************************************************************/
297 /********************************   STATIC  ***********************************/
298 /******************************************************************************/
299
300 #if 0 // avoid compiler warning for unused static function
301 static void
302 fc_debug (struct CadetFlowControl *fc)
303 {
304   LOG (GNUNET_ERROR_TYPE_DEBUG, "    IN: %u/%u\n",
305               fc->last_pid_recv, fc->last_ack_sent);
306   LOG (GNUNET_ERROR_TYPE_DEBUG, "    OUT: %u/%u\n",
307               fc->last_pid_sent, fc->last_ack_recv);
308   LOG (GNUNET_ERROR_TYPE_DEBUG, "    QUEUE: %u/%u\n",
309               fc->queue_n, fc->queue_max);
310 }
311
312 static void
313 connection_debug (struct CadetConnection *c)
314 {
315   if (NULL == c)
316   {
317     LOG (GNUNET_ERROR_TYPE_DEBUG, "*** DEBUG NULL CONNECTION ***\n");
318     return;
319   }
320   LOG (GNUNET_ERROR_TYPE_DEBUG, "Connection %s:%X\n",
321               peer2s (c->t->peer), GCC_2s (c));
322   LOG (GNUNET_ERROR_TYPE_DEBUG, "  state: %u, pending msgs: %u\n",
323               c->state, c->pending_messages);
324   LOG (GNUNET_ERROR_TYPE_DEBUG, "  FWD FC\n");
325   fc_debug (&c->fwd_fc);
326   LOG (GNUNET_ERROR_TYPE_DEBUG, "  BCK FC\n");
327   fc_debug (&c->bck_fc);
328 }
329 #endif
330
331
332 /**
333  * Schedule next keepalive task, taking in consideration
334  * the connection state and number of retries.
335  *
336  * @param c Connection for which to schedule the next keepalive.
337  * @param fwd Direction for the next keepalive.
338  */
339 static void
340 schedule_next_keepalive (struct CadetConnection *c, int fwd);
341
342
343 /**
344  * Resets the connection timeout task, some other message has done the
345  * task's job.
346  * - For the first peer on the direction this means to send
347  *   a keepalive or a path confirmation message (either create or ACK).
348  * - For all other peers, this means to destroy the connection,
349  *   due to lack of activity.
350  * Starts the timeout if no timeout was running (connection just created).
351  *
352  * @param c Connection whose timeout to reset.
353  * @param fwd Is this forward?
354  */
355 static void
356 connection_reset_timeout (struct CadetConnection *c, int fwd);
357
358
359 /**
360  * Get string description for tunnel state. Reentrant.
361  *
362  * @param s Tunnel state.
363  *
364  * @return String representation.
365  */
366 static const char *
367 GCC_state2s (enum CadetConnectionState s)
368 {
369   switch (s)
370   {
371     case CADET_CONNECTION_NEW:
372       return "CADET_CONNECTION_NEW";
373     case CADET_CONNECTION_SENT:
374       return "CADET_CONNECTION_SENT";
375     case CADET_CONNECTION_ACK:
376       return "CADET_CONNECTION_ACK";
377     case CADET_CONNECTION_READY:
378       return "CADET_CONNECTION_READY";
379     case CADET_CONNECTION_DESTROYED:
380       return "CADET_CONNECTION_DESTROYED";
381     default:
382       GNUNET_break (0);
383       LOG (GNUNET_ERROR_TYPE_ERROR, " conn state %u unknown!\n", s);
384       return "CADET_CONNECTION_STATE_ERROR";
385   }
386 }
387
388
389 /**
390  * Initialize a Flow Control structure to the initial state.
391  *
392  * @param fc Flow Control structure to initialize.
393  */
394 static void
395 fc_init (struct CadetFlowControl *fc)
396 {
397   fc->last_pid_sent = (uint32_t) -1; /* Next (expected) = 0 */
398   fc->last_pid_recv = (uint32_t) -1;
399   fc->last_ack_sent = (uint32_t) 0;
400   fc->last_ack_recv = (uint32_t) 0;
401   fc->poll_task = GNUNET_SCHEDULER_NO_TASK;
402   fc->poll_time = GNUNET_TIME_UNIT_SECONDS;
403   fc->queue_n = 0;
404   fc->queue_max = (max_msgs_queue / max_connections) + 1;
405 }
406
407
408 /**
409  * Find a connection.
410  *
411  * @param cid Connection ID.
412  */
413 static struct CadetConnection *
414 connection_get (const struct GNUNET_CADET_Hash *cid)
415 {
416   return GNUNET_CONTAINER_multihashmap_get (connections, GC_h2hc (cid));
417 }
418
419
420 static void
421 connection_change_state (struct CadetConnection* c,
422                          enum CadetConnectionState state)
423 {
424   LOG (GNUNET_ERROR_TYPE_DEBUG,
425        "Connection %s state %s -> %s\n",
426        GCC_2s (c), GCC_state2s (c->state), GCC_state2s (state));
427   if (CADET_CONNECTION_DESTROYED == c->state)
428   {
429     LOG (GNUNET_ERROR_TYPE_DEBUG, "state not changing anymore\n");
430     return;
431   }
432   c->state = state;
433   if (CADET_CONNECTION_READY == state)
434     c->create_retry = 1;
435 }
436
437
438 /**
439  * Callback called when a queued ACK message is sent.
440  *
441  * @param cls Closure (FC).
442  * @param c Connection this message was on.
443  * @param q Queue handler this call invalidates.
444  * @param type Type of message sent.
445  * @param fwd Was this a FWD going message?
446  * @param size Size of the message.
447  */
448 static void
449 ack_sent (void *cls,
450           struct CadetConnection *c,
451           struct CadetConnectionQueue *q,
452           uint16_t type, int fwd, size_t size)
453 {
454   struct CadetFlowControl *fc = cls;
455
456   fc->ack_msg = NULL;
457 }
458
459
460 /**
461  * Send an ACK on the connection, informing the predecessor about
462  * the available buffer space. Should not be called in case the peer
463  * is origin (no predecessor) in the @c fwd direction.
464  *
465  * Note that for fwd ack, the FWD mean forward *traffic* (root->dest),
466  * the ACK itself goes "back" (dest->root).
467  *
468  * @param c Connection on which to send the ACK.
469  * @param buffer How much space free to advertise?
470  * @param fwd Is this FWD ACK? (Going dest -> root)
471  * @param force Don't optimize out.
472  */
473 static void
474 send_ack (struct CadetConnection *c, unsigned int buffer, int fwd, int force)
475 {
476   struct CadetFlowControl *next_fc;
477   struct CadetFlowControl *prev_fc;
478   struct GNUNET_CADET_ACK msg;
479   uint32_t ack;
480   int delta;
481
482   /* If origin, there is no connection to send ACKs. Wrong function! */
483   if (GCC_is_origin (c, fwd))
484   {
485     LOG (GNUNET_ERROR_TYPE_DEBUG, "connection %s is origin in %s\n",
486          GCC_2s (c), GC_f2s (fwd));
487     GNUNET_break (0);
488     return;
489   }
490
491   next_fc = fwd ? &c->fwd_fc : &c->bck_fc;
492   prev_fc = fwd ? &c->bck_fc : &c->fwd_fc;
493
494   LOG (GNUNET_ERROR_TYPE_DEBUG, "connection send %s ack on %s\n",
495        GC_f2s (fwd), GCC_2s (c));
496
497   /* Check if we need to transmit the ACK. */
498   delta = prev_fc->last_ack_sent - prev_fc->last_pid_recv;
499   if (3 < delta && buffer < delta && GNUNET_NO == force)
500   {
501     LOG (GNUNET_ERROR_TYPE_DEBUG, "Not sending ACK, buffer > 3\n");
502     LOG (GNUNET_ERROR_TYPE_DEBUG,
503          "  last pid recv: %u, last ack sent: %u\n",
504          prev_fc->last_pid_recv, prev_fc->last_ack_sent);
505     return;
506   }
507
508   /* Ok, ACK might be necessary, what PID to ACK? */
509   ack = prev_fc->last_pid_recv + buffer;
510   LOG (GNUNET_ERROR_TYPE_DEBUG, " ACK %u\n", ack);
511   LOG (GNUNET_ERROR_TYPE_DEBUG,
512        " last pid %u, last ack %u, qmax %u, q %u\n",
513        prev_fc->last_pid_recv, prev_fc->last_ack_sent,
514        next_fc->queue_max, next_fc->queue_n);
515   if (ack == prev_fc->last_ack_sent && GNUNET_NO == force)
516   {
517     LOG (GNUNET_ERROR_TYPE_DEBUG, "Not sending FWD ACK, not needed\n");
518     return;
519   }
520
521   /* Check if message is already in queue */
522   if (NULL != prev_fc->ack_msg)
523   {
524     if (GC_is_pid_bigger (ack, prev_fc->last_ack_sent))
525     {
526       LOG (GNUNET_ERROR_TYPE_DEBUG, " canceling old ACK\n");
527       GCC_cancel (prev_fc->ack_msg);
528       /* GCC_cancel triggers ack_sent(), which clears fc->ack_msg */
529     }
530     else
531     {
532       LOG (GNUNET_ERROR_TYPE_DEBUG, " same ACK already in queue\n");
533       return;
534     }
535   }
536
537   prev_fc->last_ack_sent = ack;
538
539   /* Build ACK message and send on connection */
540   msg.header.size = htons (sizeof (msg));
541   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_ACK);
542   msg.ack = htonl (ack);
543   msg.cid = c->id;
544
545   prev_fc->ack_msg = GCC_send_prebuilt_message (&msg.header, 0, ack, c,
546                                                 !fwd, GNUNET_YES,
547                                                 &ack_sent, prev_fc);
548   GNUNET_assert (NULL != prev_fc->ack_msg);
549 }
550
551
552 /**
553  * Callback called when a connection queued message is sent.
554  *
555  * Calculates the average time and connection packet tracking.
556  *
557  * @param cls Closure (ConnectionQueue Handle).
558  * @param c Connection this message was on.
559  * @param sent Was it really sent? (Could have been canceled)
560  * @param type Type of message sent.
561  * @param pid Packet ID, or 0 if not applicable (create, destroy, etc).
562  * @param fwd Was this a FWD going message?
563  * @param size Size of the message.
564  * @param wait Time spent waiting for core (only the time for THIS message)
565  */
566 static void
567 conn_message_sent (void *cls,
568                    struct CadetConnection *c, int sent,
569                    uint16_t type, uint32_t pid, int fwd, size_t size,
570                    struct GNUNET_TIME_Relative wait)
571 {
572   struct CadetConnectionPerformance *p;
573   struct CadetFlowControl *fc;
574   struct CadetConnectionQueue *q = cls;
575   double usecsperbyte;
576   int forced;
577
578   LOG (GNUNET_ERROR_TYPE_DEBUG, "connection message_sent\n");
579
580   fc = fwd ? &c->fwd_fc : &c->bck_fc;
581   LOG (GNUNET_ERROR_TYPE_DEBUG, " %ssent %s %s\n",
582        sent ? "" : "not ", GC_f2s (fwd), GC_m2s (type));
583   if (NULL != q)
584   {
585     forced = q->forced;
586     if (NULL != q->cont)
587     {
588       LOG (GNUNET_ERROR_TYPE_DEBUG, " calling cont\n");
589       q->cont (q->cont_cls, c, q, type, fwd, size);
590     }
591     GNUNET_free (q);
592   }
593   else if (type == GNUNET_MESSAGE_TYPE_CADET_ENCRYPTED)
594   {
595     /* If NULL == q and ENCRYPTED == type, message must have been ch_mngmnt */
596     forced = GNUNET_YES;
597   }
598   else
599   {
600     forced = GNUNET_NO;
601   }
602   if (NULL == c)
603   {
604     if (type != GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN
605         && type != GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY)
606     {
607       LOG (GNUNET_ERROR_TYPE_ERROR, "Message %s sent on NULL connection!\n",
608            GC_m2s (type));
609     }
610     return;
611   }
612   LOG (GNUNET_ERROR_TYPE_DEBUG, " C_P- %p %u\n", c, c->pending_messages);
613   c->pending_messages--;
614   if (GNUNET_YES == c->destroy && 0 == c->pending_messages)
615   {
616     LOG (GNUNET_ERROR_TYPE_DEBUG, "!  destroying connection!\n");
617     GCC_destroy (c);
618     return;
619   }
620   /* Send ACK if needed, after accounting for sent ID in fc->queue_n */
621   switch (type)
622   {
623     case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE:
624     case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK:
625       c->maintenance_q = NULL;
626       /* Don't trigger a keepalive for sent ACKs, only SYN and SYNACKs */
627       if (GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE == type || !fwd)
628         schedule_next_keepalive (c, fwd);
629       break;
630
631     case GNUNET_MESSAGE_TYPE_CADET_ENCRYPTED:
632       if (GNUNET_YES == sent)
633       {
634         GNUNET_assert (NULL != q);
635         fc->last_pid_sent = pid; // FIXME
636         GCC_send_ack (c, fwd, GNUNET_NO);
637         connection_reset_timeout (c, fwd);
638       }
639
640       LOG (GNUNET_ERROR_TYPE_DEBUG, "!  Q_N- %p %u\n", fc, fc->queue_n);
641       if (GNUNET_NO == forced)
642       {
643         fc->queue_n--;
644         LOG (GNUNET_ERROR_TYPE_DEBUG,
645             "!   accounting pid %u\n",
646             fc->last_pid_sent);
647       }
648       else
649       {
650         LOG (GNUNET_ERROR_TYPE_DEBUG,
651              "!   forced, Q_N not accounting pid %u\n",
652              fc->last_pid_sent);
653       }
654       break;
655
656     case GNUNET_MESSAGE_TYPE_CADET_KX:
657       if (GNUNET_YES == sent)
658         connection_reset_timeout (c, fwd);
659       break;
660
661     case GNUNET_MESSAGE_TYPE_CADET_POLL:
662       fc->poll_msg = NULL;
663       break;
664
665     case GNUNET_MESSAGE_TYPE_CADET_ACK:
666       fc->ack_msg = NULL;
667       break;
668
669     default:
670       break;
671   }
672   LOG (GNUNET_ERROR_TYPE_DEBUG, "!  message sent!\n");
673
674   if (NULL == c->perf)
675     return; /* Only endpoints are interested in timing. */
676
677   p = c->perf;
678   usecsperbyte = ((double) wait.rel_value_us) / size;
679   if (p->size == AVG_MSGS)
680   {
681     /* Array is full. Substract oldest value, add new one and store. */
682     p->avg -= (p->usecsperbyte[p->idx] / AVG_MSGS);
683     p->usecsperbyte[p->idx] = usecsperbyte;
684     p->avg += (p->usecsperbyte[p->idx] / AVG_MSGS);
685   }
686   else
687   {
688     /* Array not yet full. Add current value to avg and store. */
689     p->usecsperbyte[p->idx] = usecsperbyte;
690     p->avg *= p->size;
691     p->avg += p->usecsperbyte[p->idx];
692     p->size++;
693     p->avg /= p->size;
694   }
695   p->idx = (p->idx + 1) % AVG_MSGS;
696 }
697
698
699 /**
700  * Get the previous hop in a connection
701  *
702  * @param c Connection.
703  *
704  * @return Previous peer in the connection.
705  */
706 static struct CadetPeer *
707 get_prev_hop (const struct CadetConnection *c)
708 {
709   GNUNET_PEER_Id id;
710
711   LOG (GNUNET_ERROR_TYPE_DEBUG, " get prev hop %s [%u/%u]\n",
712        GCC_2s (c), c->own_pos, c->path->length);
713   if (0 == c->own_pos || c->path->length < 2)
714     id = c->path->peers[0];
715   else
716     id = c->path->peers[c->own_pos - 1];
717
718   LOG (GNUNET_ERROR_TYPE_DEBUG, "  ID: %s (%u)\n",
719        GNUNET_i2s (GNUNET_PEER_resolve2 (id)), id);
720
721   return GCP_get_short (id);
722 }
723
724
725 /**
726  * Get the next hop in a connection
727  *
728  * @param c Connection.
729  *
730  * @return Next peer in the connection.
731  */
732 static struct CadetPeer *
733 get_next_hop (const struct CadetConnection *c)
734 {
735   GNUNET_PEER_Id id;
736
737   LOG (GNUNET_ERROR_TYPE_DEBUG, " get next hop %s [%u/%u]\n",
738        GCC_2s (c), c->own_pos, c->path->length);
739   if ((c->path->length - 1) == c->own_pos || c->path->length < 2)
740     id = c->path->peers[c->path->length - 1];
741   else
742     id = c->path->peers[c->own_pos + 1];
743
744   LOG (GNUNET_ERROR_TYPE_DEBUG, "  ID: %s (%u)\n",
745        GNUNET_i2s (GNUNET_PEER_resolve2 (id)), id);
746
747   return GCP_get_short (id);
748 }
749
750
751 /**
752  * Get the hop in a connection.
753  *
754  * @param c Connection.
755  * @param fwd Next hop?
756  *
757  * @return Next peer in the connection.
758  */
759 static struct CadetPeer *
760 get_hop (struct CadetConnection *c, int fwd)
761 {
762   if (fwd)
763     return get_next_hop (c);
764   return get_prev_hop (c);
765 }
766
767
768 /**
769  * Is traffic coming from this sender 'FWD' traffic?
770  *
771  * @param c Connection to check.
772  * @param sender Peer identity of neighbor.
773  *
774  * @return #GNUNET_YES in case the sender is the 'prev' hop and therefore
775  *         the traffic is 'FWD'.
776  *         #GNUNET_NO for BCK.
777  *         #GNUNET_SYSERR for errors.
778  */
779 static int
780 is_fwd (const struct CadetConnection *c,
781         const struct GNUNET_PeerIdentity *sender)
782 {
783   GNUNET_PEER_Id id;
784
785   id = GNUNET_PEER_search (sender);
786   if (GCP_get_short_id (get_prev_hop (c)) == id)
787     return GNUNET_YES;
788
789   if (GCP_get_short_id (get_next_hop (c)) == id)
790     return GNUNET_NO;
791
792   GNUNET_break (0);
793   return GNUNET_SYSERR;
794 }
795
796
797 /**
798  * Sends a CONNECTION ACK message in reponse to a received CONNECTION_CREATE
799  * or a first CONNECTION_ACK directed to us.
800  *
801  * @param connection Connection to confirm.
802  * @param fwd Should we send it FWD? (root->dest)
803  *            (First (~SYNACK) goes BCK, second (~ACK) goes FWD)
804  */
805 static void
806 send_connection_ack (struct CadetConnection *connection, int fwd)
807 {
808   struct CadetTunnel *t;
809
810   t = connection->t;
811   LOG (GNUNET_ERROR_TYPE_INFO, "===> {%14s ACK} on connection %s\n",
812        GC_f2s (!fwd), GCC_2s (connection));
813   GCP_queue_add (get_hop (connection, fwd), NULL,
814                  GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK, 0, 0,
815                  sizeof (struct GNUNET_CADET_ConnectionACK),
816                  connection, fwd, &conn_message_sent, NULL);
817   connection->pending_messages++;
818   if (CADET_TUNNEL_NEW == GCT_get_cstate (t))
819     GCT_change_cstate (t, CADET_TUNNEL_WAITING);
820   if (CADET_CONNECTION_READY != connection->state)
821     connection_change_state (connection, CADET_CONNECTION_SENT);
822 }
823
824
825 /**
826  * Send a notification that a connection is broken.
827  *
828  * @param c Connection that is broken.
829  * @param id1 Peer that has disconnected.
830  * @param id2 Peer that has disconnected.
831  * @param fwd Direction towards which to send it.
832  */
833 static void
834 send_broken (struct CadetConnection *c,
835              const struct GNUNET_PeerIdentity *id1,
836              const struct GNUNET_PeerIdentity *id2,
837              int fwd)
838 {
839   struct GNUNET_CADET_ConnectionBroken msg;
840
841   msg.header.size = htons (sizeof (struct GNUNET_CADET_ConnectionBroken));
842   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN);
843   msg.cid = c->id;
844   msg.peer1 = *id1;
845   msg.peer2 = *id2;
846   GNUNET_assert (NULL == GCC_send_prebuilt_message (&msg.header, 0, 0, c, fwd,
847                                                     GNUNET_YES, NULL, NULL));
848 }
849
850
851 /**
852  * Send a notification that a connection is broken, when a connection
853  * isn't even known to the local peer.
854  *
855  * @param connection_id Connection ID.
856  * @param id1 Peer that has disconnected, probably local peer.
857  * @param id2 Peer that has disconnected can be NULL if unknown.
858  * @param peer Peer to notify (neighbor who sent the connection).
859  */
860 static void
861 send_broken_unknown (const struct GNUNET_CADET_Hash *connection_id,
862                      const struct GNUNET_PeerIdentity *id1,
863                      const struct GNUNET_PeerIdentity *id2,
864                      const struct GNUNET_PeerIdentity *peer_id)
865 {
866   struct GNUNET_CADET_ConnectionBroken *msg;
867   struct CadetPeer *neighbor;
868
869   LOG (GNUNET_ERROR_TYPE_INFO, "===> BROKEN on unknown connection %s\n",
870        GNUNET_h2s (GC_h2hc (connection_id)));
871
872   msg = GNUNET_new (struct GNUNET_CADET_ConnectionBroken);
873   msg->header.size = htons (sizeof (struct GNUNET_CADET_ConnectionBroken));
874   msg->header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN);
875   msg->cid = *connection_id;
876   msg->peer1 = *id1;
877   if (NULL != id2)
878     msg->peer2 = *id2;
879   else
880     memset (&msg->peer2, 0, sizeof (msg->peer2));
881   neighbor = GCP_get (peer_id);
882   GCP_queue_add (neighbor, msg, GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN,
883                  0, 2, sizeof (struct GNUNET_CADET_ConnectionBroken),
884                  NULL, GNUNET_SYSERR, /* connection, fwd */
885                  NULL, NULL); /* continuation */
886 }
887
888
889 /**
890  * Send keepalive packets for a connection.
891  *
892  * @param c Connection to keep alive..
893  * @param fwd Is this a FWD keepalive? (owner -> dest).
894  */
895 static void
896 send_connection_keepalive (struct CadetConnection *c, int fwd)
897 {
898   struct GNUNET_MessageHeader msg;
899   struct CadetFlowControl *fc;
900
901   LOG (GNUNET_ERROR_TYPE_INFO,
902        "keepalive %s for connection %s\n",
903        GC_f2s (fwd), GCC_2s (c));
904
905   fc = fwd ? &c->fwd_fc : &c->bck_fc;
906   if (0 < fc->queue_n)
907   {
908     LOG (GNUNET_ERROR_TYPE_INFO, "not sending keepalive, traffic in queue\n");
909   }
910
911   GNUNET_STATISTICS_update (stats, "# keepalives sent", 1, GNUNET_NO);
912
913   GNUNET_assert (NULL != c->t);
914   msg.size = htons (sizeof (msg));
915   msg.type = htons (GNUNET_MESSAGE_TYPE_CADET_KEEPALIVE);
916
917   GNUNET_assert (NULL ==
918                  GCT_send_prebuilt_message (&msg, c->t, c,
919                                             GNUNET_NO, NULL, NULL));
920 }
921
922
923 /**
924  * Send CONNECTION_{CREATE/ACK} packets for a connection.
925  *
926  * @param c Connection for which to send the message.
927  * @param fwd If #GNUNET_YES, send CREATE, otherwise send ACK.
928  */
929 static void
930 connection_recreate (struct CadetConnection *c, int fwd)
931 {
932   LOG (GNUNET_ERROR_TYPE_DEBUG, "sending connection recreate\n");
933   if (fwd)
934     GCC_send_create (c);
935   else
936     send_connection_ack (c, GNUNET_NO);
937 }
938
939
940 /**
941  * Generic connection timer management.
942  * Depending on the role of the peer in the connection will send the
943  * appropriate message (build or keepalive)
944  *
945  * @param c Conncetion to maintain.
946  * @param fwd Is FWD?
947  */
948 static void
949 connection_maintain (struct CadetConnection *c, int fwd)
950 {
951   if (GNUNET_NO != c->destroy)
952     return;
953
954   if (NULL == c->t)
955   {
956     GNUNET_break (0);
957     return;
958   }
959
960   if (CADET_TUNNEL_SEARCHING == GCT_get_cstate (c->t))
961   {
962     /* TODO DHT GET with RO_BART */
963     return;
964   }
965   switch (c->state)
966   {
967     case CADET_CONNECTION_NEW:
968       GNUNET_break (0);
969       /* fall-through */
970     case CADET_CONNECTION_SENT:
971       connection_recreate (c, fwd);
972       break;
973     case CADET_CONNECTION_READY:
974       send_connection_keepalive (c, fwd);
975       break;
976     default:
977       break;
978   }
979 }
980
981
982
983 /**
984  * Keep the connection alive.
985  *
986  * @param c Connection to keep alive.
987  * @param fwd Direction.
988  * @param shutdown Are we shutting down? (Don't send traffic)
989  *                 Non-zero value for true, not necessarily GNUNET_YES.
990  */
991 static void
992 connection_keepalive (struct CadetConnection *c, int fwd, int shutdown)
993 {
994   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s keepalive for %s\n",
995        GC_f2s (fwd), GCC_2s (c));
996
997   if (fwd)
998     c->fwd_maintenance_task = GNUNET_SCHEDULER_NO_TASK;
999   else
1000     c->bck_maintenance_task = GNUNET_SCHEDULER_NO_TASK;
1001
1002   if (GNUNET_NO != shutdown)
1003     return;
1004
1005   connection_maintain (c, fwd);
1006
1007   /* Next execution will be scheduled by message_sent */
1008 }
1009
1010
1011 /**
1012  * Keep the connection alive in the FWD direction.
1013  *
1014  * @param cls Closure (connection to keepalive).
1015  * @param tc TaskContext.
1016  */
1017 static void
1018 connection_fwd_keepalive (void *cls,
1019                           const struct GNUNET_SCHEDULER_TaskContext *tc)
1020 {
1021   connection_keepalive ((struct CadetConnection *) cls,
1022                         GNUNET_YES,
1023                         tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN);
1024 }
1025
1026
1027 /**
1028  * Keep the connection alive in the BCK direction.
1029  *
1030  * @param cls Closure (connection to keepalive).
1031  * @param tc TaskContext.
1032  */
1033 static void
1034 connection_bck_keepalive (void *cls,
1035                           const struct GNUNET_SCHEDULER_TaskContext *tc)
1036 {
1037   connection_keepalive ((struct CadetConnection *) cls,
1038                         GNUNET_NO,
1039                         tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN);
1040 }
1041
1042
1043 /**
1044  * Schedule next keepalive task, taking in consideration
1045  * the connection state and number of retries.
1046  *
1047  * If the peer is not the origin, do nothing.
1048  *
1049  * @param c Connection for which to schedule the next keepalive.
1050  * @param fwd Direction for the next keepalive.
1051  */
1052 static void
1053 schedule_next_keepalive (struct CadetConnection *c, int fwd)
1054 {
1055   struct GNUNET_TIME_Relative delay;
1056   GNUNET_SCHEDULER_TaskIdentifier *task_id;
1057   GNUNET_SCHEDULER_Task keepalive_task;
1058
1059   if (GNUNET_NO == GCC_is_origin (c, fwd))
1060     return;
1061
1062   /* Calculate delay to use, depending on the state of the connection */
1063   if (CADET_CONNECTION_READY == c->state)
1064   {
1065     delay = refresh_connection_time;
1066   }
1067   else
1068   {
1069     if (1 > c->create_retry)
1070       c->create_retry = 1;
1071     delay = GNUNET_TIME_relative_multiply (create_connection_time,
1072                                            c->create_retry);
1073     if (c->create_retry < 64)
1074       c->create_retry *= 2;
1075   }
1076
1077   /* Select direction-dependent parameters */
1078   if (GNUNET_YES == fwd)
1079   {
1080     task_id = &c->fwd_maintenance_task;
1081     keepalive_task = &connection_fwd_keepalive;
1082   }
1083   else
1084   {
1085     task_id = &c->bck_maintenance_task;
1086     keepalive_task = &connection_bck_keepalive;
1087   }
1088
1089   /* Check that no one scheduled it before us */
1090   if (GNUNET_SCHEDULER_NO_TASK != *task_id)
1091   {
1092     /* No need for a _break. It can happen for instance when sending a SYNACK
1093      * for a duplicate SYN: the first SYNACK scheduled the task. */
1094     GNUNET_SCHEDULER_cancel (*task_id);
1095   }
1096
1097   /* Schedule the task */
1098   *task_id = GNUNET_SCHEDULER_add_delayed (delay, keepalive_task, c);
1099   LOG (GNUNET_ERROR_TYPE_DEBUG, "next keepalive in %s\n",
1100        GNUNET_STRINGS_relative_time_to_string (delay, GNUNET_YES));
1101 }
1102
1103
1104 /**
1105  * @brief Re-initiate traffic on this connection if necessary.
1106  *
1107  * Check if there is traffic queued towards this peer
1108  * and the core transmit handle is NULL (traffic was stalled).
1109  * If so, call core tmt rdy.
1110  *
1111  * @param c Connection on which initiate traffic.
1112  * @param fwd Is this about fwd traffic?
1113  */
1114 static void
1115 connection_unlock_queue (struct CadetConnection *c, int fwd)
1116 {
1117   struct CadetPeer *peer;
1118
1119   LOG (GNUNET_ERROR_TYPE_DEBUG, "connection_unlock_queue %s on %s\n",
1120        GC_f2s (fwd), GCC_2s (c));
1121
1122   if (GCC_is_terminal (c, fwd))
1123   {
1124     LOG (GNUNET_ERROR_TYPE_DEBUG, " is terminal, can unlock!\n");
1125     return;
1126   }
1127
1128   peer = get_hop (c, fwd);
1129   GCP_queue_unlock (peer, c);
1130 }
1131
1132
1133 /**
1134  * Cancel all transmissions that belong to a certain connection.
1135  *
1136  * If the connection is scheduled for destruction and no more messages are left,
1137  * the connection will be destroyed by the continuation call.
1138  *
1139  * @param c Connection which to cancel. Might be destroyed during this call.
1140  * @param fwd Cancel fwd traffic?
1141  */
1142 static void
1143 connection_cancel_queues (struct CadetConnection *c, int fwd)
1144 {
1145   struct CadetFlowControl *fc;
1146   struct CadetPeer *peer;
1147
1148   LOG (GNUNET_ERROR_TYPE_DEBUG,
1149        " *** Cancel %s queues for connection %s\n",
1150        GC_f2s (fwd), GCC_2s (c));
1151   if (NULL == c)
1152   {
1153     GNUNET_break (0);
1154     return;
1155   }
1156
1157   fc = fwd ? &c->fwd_fc : &c->bck_fc;
1158   if (GNUNET_SCHEDULER_NO_TASK != fc->poll_task)
1159   {
1160     GNUNET_SCHEDULER_cancel (fc->poll_task);
1161     fc->poll_task = GNUNET_SCHEDULER_NO_TASK;
1162     LOG (GNUNET_ERROR_TYPE_DEBUG, " *** Cancel POLL in ccq for fc %p\n", fc);
1163   }
1164   peer = get_hop (c, fwd);
1165   GCP_queue_cancel (peer, c);
1166 }
1167
1168
1169 /**
1170  * Function called if a connection has been stalled for a while,
1171  * possibly due to a missed ACK. Poll the neighbor about its ACK status.
1172  *
1173  * @param cls Closure (poll ctx).
1174  * @param tc TaskContext.
1175  */
1176 static void
1177 connection_poll (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
1178
1179
1180 /**
1181  * Callback called when a queued POLL message is sent.
1182  *
1183  * @param cls Closure (FC).
1184  * @param c Connection this message was on.
1185  * @param q Queue handler this call invalidates.
1186  * @param type Type of message sent.
1187  * @param fwd Was this a FWD going message?
1188  * @param size Size of the message.
1189  */
1190 static void
1191 poll_sent (void *cls,
1192            struct CadetConnection *c,
1193            struct CadetConnectionQueue *q,
1194            uint16_t type, int fwd, size_t size)
1195 {
1196   struct CadetFlowControl *fc = cls;
1197
1198   if (2 == c->destroy)
1199   {
1200     LOG (GNUNET_ERROR_TYPE_DEBUG, " *** POLL canceled on shutdown\n");
1201     return;
1202   }
1203   LOG (GNUNET_ERROR_TYPE_DEBUG,
1204        " *** POLL sent for , scheduling new one!\n");
1205   fc->poll_msg = NULL;
1206   fc->poll_time = GNUNET_TIME_STD_BACKOFF (fc->poll_time);
1207   fc->poll_task = GNUNET_SCHEDULER_add_delayed (fc->poll_time,
1208                                                 &connection_poll, fc);
1209   LOG (GNUNET_ERROR_TYPE_DEBUG, " task %u\n", fc->poll_task);
1210
1211 }
1212
1213 /**
1214  * Function called if a connection has been stalled for a while,
1215  * possibly due to a missed ACK. Poll the neighbor about its ACK status.
1216  *
1217  * @param cls Closure (poll ctx).
1218  * @param tc TaskContext.
1219  */
1220 static void
1221 connection_poll (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1222 {
1223   struct CadetFlowControl *fc = cls;
1224   struct GNUNET_CADET_Poll msg;
1225   struct CadetConnection *c;
1226
1227   fc->poll_task = GNUNET_SCHEDULER_NO_TASK;
1228   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1229   {
1230     return;
1231   }
1232
1233   c = fc->c;
1234   LOG (GNUNET_ERROR_TYPE_DEBUG, " *** Polling connection %s %s\n",
1235        GCC_2s (c), fc == &c->fwd_fc ? "FWD" : "BCK");
1236
1237   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_POLL);
1238   msg.header.size = htons (sizeof (msg));
1239   msg.pid = htonl (fc->last_pid_sent);
1240   LOG (GNUNET_ERROR_TYPE_DEBUG, " *** last pid sent: %u!\n", fc->last_pid_sent);
1241   fc->poll_msg =
1242       GCC_send_prebuilt_message (&msg.header, 0, fc->last_pid_sent, c,
1243                                  fc == &c->fwd_fc, GNUNET_YES, &poll_sent, fc);
1244   GNUNET_assert (NULL != fc->poll_msg);
1245 }
1246
1247
1248 /**
1249  * Timeout function due to lack of keepalive/traffic from the owner.
1250  * Destroys connection if called.
1251  *
1252  * @param cls Closure (connection to destroy).
1253  * @param tc TaskContext.
1254  */
1255 static void
1256 connection_fwd_timeout (void *cls,
1257                         const struct GNUNET_SCHEDULER_TaskContext *tc)
1258 {
1259   struct CadetConnection *c = cls;
1260
1261   c->fwd_maintenance_task = GNUNET_SCHEDULER_NO_TASK;
1262   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1263     return;
1264
1265   LOG (GNUNET_ERROR_TYPE_DEBUG, "Connection %s FWD timed out. Destroying.\n",
1266        GCC_2s (c));
1267   if (GCC_is_origin (c, GNUNET_YES)) /* If local, leave. */
1268   {
1269     GNUNET_break (0);
1270     return;
1271   }
1272
1273   GCC_destroy (c);
1274 }
1275
1276
1277 /**
1278  * Timeout function due to lack of keepalive/traffic from the destination.
1279  * Destroys connection if called.
1280  *
1281  * @param cls Closure (connection to destroy).
1282  * @param tc TaskContext
1283  */
1284 static void
1285 connection_bck_timeout (void *cls,
1286                         const struct GNUNET_SCHEDULER_TaskContext *tc)
1287 {
1288   struct CadetConnection *c = cls;
1289
1290   c->bck_maintenance_task = GNUNET_SCHEDULER_NO_TASK;
1291   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1292     return;
1293
1294   LOG (GNUNET_ERROR_TYPE_DEBUG, "Connection %s BCK timed out. Destroying.\n",
1295        GCC_2s (c));
1296
1297   if (GCC_is_origin (c, GNUNET_NO)) /* If local, leave. */
1298   {
1299     GNUNET_break (0);
1300     return;
1301   }
1302
1303   GCC_destroy (c);
1304 }
1305
1306
1307 /**
1308  * Resets the connection timeout task, some other message has done the
1309  * task's job.
1310  * - For the first peer on the direction this means to send
1311  *   a keepalive or a path confirmation message (either create or ACK).
1312  * - For all other peers, this means to destroy the connection,
1313  *   due to lack of activity.
1314  * Starts the timeout if no timeout was running (connection just created).
1315  *
1316  * @param c Connection whose timeout to reset.
1317  * @param fwd Is this forward?
1318  *
1319  * TODO use heap to improve efficiency of scheduler.
1320  */
1321 static void
1322 connection_reset_timeout (struct CadetConnection *c, int fwd)
1323 {
1324   LOG (GNUNET_ERROR_TYPE_DEBUG, "Connection %s reset timeout\n", GC_f2s (fwd));
1325
1326   if (GCC_is_origin (c, fwd)) /* Startpoint */
1327   {
1328     schedule_next_keepalive (c, fwd);
1329   }
1330   else /* Relay, endpoint. */
1331   {
1332     struct GNUNET_TIME_Relative delay;
1333     GNUNET_SCHEDULER_TaskIdentifier *ti;
1334     GNUNET_SCHEDULER_Task f;
1335
1336     ti = fwd ? &c->fwd_maintenance_task : &c->bck_maintenance_task;
1337
1338     if (GNUNET_SCHEDULER_NO_TASK != *ti)
1339       GNUNET_SCHEDULER_cancel (*ti);
1340     delay = GNUNET_TIME_relative_multiply (refresh_connection_time, 4);
1341     f = fwd ? &connection_fwd_timeout : &connection_bck_timeout;
1342     *ti = GNUNET_SCHEDULER_add_delayed (delay, f, c);
1343   }
1344 }
1345
1346
1347 /**
1348  * Add the connection to the list of both neighbors.
1349  *
1350  * @param c Connection.
1351  *
1352  * @return #GNUNET_OK if everything went fine
1353  *         #GNUNET_SYSERR if the was an error and @c c is malformed.
1354  */
1355 static int
1356 register_neighbors (struct CadetConnection *c)
1357 {
1358   struct CadetPeer *next_peer;
1359   struct CadetPeer *prev_peer;
1360
1361   next_peer = get_next_hop (c);
1362   prev_peer = get_prev_hop (c);
1363
1364   LOG (GNUNET_ERROR_TYPE_DEBUG, "register neighbors for connection %s\n",
1365        GCC_2s (c));
1366   path_debug (c->path);
1367   LOG (GNUNET_ERROR_TYPE_DEBUG, "own pos %u\n", c->own_pos);
1368   LOG (GNUNET_ERROR_TYPE_DEBUG, "putting connection %s to next peer %p\n",
1369        GCC_2s (c), next_peer);
1370   LOG (GNUNET_ERROR_TYPE_DEBUG, "next peer %p %s\n", next_peer, GCP_2s (next_peer));
1371   LOG (GNUNET_ERROR_TYPE_DEBUG, "putting connection %s to prev peer %p\n",
1372        GCC_2s (c), prev_peer);
1373   LOG (GNUNET_ERROR_TYPE_DEBUG, "prev peer %p %s\n", prev_peer, GCP_2s (prev_peer));
1374
1375   if (GNUNET_NO == GCP_is_neighbor (next_peer)
1376       || GNUNET_NO == GCP_is_neighbor (prev_peer))
1377   {
1378     if (GCC_is_origin (c, GNUNET_YES))
1379       GNUNET_STATISTICS_update (stats, "# local bad paths", 1, GNUNET_NO);
1380     GNUNET_STATISTICS_update (stats, "# bad paths", 1, GNUNET_NO);
1381
1382     LOG (GNUNET_ERROR_TYPE_DEBUG, "  register neighbors failed\n");
1383     LOG (GNUNET_ERROR_TYPE_DEBUG, "  prev: %s, neighbor?: %d\n",
1384          GCP_2s (prev_peer), GCP_is_neighbor (prev_peer));
1385     LOG (GNUNET_ERROR_TYPE_DEBUG, "  next: %s, neighbor?: %d\n",
1386          GCP_2s (next_peer), GCP_is_neighbor (next_peer));
1387     return GNUNET_SYSERR;
1388   }
1389
1390   GCP_add_connection (next_peer, c);
1391   GCP_add_connection (prev_peer, c);
1392
1393   return GNUNET_OK;
1394 }
1395
1396
1397 /**
1398  * Remove the connection from the list of both neighbors.
1399  *
1400  * @param c Connection.
1401  */
1402 static void
1403 unregister_neighbors (struct CadetConnection *c)
1404 {
1405   struct CadetPeer *peer;
1406
1407   peer = get_next_hop (c);
1408   if (GNUNET_OK != GCP_remove_connection (peer, c))
1409   {
1410     GNUNET_assert (CADET_CONNECTION_NEW == c->state
1411                    || CADET_CONNECTION_DESTROYED == c->state);
1412     LOG (GNUNET_ERROR_TYPE_DEBUG, "  cstate: %u\n", c->state);
1413     if (NULL != c->t) GCT_debug (c->t, GNUNET_ERROR_TYPE_DEBUG);
1414   }
1415
1416   peer = get_prev_hop (c);
1417   if (GNUNET_OK != GCP_remove_connection (peer, c))
1418   {
1419     GNUNET_assert (CADET_CONNECTION_NEW == c->state
1420                    || CADET_CONNECTION_DESTROYED == c->state);
1421     LOG (GNUNET_ERROR_TYPE_DEBUG, "  cstate: %u\n", c->state);
1422     if (NULL != c->t) GCT_debug (c->t, GNUNET_ERROR_TYPE_DEBUG);
1423   }
1424 }
1425
1426
1427 /**
1428  * Bind the connection to the peer and the tunnel to that peer.
1429  *
1430  * If the peer has no tunnel, create one. Update tunnel and connection
1431  * data structres to reflect new status.
1432  *
1433  * @param c Connection.
1434  * @param peer Peer.
1435  */
1436 static void
1437 add_to_peer (struct CadetConnection *c, struct CadetPeer *peer)
1438 {
1439   GCP_add_tunnel (peer);
1440   c->t = GCP_get_tunnel (peer);
1441   GCT_add_connection (c->t, c);
1442 }
1443
1444
1445 /**
1446  * Builds a path from a PeerIdentity array.
1447  *
1448  * @param peers PeerIdentity array.
1449  * @param size Size of the @c peers array.
1450  * @param own_pos Output parameter: own position in the path.
1451  *
1452  * @return Fixed and shortened path.
1453  */
1454 static struct CadetPeerPath *
1455 build_path_from_peer_ids (struct GNUNET_PeerIdentity *peers,
1456                           unsigned int size,
1457                           unsigned int *own_pos)
1458 {
1459   struct CadetPeerPath *path;
1460   GNUNET_PEER_Id shortid;
1461   unsigned int i;
1462   unsigned int j;
1463   unsigned int offset;
1464
1465   /* Create path */
1466   LOG (GNUNET_ERROR_TYPE_DEBUG, "  Creating path...\n");
1467   path = path_new (size);
1468   *own_pos = 0;
1469   offset = 0;
1470   for (i = 0; i < size; i++)
1471   {
1472     LOG (GNUNET_ERROR_TYPE_DEBUG, "  - %u: taking %s\n",
1473          i, GNUNET_i2s (&peers[i]));
1474     shortid = GNUNET_PEER_intern (&peers[i]);
1475
1476     /* Check for loops / duplicates */
1477     for (j = 0; j < i - offset; j++)
1478     {
1479       if (path->peers[j] == shortid)
1480       {
1481         LOG (GNUNET_ERROR_TYPE_DEBUG, "    already exists at pos %u\n", j);
1482         offset = i - j;
1483         LOG (GNUNET_ERROR_TYPE_DEBUG, "    offset now %u\n", offset);
1484         GNUNET_PEER_change_rc (shortid, -1);
1485       }
1486     }
1487     LOG (GNUNET_ERROR_TYPE_DEBUG, "    storing at %u\n", i - offset);
1488     path->peers[i - offset] = shortid;
1489     if (path->peers[i - offset] == myid)
1490       *own_pos = i - offset;
1491   }
1492   path->length -= offset;
1493
1494   if (path->peers[*own_pos] != myid)
1495   {
1496     /* create path: self not found in path through self */
1497     GNUNET_break_op (0);
1498     path_destroy (path);
1499     return NULL;
1500   }
1501
1502   return path;
1503 }
1504
1505
1506 /**
1507  * Log receipt of message on stderr (INFO level).
1508  *
1509  * @param message Message received.
1510  * @param peer Peer who sent the message.
1511  * @param hash Connection ID.
1512  */
1513 static void
1514 log_message (const struct GNUNET_MessageHeader *message,
1515              const struct GNUNET_PeerIdentity *peer,
1516              const struct GNUNET_CADET_Hash *hash)
1517 {
1518   LOG (GNUNET_ERROR_TYPE_INFO, "<-- %s on connection %s from %s\n",
1519        GC_m2s (ntohs (message->type)), GNUNET_h2s (GC_h2hc (hash)),
1520        GNUNET_i2s (peer));
1521 }
1522
1523 /******************************************************************************/
1524 /********************************    API    ***********************************/
1525 /******************************************************************************/
1526
1527 /**
1528  * Core handler for connection creation.
1529  *
1530  * @param cls Closure (unused).
1531  * @param peer Sender (neighbor).
1532  * @param message Message.
1533  *
1534  * @return GNUNET_OK to keep the connection open,
1535  *         GNUNET_SYSERR to close it (signal serious error)
1536  */
1537 int
1538 GCC_handle_create (void *cls, const struct GNUNET_PeerIdentity *peer,
1539                    const struct GNUNET_MessageHeader *message)
1540 {
1541   struct GNUNET_CADET_ConnectionCreate *msg;
1542   struct GNUNET_PeerIdentity *id;
1543   struct GNUNET_CADET_Hash *cid;
1544   struct CadetPeerPath *path;
1545   struct CadetPeer *dest_peer;
1546   struct CadetPeer *orig_peer;
1547   struct CadetConnection *c;
1548   unsigned int own_pos;
1549   uint16_t size;
1550
1551   /* Check size */
1552   size = ntohs (message->size);
1553   if (size < sizeof (struct GNUNET_CADET_ConnectionCreate))
1554   {
1555     GNUNET_break_op (0);
1556     return GNUNET_OK;
1557   }
1558
1559   /* Calculate hops */
1560   size -= sizeof (struct GNUNET_CADET_ConnectionCreate);
1561   if (size % sizeof (struct GNUNET_PeerIdentity))
1562   {
1563     GNUNET_break_op (0);
1564     return GNUNET_OK;
1565   }
1566   size /= sizeof (struct GNUNET_PeerIdentity);
1567   if (1 > size)
1568   {
1569     GNUNET_break_op (0);
1570     return GNUNET_OK;
1571   }
1572   LOG (GNUNET_ERROR_TYPE_DEBUG, "    path has %u hops.\n", size);
1573
1574   /* Get parameters */
1575   msg = (struct GNUNET_CADET_ConnectionCreate *) message;
1576   cid = &msg->cid;
1577   log_message (message, peer, cid);
1578   id = (struct GNUNET_PeerIdentity *) &msg[1];
1579   LOG (GNUNET_ERROR_TYPE_DEBUG, "    origin: %s\n", GNUNET_i2s (id));
1580
1581   /* Create connection */
1582   c = connection_get (cid);
1583   if (NULL == c)
1584   {
1585     path = build_path_from_peer_ids ((struct GNUNET_PeerIdentity *) &msg[1],
1586                                      size, &own_pos);
1587     if (NULL == path)
1588       return GNUNET_OK;
1589     if (0 == own_pos)
1590     {
1591       GNUNET_break_op (0);
1592       path_destroy (path);
1593       return GNUNET_OK;
1594     }
1595     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Own position: %u\n", own_pos);
1596     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Creating connection\n");
1597     c = GCC_new (cid, NULL, path_duplicate (path), own_pos);
1598     if (NULL == c)
1599     {
1600       if (path->length - 1 == own_pos)
1601       {
1602         /* If we are destination, why did the creation fail? */
1603         GNUNET_break (0);
1604         return GNUNET_OK;
1605       }
1606       send_broken_unknown (cid, &my_full_id,
1607                            GNUNET_PEER_resolve2 (path->peers[own_pos + 1]),
1608                            peer);
1609       path_destroy (path);
1610       return GNUNET_OK;
1611     }
1612     GCP_add_path_to_all (path, GNUNET_NO);
1613     connection_reset_timeout (c, GNUNET_YES);
1614   }
1615   else
1616   {
1617     path = path_duplicate (c->path);
1618   }
1619   if (CADET_CONNECTION_NEW == c->state)
1620     connection_change_state (c, CADET_CONNECTION_SENT);
1621
1622   /* Remember peers */
1623   dest_peer = GCP_get (&id[size - 1]);
1624   orig_peer = GCP_get (&id[0]);
1625
1626   /* Is it a connection to us? */
1627   if (c->own_pos == path->length - 1)
1628   {
1629     LOG (GNUNET_ERROR_TYPE_DEBUG, "  It's for us!\n");
1630     GCP_add_path_to_origin (orig_peer, path_duplicate (path), GNUNET_YES);
1631
1632     add_to_peer (c, orig_peer);
1633     if (CADET_TUNNEL_NEW == GCT_get_cstate (c->t))
1634       GCT_change_cstate (c->t,  CADET_TUNNEL_WAITING);
1635
1636     send_connection_ack (c, GNUNET_NO);
1637     if (CADET_CONNECTION_SENT == c->state)
1638       connection_change_state (c, CADET_CONNECTION_ACK);
1639   }
1640   else
1641   {
1642     /* It's for somebody else! Retransmit. */
1643     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Retransmitting.\n");
1644     GCP_add_path (dest_peer, path_duplicate (path), GNUNET_NO);
1645     GCP_add_path_to_origin (orig_peer, path_duplicate (path), GNUNET_NO);
1646     GNUNET_assert (NULL == GCC_send_prebuilt_message (message, 0, 0, c,
1647                                                       GNUNET_YES, GNUNET_YES,
1648                                                       NULL, NULL));
1649   }
1650   path_destroy (path);
1651   return GNUNET_OK;
1652 }
1653
1654
1655 /**
1656  * Core handler for path confirmations.
1657  *
1658  * @param cls closure
1659  * @param message message
1660  * @param peer peer identity this notification is about
1661  *
1662  * @return GNUNET_OK to keep the connection open,
1663  *         GNUNET_SYSERR to close it (signal serious error)
1664  */
1665 int
1666 GCC_handle_confirm (void *cls, const struct GNUNET_PeerIdentity *peer,
1667                     const struct GNUNET_MessageHeader *message)
1668 {
1669   struct GNUNET_CADET_ConnectionACK *msg;
1670   struct CadetConnection *c;
1671   struct CadetPeerPath *p;
1672   struct CadetPeer *pi;
1673   enum CadetConnectionState oldstate;
1674   int fwd;
1675
1676   msg = (struct GNUNET_CADET_ConnectionACK *) message;
1677   log_message (message, peer, &msg->cid);
1678   c = connection_get (&msg->cid);
1679   if (NULL == c)
1680   {
1681     GNUNET_STATISTICS_update (stats, "# control on unknown connection",
1682                               1, GNUNET_NO);
1683     LOG (GNUNET_ERROR_TYPE_DEBUG, "  don't know the connection!\n");
1684     send_broken_unknown (&msg->cid, &my_full_id, NULL, peer);
1685     return GNUNET_OK;
1686   }
1687
1688   if (GNUNET_NO != c->destroy)
1689   {
1690     LOG (GNUNET_ERROR_TYPE_DEBUG, "  connection being destroyed\n");
1691     return GNUNET_OK;
1692   }
1693
1694   oldstate = c->state;
1695   LOG (GNUNET_ERROR_TYPE_DEBUG, "  via peer %s\n", GNUNET_i2s (peer));
1696   pi = GCP_get (peer);
1697   if (get_next_hop (c) == pi)
1698   {
1699     LOG (GNUNET_ERROR_TYPE_DEBUG, "  SYNACK\n");
1700     fwd = GNUNET_NO;
1701     if (CADET_CONNECTION_SENT == oldstate)
1702       connection_change_state (c, CADET_CONNECTION_ACK);
1703   }
1704   else if (get_prev_hop (c) == pi)
1705   {
1706     LOG (GNUNET_ERROR_TYPE_DEBUG, "  ACK\n");
1707     fwd = GNUNET_YES;
1708     connection_change_state (c, CADET_CONNECTION_READY);
1709   }
1710   else
1711   {
1712     GNUNET_break_op (0);
1713     return GNUNET_OK;
1714   }
1715
1716   connection_reset_timeout (c, fwd);
1717
1718   /* Add path to peers? */
1719   p = c->path;
1720   if (NULL != p)
1721   {
1722     GCP_add_path_to_all (p, GNUNET_YES);
1723   }
1724   else
1725   {
1726     GNUNET_break (0);
1727   }
1728
1729   /* Message for us as creator? */
1730   if (GCC_is_origin (c, GNUNET_YES))
1731   {
1732     if (GNUNET_NO != fwd)
1733     {
1734       GNUNET_break_op (0);
1735       return GNUNET_OK;
1736     }
1737     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Connection (SYN)ACK for us!\n");
1738
1739     /* If just created, cancel the short timeout and start a long one */
1740     if (CADET_CONNECTION_SENT == oldstate)
1741       connection_reset_timeout (c, GNUNET_YES);
1742
1743     /* Change connection state */
1744     connection_change_state (c, CADET_CONNECTION_READY);
1745     send_connection_ack (c, GNUNET_YES);
1746
1747     /* Change tunnel state, trigger KX */
1748     if (CADET_TUNNEL_WAITING == GCT_get_cstate (c->t))
1749       GCT_change_cstate (c->t, CADET_TUNNEL_READY);
1750
1751     return GNUNET_OK;
1752   }
1753
1754   /* Message for us as destination? */
1755   if (GCC_is_terminal (c, GNUNET_YES))
1756   {
1757     if (GNUNET_YES != fwd)
1758     {
1759       GNUNET_break_op (0);
1760       return GNUNET_OK;
1761     }
1762     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Connection ACK for us!\n");
1763
1764     /* If just created, cancel the short timeout and start a long one */
1765     if (CADET_CONNECTION_ACK == oldstate)
1766       connection_reset_timeout (c, GNUNET_NO);
1767
1768     /* Change tunnel state */
1769     if (CADET_TUNNEL_WAITING == GCT_get_cstate (c->t))
1770       GCT_change_cstate (c->t, CADET_TUNNEL_READY);
1771
1772     return GNUNET_OK;
1773   }
1774
1775   LOG (GNUNET_ERROR_TYPE_DEBUG, "  not for us, retransmitting...\n");
1776   GNUNET_assert (NULL == GCC_send_prebuilt_message (message, 0, 0, c, fwd,
1777                                                     GNUNET_YES, NULL, NULL));
1778   return GNUNET_OK;
1779 }
1780
1781
1782 /**
1783  * Core handler for notifications of broken connections.
1784  *
1785  * @param cls Closure (unused).
1786  * @param id Peer identity of sending neighbor.
1787  * @param message Message.
1788  *
1789  * @return GNUNET_OK to keep the connection open,
1790  *         GNUNET_SYSERR to close it (signal serious error)
1791  */
1792 int
1793 GCC_handle_broken (void* cls,
1794                    const struct GNUNET_PeerIdentity* id,
1795                    const struct GNUNET_MessageHeader* message)
1796 {
1797   struct GNUNET_CADET_ConnectionBroken *msg;
1798   struct CadetConnection *c;
1799   struct CadetTunnel *t;
1800   unsigned int del;
1801   int pending;
1802   int fwd;
1803
1804   msg = (struct GNUNET_CADET_ConnectionBroken *) message;
1805   log_message (message, id, &msg->cid);
1806   LOG (GNUNET_ERROR_TYPE_DEBUG, "  regarding %s\n",
1807               GNUNET_i2s (&msg->peer1));
1808   LOG (GNUNET_ERROR_TYPE_DEBUG, "  regarding %s\n",
1809               GNUNET_i2s (&msg->peer2));
1810   c = connection_get (&msg->cid);
1811   if (NULL == c)
1812   {
1813     LOG (GNUNET_ERROR_TYPE_DEBUG, "  duplicate CONNECTION_BROKEN\n");
1814     return GNUNET_OK;
1815   }
1816
1817   t = c->t;
1818   fwd = is_fwd (c, id);
1819   if (GCC_is_terminal (c, fwd))
1820   {
1821     struct GNUNET_MessageHeader *out_msg;
1822     struct CadetPeer *neighbor;
1823     struct CadetPeer *endpoint;
1824
1825     if (NULL == t)
1826     {
1827       /* A terminal connection should not have 't' set to NULL. */
1828       GNUNET_break (0);
1829       return GNUNET_OK;
1830     }
1831     neighbor = get_hop (c, !fwd);
1832     endpoint = GCP_get_short (c->path->peers[c->path->length - 1]);
1833     path_invalidate (c->path);
1834     GCP_notify_broken_link (endpoint, &msg->peer1, &msg->peer2);
1835     c->state = CADET_CONNECTION_DESTROYED;
1836     GCT_remove_connection (t, c);
1837     c->t = NULL;
1838     pending = c->pending_messages;
1839
1840     /* GCP_connection_pop will destroy the connection when the last message
1841      * is popped! Do not use 'c' after the call. */
1842     while (NULL != (out_msg = GCP_connection_pop (neighbor, c, &del)))
1843     {
1844       pending -= del + 1; /* Substract the deleted messages + the popped one */
1845       GCT_resend_message (out_msg, t);
1846     }
1847     /* All pending messages should have been popped,
1848      * and the connection destroyed by the continuation. */
1849     if (0 < pending)
1850     {
1851       GNUNET_break (0);
1852       GCC_destroy (c);
1853     }
1854     else
1855     {
1856       GNUNET_break (0 == pending_msgs); /* If negative: counter error! */
1857     }
1858   }
1859   else
1860   {
1861     GNUNET_assert (NULL == GCC_send_prebuilt_message (message, 0, 0, c, fwd,
1862                                                       GNUNET_YES, NULL, NULL));
1863     c->destroy = GNUNET_YES;
1864     connection_cancel_queues (c, !fwd);
1865   }
1866
1867   return GNUNET_OK;
1868 }
1869
1870
1871 /**
1872  * Core handler for tunnel destruction
1873  *
1874  * @param cls Closure (unused).
1875  * @param peer Peer identity of sending neighbor.
1876  * @param message Message.
1877  *
1878  * @return GNUNET_OK to keep the connection open,
1879  *         GNUNET_SYSERR to close it (signal serious error)
1880  */
1881 int
1882 GCC_handle_destroy (void *cls, const struct GNUNET_PeerIdentity *peer,
1883                     const struct GNUNET_MessageHeader *message)
1884 {
1885   struct GNUNET_CADET_ConnectionDestroy *msg;
1886   struct CadetConnection *c;
1887   int fwd;
1888
1889   msg = (struct GNUNET_CADET_ConnectionDestroy *) message;
1890   log_message (message, peer, &msg->cid);
1891   c = connection_get (&msg->cid);
1892   if (NULL == c)
1893   {
1894     /* Probably already got the message from another path,
1895      * destroyed the tunnel and retransmitted to children.
1896      * Safe to ignore.
1897      */
1898     GNUNET_STATISTICS_update (stats, "# control on unknown connection",
1899                               1, GNUNET_NO);
1900     LOG (GNUNET_ERROR_TYPE_DEBUG, "  connection unknown: already destroyed?\n");
1901     return GNUNET_OK;
1902   }
1903   fwd = is_fwd (c, peer);
1904   if (GNUNET_SYSERR == fwd)
1905   {
1906     GNUNET_break_op (0); /* FIXME */
1907     return GNUNET_OK;
1908   }
1909   if (GNUNET_NO == GCC_is_terminal (c, fwd))
1910     GNUNET_assert (NULL == GCC_send_prebuilt_message (message, 0, 0, c, fwd,
1911                                                       GNUNET_YES, NULL, NULL));
1912   else if (0 == c->pending_messages)
1913   {
1914     LOG (GNUNET_ERROR_TYPE_DEBUG, "  directly destroying connection!\n");
1915     GCC_destroy (c);
1916     return GNUNET_OK;
1917   }
1918   c->destroy = GNUNET_YES;
1919   c->state = CADET_CONNECTION_DESTROYED;
1920   if (NULL != c->t)
1921   {
1922     GCT_remove_connection (c->t, c);
1923     c->t = NULL;
1924   }
1925
1926   return GNUNET_OK;
1927 }
1928
1929 /**
1930  * Generic handler for cadet network encrypted traffic.
1931  *
1932  * @param peer Peer identity this notification is about.
1933  * @param msg Encrypted message.
1934  *
1935  * @return GNUNET_OK to keep the connection open,
1936  *         GNUNET_SYSERR to close it (signal serious error)
1937  */
1938 static int
1939 handle_cadet_encrypted (const struct GNUNET_PeerIdentity *peer,
1940                        const struct GNUNET_CADET_Encrypted *msg)
1941 {
1942   struct CadetConnection *c;
1943   struct CadetPeer *neighbor;
1944   struct CadetFlowControl *fc;
1945   GNUNET_PEER_Id peer_id;
1946   uint32_t pid;
1947   uint32_t ttl;
1948   size_t size;
1949   int fwd;
1950
1951   log_message (&msg->header, peer, &msg->cid);
1952
1953   /* Check size */
1954   size = ntohs (msg->header.size);
1955   if (size <
1956       sizeof (struct GNUNET_CADET_Encrypted) +
1957       sizeof (struct GNUNET_MessageHeader))
1958   {
1959     GNUNET_break_op (0);
1960     return GNUNET_OK;
1961   }
1962
1963   /* Check connection */
1964   c = connection_get (&msg->cid);
1965   if (NULL == c)
1966   {
1967     GNUNET_STATISTICS_update (stats, "# unknown connection", 1, GNUNET_NO);
1968     LOG (GNUNET_ERROR_TYPE_DEBUG, "enc on unknown connection %s\n",
1969          GNUNET_h2s (GC_h2hc (&msg->cid)));
1970     send_broken_unknown (&msg->cid, &my_full_id, NULL, peer);
1971     return GNUNET_OK;
1972   }
1973
1974   LOG (GNUNET_ERROR_TYPE_DEBUG, "  connection %s\n", GCC_2s (c));
1975
1976   /* Check if origin is as expected */
1977   neighbor = get_prev_hop (c);
1978   peer_id = GNUNET_PEER_search (peer);
1979   if (peer_id == GCP_get_short_id (neighbor))
1980   {
1981     fwd = GNUNET_YES;
1982   }
1983   else
1984   {
1985     neighbor = get_next_hop (c);
1986     if (peer_id == GCP_get_short_id (neighbor))
1987     {
1988       fwd = GNUNET_NO;
1989     }
1990     else
1991     {
1992       /* Unexpected peer sending traffic on a connection. */
1993       GNUNET_break_op (0);
1994       return GNUNET_OK;
1995     }
1996   }
1997
1998   /* Check PID */
1999   fc = fwd ? &c->bck_fc : &c->fwd_fc;
2000   pid = ntohl (msg->pid);
2001   LOG (GNUNET_ERROR_TYPE_DEBUG, " PID %u (expected %u+)\n",
2002        pid, fc->last_pid_recv + 1);
2003   if (GC_is_pid_bigger (pid, fc->last_ack_sent))
2004   {
2005     GNUNET_STATISTICS_update (stats, "# unsolicited message", 1, GNUNET_NO);
2006     GNUNET_break_op (0);
2007     LOG (GNUNET_ERROR_TYPE_WARNING,
2008          "Received PID %u, (prev %u), ACK %u\n",
2009          pid, fc->last_pid_recv, fc->last_ack_sent);
2010     return GNUNET_OK;
2011   }
2012   if (GNUNET_NO == GC_is_pid_bigger (pid, fc->last_pid_recv))
2013   {
2014     GNUNET_STATISTICS_update (stats, "# duplicate PID", 1, GNUNET_NO);
2015     LOG (GNUNET_ERROR_TYPE_DEBUG,
2016                 " PID %u not expected (%u+), dropping!\n",
2017                 pid, fc->last_pid_recv + 1);
2018     return GNUNET_OK;
2019   }
2020   if (CADET_CONNECTION_SENT == c->state || CADET_CONNECTION_ACK == c->state)
2021     connection_change_state (c, CADET_CONNECTION_READY);
2022   connection_reset_timeout (c, fwd);
2023   fc->last_pid_recv = pid;
2024
2025   /* Is this message for us? */
2026   if (GCC_is_terminal (c, fwd))
2027   {
2028     LOG (GNUNET_ERROR_TYPE_DEBUG, "  message for us!\n");
2029     GNUNET_STATISTICS_update (stats, "# messages received", 1, GNUNET_NO);
2030
2031     if (NULL == c->t)
2032     {
2033       GNUNET_break (GNUNET_NO != c->destroy);
2034       return GNUNET_OK;
2035     }
2036     fc->last_pid_recv = pid;
2037     GCT_handle_encrypted (c->t, msg);
2038     GCC_send_ack (c, fwd, GNUNET_NO);
2039     return GNUNET_OK;
2040   }
2041
2042   /* Message not for us: forward to next hop */
2043   LOG (GNUNET_ERROR_TYPE_DEBUG, "  not for us, retransmitting...\n");
2044   ttl = ntohl (msg->ttl);
2045   LOG (GNUNET_ERROR_TYPE_DEBUG, "   ttl: %u\n", ttl);
2046   if (ttl == 0)
2047   {
2048     GNUNET_STATISTICS_update (stats, "# TTL drops", 1, GNUNET_NO);
2049     LOG (GNUNET_ERROR_TYPE_WARNING, " TTL is 0, DROPPING!\n");
2050     GCC_send_ack (c, fwd, GNUNET_NO);
2051     return GNUNET_OK;
2052   }
2053
2054   GNUNET_STATISTICS_update (stats, "# messages forwarded", 1, GNUNET_NO);
2055   GNUNET_assert (NULL == GCC_send_prebuilt_message (&msg->header, 0, 0, c, fwd,
2056                                                     GNUNET_NO, NULL, NULL));
2057
2058   return GNUNET_OK;
2059 }
2060
2061 /**
2062  * Generic handler for cadet network encrypted traffic.
2063  *
2064  * @param peer Peer identity this notification is about.
2065  * @param msg Encrypted message.
2066  *
2067  * @return GNUNET_OK to keep the connection open,
2068  *         GNUNET_SYSERR to close it (signal serious error)
2069  */
2070 static int
2071 handle_cadet_kx (const struct GNUNET_PeerIdentity *peer,
2072                 const struct GNUNET_CADET_KX *msg)
2073 {
2074   struct CadetConnection *c;
2075   struct CadetPeer *neighbor;
2076   GNUNET_PEER_Id peer_id;
2077   size_t size;
2078   int fwd;
2079
2080   log_message (&msg->header, peer, &msg->cid);
2081
2082   /* Check size */
2083   size = ntohs (msg->header.size);
2084   if (size <
2085       sizeof (struct GNUNET_CADET_KX) +
2086       sizeof (struct GNUNET_MessageHeader))
2087   {
2088     GNUNET_break_op (0);
2089     return GNUNET_OK;
2090   }
2091
2092   /* Check connection */
2093   c = connection_get (&msg->cid);
2094   if (NULL == c)
2095   {
2096     GNUNET_STATISTICS_update (stats, "# unknown connection", 1, GNUNET_NO);
2097     LOG (GNUNET_ERROR_TYPE_DEBUG, "kx on unknown connection %s\n",
2098          GNUNET_h2s (GC_h2hc (&msg->cid)));
2099     send_broken_unknown (&msg->cid, &my_full_id, NULL, peer);
2100     return GNUNET_OK;
2101   }
2102   LOG (GNUNET_ERROR_TYPE_DEBUG, " on connection %s\n", GCC_2s (c));
2103
2104   /* Check if origin is as expected */
2105   neighbor = get_prev_hop (c);
2106   peer_id = GNUNET_PEER_search (peer);
2107   if (peer_id == GCP_get_short_id (neighbor))
2108   {
2109     fwd = GNUNET_YES;
2110   }
2111   else
2112   {
2113     neighbor = get_next_hop (c);
2114     if (peer_id == GCP_get_short_id (neighbor))
2115     {
2116       fwd = GNUNET_NO;
2117     }
2118     else
2119     {
2120       /* Unexpected peer sending traffic on a connection. */
2121       GNUNET_break_op (0);
2122       return GNUNET_OK;
2123     }
2124   }
2125
2126   /* Count as connection confirmation. */
2127   if (CADET_CONNECTION_SENT == c->state || CADET_CONNECTION_ACK == c->state)
2128   {
2129     connection_change_state (c, CADET_CONNECTION_READY);
2130     if (NULL != c->t)
2131     {
2132       if (CADET_TUNNEL_WAITING == GCT_get_cstate (c->t))
2133         GCT_change_cstate (c->t, CADET_TUNNEL_READY);
2134     }
2135   }
2136   connection_reset_timeout (c, fwd);
2137
2138   /* Is this message for us? */
2139   if (GCC_is_terminal (c, fwd))
2140   {
2141     LOG (GNUNET_ERROR_TYPE_DEBUG, "  message for us!\n");
2142     GNUNET_STATISTICS_update (stats, "# messages received", 1, GNUNET_NO);
2143     if (NULL == c->t)
2144     {
2145       GNUNET_break (0);
2146       return GNUNET_OK;
2147     }
2148     GCT_handle_kx (c->t, &msg[1].header);
2149     return GNUNET_OK;
2150   }
2151
2152   /* Message not for us: forward to next hop */
2153   LOG (GNUNET_ERROR_TYPE_DEBUG, "  not for us, retransmitting...\n");
2154   GNUNET_STATISTICS_update (stats, "# messages forwarded", 1, GNUNET_NO);
2155   GNUNET_assert (NULL == GCC_send_prebuilt_message (&msg->header, 0, 0, c, fwd,
2156                                                     GNUNET_NO, NULL, NULL));
2157
2158   return GNUNET_OK;
2159 }
2160
2161
2162 /**
2163  * Core handler for encrypted cadet network traffic (channel mgmt, data).
2164  *
2165  * @param cls Closure (unused).
2166  * @param message Message received.
2167  * @param peer Peer who sent the message.
2168  *
2169  * @return GNUNET_OK to keep the connection open,
2170  *         GNUNET_SYSERR to close it (signal serious error)
2171  */
2172 int
2173 GCC_handle_encrypted (void *cls, const struct GNUNET_PeerIdentity *peer,
2174                       const struct GNUNET_MessageHeader *message)
2175 {
2176   return handle_cadet_encrypted (peer,
2177                                 (struct GNUNET_CADET_Encrypted *)message);
2178 }
2179
2180
2181 /**
2182  * Core handler for key exchange traffic (ephemeral key, ping, pong).
2183  *
2184  * @param cls Closure (unused).
2185  * @param message Message received.
2186  * @param peer Peer who sent the message.
2187  *
2188  * @return GNUNET_OK to keep the connection open,
2189  *         GNUNET_SYSERR to close it (signal serious error)
2190  */
2191 int
2192 GCC_handle_kx (void *cls, const struct GNUNET_PeerIdentity *peer,
2193                const struct GNUNET_MessageHeader *message)
2194 {
2195   return handle_cadet_kx (peer,
2196                          (struct GNUNET_CADET_KX *) message);
2197 }
2198
2199
2200 /**
2201  * Core handler for cadet network traffic point-to-point acks.
2202  *
2203  * @param cls closure
2204  * @param message message
2205  * @param peer peer identity this notification is about
2206  *
2207  * @return GNUNET_OK to keep the connection open,
2208  *         GNUNET_SYSERR to close it (signal serious error)
2209  */
2210 int
2211 GCC_handle_ack (void *cls, const struct GNUNET_PeerIdentity *peer,
2212                 const struct GNUNET_MessageHeader *message)
2213 {
2214   struct GNUNET_CADET_ACK *msg;
2215   struct CadetConnection *c;
2216   struct CadetFlowControl *fc;
2217   GNUNET_PEER_Id id;
2218   uint32_t ack;
2219   int fwd;
2220
2221   msg = (struct GNUNET_CADET_ACK *) message;
2222   log_message (message, peer, &msg->cid);
2223   c = connection_get (&msg->cid);
2224   if (NULL == c)
2225   {
2226     GNUNET_STATISTICS_update (stats, "# ack on unknown connection", 1,
2227                               GNUNET_NO);
2228     send_broken_unknown (&msg->cid, &my_full_id, NULL, peer);
2229     return GNUNET_OK;
2230   }
2231
2232   /* Is this a forward or backward ACK? */
2233   id = GNUNET_PEER_search (peer);
2234   if (GCP_get_short_id (get_next_hop (c)) == id)
2235   {
2236     LOG (GNUNET_ERROR_TYPE_DEBUG, "  FWD ACK\n");
2237     fc = &c->fwd_fc;
2238     fwd = GNUNET_YES;
2239   }
2240   else if (GCP_get_short_id (get_prev_hop (c)) == id)
2241   {
2242     LOG (GNUNET_ERROR_TYPE_DEBUG, "  BCK ACK\n");
2243     fc = &c->bck_fc;
2244     fwd = GNUNET_NO;
2245   }
2246   else
2247   {
2248     GNUNET_break_op (0);
2249     return GNUNET_OK;
2250   }
2251
2252   ack = ntohl (msg->ack);
2253   LOG (GNUNET_ERROR_TYPE_DEBUG, "  ACK %u (was %u)\n",
2254               ack, fc->last_ack_recv);
2255   if (GC_is_pid_bigger (ack, fc->last_ack_recv))
2256     fc->last_ack_recv = ack;
2257
2258   /* Cancel polling if the ACK is big enough. */
2259   if (GNUNET_SCHEDULER_NO_TASK != fc->poll_task &&
2260       GC_is_pid_bigger (fc->last_ack_recv, fc->last_pid_sent))
2261   {
2262     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Cancel poll\n");
2263     GNUNET_SCHEDULER_cancel (fc->poll_task);
2264     fc->poll_task = GNUNET_SCHEDULER_NO_TASK;
2265     fc->poll_time = GNUNET_TIME_UNIT_SECONDS;
2266   }
2267
2268   connection_unlock_queue (c, fwd);
2269
2270   return GNUNET_OK;
2271 }
2272
2273
2274 /**
2275  * Core handler for cadet network traffic point-to-point ack polls.
2276  *
2277  * @param cls closure
2278  * @param message message
2279  * @param peer peer identity this notification is about
2280  *
2281  * @return GNUNET_OK to keep the connection open,
2282  *         GNUNET_SYSERR to close it (signal serious error)
2283  */
2284 int
2285 GCC_handle_poll (void *cls, const struct GNUNET_PeerIdentity *peer,
2286                  const struct GNUNET_MessageHeader *message)
2287 {
2288   struct GNUNET_CADET_Poll *msg;
2289   struct CadetConnection *c;
2290   struct CadetFlowControl *fc;
2291   GNUNET_PEER_Id id;
2292   uint32_t pid;
2293   int fwd;
2294
2295   msg = (struct GNUNET_CADET_Poll *) message;
2296   log_message (message, peer, &msg->cid);
2297   c = connection_get (&msg->cid);
2298   if (NULL == c)
2299   {
2300     GNUNET_STATISTICS_update (stats, "# poll on unknown connection", 1,
2301                               GNUNET_NO);
2302     LOG (GNUNET_ERROR_TYPE_DEBUG, "POLL message on unknown connection %s!\n",
2303          GNUNET_h2s (GC_h2hc (&msg->cid)));
2304     send_broken_unknown (&msg->cid, &my_full_id, NULL, peer);
2305     return GNUNET_OK;
2306   }
2307
2308   /* Is this a forward or backward ACK?
2309    * Note: a poll should never be needed in a loopback case,
2310    * since there is no possiblility of packet loss there, so
2311    * this way of discerining FWD/BCK should not be a problem.
2312    */
2313   id = GNUNET_PEER_search (peer);
2314   if (GCP_get_short_id (get_next_hop (c)) == id)
2315   {
2316     LOG (GNUNET_ERROR_TYPE_DEBUG, "  FWD FC\n");
2317     fc = &c->fwd_fc;
2318   }
2319   else if (GCP_get_short_id (get_prev_hop (c)) == id)
2320   {
2321     LOG (GNUNET_ERROR_TYPE_DEBUG, "  BCK FC\n");
2322     fc = &c->bck_fc;
2323   }
2324   else
2325   {
2326     GNUNET_break_op (0);
2327     return GNUNET_OK;
2328   }
2329
2330   pid = ntohl (msg->pid);
2331   LOG (GNUNET_ERROR_TYPE_DEBUG, "  PID %u, OLD %u\n", pid, fc->last_pid_recv);
2332   fc->last_pid_recv = pid;
2333   fwd = fc == &c->bck_fc;
2334   GCC_send_ack (c, fwd, GNUNET_YES);
2335
2336   return GNUNET_OK;
2337 }
2338
2339
2340 /**
2341  * Send an ACK on the appropriate connection/channel, depending on
2342  * the direction and the position of the peer.
2343  *
2344  * @param c Which connection to send the hop-by-hop ACK.
2345  * @param fwd Is this a fwd ACK? (will go dest->root).
2346  * @param force Send the ACK even if suboptimal (e.g. requested by POLL).
2347  */
2348 void
2349 GCC_send_ack (struct CadetConnection *c, int fwd, int force)
2350 {
2351   unsigned int buffer;
2352
2353   LOG (GNUNET_ERROR_TYPE_DEBUG,
2354        "GMC send %s ACK on %s\n",
2355        GC_f2s (fwd), GCC_2s (c));
2356
2357   if (NULL == c)
2358   {
2359     GNUNET_break (0);
2360     return;
2361   }
2362
2363   if (GNUNET_NO != c->destroy)
2364   {
2365     LOG (GNUNET_ERROR_TYPE_DEBUG, "  being destroyed, why bother...\n");
2366     return;
2367   }
2368
2369   /* Get available buffer space */
2370   if (GCC_is_terminal (c, fwd))
2371   {
2372     LOG (GNUNET_ERROR_TYPE_DEBUG, "  getting from all channels\n");
2373     buffer = GCT_get_channels_buffer (c->t);
2374   }
2375   else
2376   {
2377     LOG (GNUNET_ERROR_TYPE_DEBUG, "  getting from one connection\n");
2378     buffer = GCC_get_buffer (c, fwd);
2379   }
2380   LOG (GNUNET_ERROR_TYPE_DEBUG, "  buffer available: %u\n", buffer);
2381   if (0 == buffer && GNUNET_NO == force)
2382     return;
2383
2384   /* Send available buffer space */
2385   if (GCC_is_origin (c, fwd))
2386   {
2387     GNUNET_assert (NULL != c->t);
2388     LOG (GNUNET_ERROR_TYPE_DEBUG, "  sending on channels...\n");
2389     GCT_unchoke_channels (c->t);
2390   }
2391   else
2392   {
2393     LOG (GNUNET_ERROR_TYPE_DEBUG, "  sending on connection\n");
2394     send_ack (c, buffer, fwd, force);
2395   }
2396 }
2397
2398
2399 /**
2400  * Initialize the connections subsystem
2401  *
2402  * @param c Configuration handle.
2403  */
2404 void
2405 GCC_init (const struct GNUNET_CONFIGURATION_Handle *c)
2406 {
2407   LOG (GNUNET_ERROR_TYPE_DEBUG, "init\n");
2408   if (GNUNET_OK !=
2409       GNUNET_CONFIGURATION_get_value_number (c, "CADET", "MAX_MSGS_QUEUE",
2410                                              &max_msgs_queue))
2411   {
2412     GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
2413                                "CADET", "MAX_MSGS_QUEUE", "MISSING");
2414     GNUNET_SCHEDULER_shutdown ();
2415     return;
2416   }
2417
2418   if (GNUNET_OK !=
2419       GNUNET_CONFIGURATION_get_value_number (c, "CADET", "MAX_CONNECTIONS",
2420                                              &max_connections))
2421   {
2422     GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
2423                                "CADET", "MAX_CONNECTIONS", "MISSING");
2424     GNUNET_SCHEDULER_shutdown ();
2425     return;
2426   }
2427
2428   if (GNUNET_OK !=
2429       GNUNET_CONFIGURATION_get_value_time (c, "CADET", "REFRESH_CONNECTION_TIME",
2430                                            &refresh_connection_time))
2431   {
2432     GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
2433                                "CADET", "REFRESH_CONNECTION_TIME", "MISSING");
2434     GNUNET_SCHEDULER_shutdown ();
2435     return;
2436   }
2437   create_connection_time = GNUNET_TIME_UNIT_SECONDS;
2438   connections = GNUNET_CONTAINER_multihashmap_create (1024, GNUNET_NO);
2439 }
2440
2441
2442 /**
2443  * Destroy each connection on shutdown.
2444  *
2445  * @param cls Closure (unused).
2446  * @param key Current key code (CID, unused).
2447  * @param value Value in the hash map (connection)
2448  *
2449  * @return #GNUNET_YES, because we should continue to iterate,
2450  */
2451 static int
2452 shutdown_iterator (void *cls,
2453                    const struct GNUNET_HashCode *key,
2454                    void *value)
2455 {
2456   struct CadetConnection *c = value;
2457
2458   GCC_destroy (c);
2459   return GNUNET_YES;
2460 }
2461
2462
2463 /**
2464  * Shut down the connections subsystem.
2465  */
2466 void
2467 GCC_shutdown (void)
2468 {
2469   GNUNET_CONTAINER_multihashmap_iterate (connections, &shutdown_iterator, NULL);
2470   GNUNET_CONTAINER_multihashmap_destroy (connections);
2471   connections = NULL;
2472 }
2473
2474
2475 struct CadetConnection *
2476 GCC_new (const struct GNUNET_CADET_Hash *cid,
2477          struct CadetTunnel *t,
2478          struct CadetPeerPath *p,
2479          unsigned int own_pos)
2480 {
2481   struct CadetConnection *c;
2482
2483   c = GNUNET_new (struct CadetConnection);
2484   c->id = *cid;
2485   GNUNET_assert (GNUNET_OK ==
2486                  GNUNET_CONTAINER_multihashmap_put (connections,
2487                                                     GCC_get_h (c), c,
2488                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
2489   fc_init (&c->fwd_fc);
2490   fc_init (&c->bck_fc);
2491   c->fwd_fc.c = c;
2492   c->bck_fc.c = c;
2493
2494   c->t = t;
2495   GNUNET_assert (own_pos <= p->length - 1);
2496   c->own_pos = own_pos;
2497   c->path = p;
2498   p->c = c;
2499
2500   if (GNUNET_OK != register_neighbors (c))
2501   {
2502     if (0 == own_pos)
2503     {
2504       path_invalidate (c->path);
2505       c->t = NULL;
2506       c->path = NULL;
2507     }
2508     GCC_destroy (c);
2509     return NULL;
2510   }
2511
2512   return c;
2513 }
2514
2515
2516 void
2517 GCC_destroy (struct CadetConnection *c)
2518 {
2519   if (NULL == c)
2520   {
2521     GNUNET_break (0);
2522     return;
2523   }
2524
2525   if (2 == c->destroy) /* cancel queues -> GCP_queue_cancel -> q_destroy -> */
2526     return;            /* -> message_sent -> GCC_destroy. Don't loop. */
2527   c->destroy = 2;
2528
2529   LOG (GNUNET_ERROR_TYPE_DEBUG, "destroying connection %s\n", GCC_2s (c));
2530   LOG (GNUNET_ERROR_TYPE_DEBUG, " fc's f: %p, b: %p\n",
2531        &c->fwd_fc, &c->bck_fc);
2532   LOG (GNUNET_ERROR_TYPE_DEBUG, " fc tasks f: %u, b: %u\n",
2533        c->fwd_fc.poll_task, c->bck_fc.poll_task);
2534
2535   /* Cancel all traffic */
2536   if (NULL != c->path)
2537   {
2538     connection_cancel_queues (c, GNUNET_YES);
2539     connection_cancel_queues (c, GNUNET_NO);
2540     unregister_neighbors (c);
2541   }
2542
2543   LOG (GNUNET_ERROR_TYPE_DEBUG, " fc tasks f: %u, b: %u\n",
2544        c->fwd_fc.poll_task, c->bck_fc.poll_task);
2545
2546   /* Cancel maintainance task (keepalive/timeout) */
2547   if (NULL != c->fwd_fc.poll_msg)
2548   {
2549     GCC_cancel (c->fwd_fc.poll_msg);
2550     LOG (GNUNET_ERROR_TYPE_DEBUG, " *** POLL msg FWD canceled\n");
2551   }
2552   if (NULL != c->bck_fc.poll_msg)
2553   {
2554     GCC_cancel (c->bck_fc.poll_msg);
2555     LOG (GNUNET_ERROR_TYPE_DEBUG, " *** POLL msg BCK canceled\n");
2556   }
2557
2558   /* Delete from tunnel */
2559   if (NULL != c->t)
2560     GCT_remove_connection (c->t, c);
2561
2562   if (GNUNET_NO == GCC_is_origin (c, GNUNET_YES) && NULL != c->path)
2563     path_destroy (c->path);
2564   if (GNUNET_SCHEDULER_NO_TASK != c->fwd_maintenance_task)
2565     GNUNET_SCHEDULER_cancel (c->fwd_maintenance_task);
2566   if (GNUNET_SCHEDULER_NO_TASK != c->bck_maintenance_task)
2567     GNUNET_SCHEDULER_cancel (c->bck_maintenance_task);
2568   if (GNUNET_SCHEDULER_NO_TASK != c->fwd_fc.poll_task)
2569   {
2570     GNUNET_SCHEDULER_cancel (c->fwd_fc.poll_task);
2571     LOG (GNUNET_ERROR_TYPE_DEBUG, " *** POLL FWD canceled\n");
2572   }
2573   if (GNUNET_SCHEDULER_NO_TASK != c->bck_fc.poll_task)
2574   {
2575     GNUNET_SCHEDULER_cancel (c->bck_fc.poll_task);
2576     LOG (GNUNET_ERROR_TYPE_DEBUG, " *** POLL BCK canceled\n");
2577   }
2578
2579   GNUNET_break (GNUNET_YES ==
2580                 GNUNET_CONTAINER_multihashmap_remove (connections,
2581                                                       GCC_get_h (c), c));
2582
2583   GNUNET_STATISTICS_update (stats, "# connections", -1, GNUNET_NO);
2584   GNUNET_free (c);
2585 }
2586
2587 /**
2588  * Get the connection ID.
2589  *
2590  * @param c Connection to get the ID from.
2591  *
2592  * @return ID of the connection.
2593  */
2594 const struct GNUNET_CADET_Hash *
2595 GCC_get_id (const struct CadetConnection *c)
2596 {
2597   return &c->id;
2598 }
2599
2600
2601 /**
2602  * Get the connection ID.
2603  *
2604  * @param c Connection to get the ID from.
2605  *
2606  * @return ID of the connection.
2607  */
2608 const struct GNUNET_HashCode *
2609 GCC_get_h (const struct CadetConnection *c)
2610 {
2611   return GC_h2hc (&c->id);
2612 }
2613
2614
2615 /**
2616  * Get the connection path.
2617  *
2618  * @param c Connection to get the path from.
2619  *
2620  * @return path used by the connection.
2621  */
2622 const struct CadetPeerPath *
2623 GCC_get_path (const struct CadetConnection *c)
2624 {
2625   if (GNUNET_NO == c->destroy)
2626     return c->path;
2627   return NULL;
2628 }
2629
2630
2631 /**
2632  * Get the connection state.
2633  *
2634  * @param c Connection to get the state from.
2635  *
2636  * @return state of the connection.
2637  */
2638 enum CadetConnectionState
2639 GCC_get_state (const struct CadetConnection *c)
2640 {
2641   return c->state;
2642 }
2643
2644 /**
2645  * Get the connection tunnel.
2646  *
2647  * @param c Connection to get the tunnel from.
2648  *
2649  * @return tunnel of the connection.
2650  */
2651 struct CadetTunnel *
2652 GCC_get_tunnel (const struct CadetConnection *c)
2653 {
2654   return c->t;
2655 }
2656
2657
2658 /**
2659  * Get free buffer space in a connection.
2660  *
2661  * @param c Connection.
2662  * @param fwd Is query about FWD traffic?
2663  *
2664  * @return Free buffer space [0 - max_msgs_queue/max_connections]
2665  */
2666 unsigned int
2667 GCC_get_buffer (struct CadetConnection *c, int fwd)
2668 {
2669   struct CadetFlowControl *fc;
2670
2671   fc = fwd ? &c->fwd_fc : &c->bck_fc;
2672
2673   return (fc->queue_max - fc->queue_n);
2674 }
2675
2676 /**
2677  * Get how many messages have we allowed to send to us from a direction.
2678  *
2679  * @param c Connection.
2680  * @param fwd Are we asking about traffic from FWD (BCK messages)?
2681  *
2682  * @return last_ack_sent - last_pid_recv
2683  */
2684 unsigned int
2685 GCC_get_allowed (struct CadetConnection *c, int fwd)
2686 {
2687   struct CadetFlowControl *fc;
2688
2689   fc = fwd ? &c->fwd_fc : &c->bck_fc;
2690   if (GC_is_pid_bigger(fc->last_pid_recv, fc->last_ack_sent))
2691   {
2692     return 0;
2693   }
2694   return (fc->last_ack_sent - fc->last_pid_recv);
2695 }
2696
2697 /**
2698  * Get messages queued in a connection.
2699  *
2700  * @param c Connection.
2701  * @param fwd Is query about FWD traffic?
2702  *
2703  * @return Number of messages queued.
2704  */
2705 unsigned int
2706 GCC_get_qn (struct CadetConnection *c, int fwd)
2707 {
2708   struct CadetFlowControl *fc;
2709
2710   fc = fwd ? &c->fwd_fc : &c->bck_fc;
2711
2712   return fc->queue_n;
2713 }
2714
2715
2716 /**
2717  * Get next PID to use.
2718  *
2719  * @param c Connection.
2720  * @param fwd Is query about FWD traffic?
2721  *
2722  * @return Last PID used + 1.
2723  */
2724 unsigned int
2725 GCC_get_pid (struct CadetConnection *c, int fwd)
2726 {
2727   struct CadetFlowControl *fc;
2728
2729   fc = fwd ? &c->fwd_fc : &c->bck_fc;
2730
2731   return fc->last_pid_sent + 1;
2732 }
2733
2734
2735 /**
2736  * Allow the connection to advertise a buffer of the given size.
2737  *
2738  * The connection will send an @c fwd ACK message (so: in direction !fwd)
2739  * allowing up to last_pid_recv + buffer.
2740  *
2741  * @param c Connection.
2742  * @param buffer How many more messages the connection can accept.
2743  * @param fwd Is this about FWD traffic? (The ack will go dest->root).
2744  */
2745 void
2746 GCC_allow (struct CadetConnection *c, unsigned int buffer, int fwd)
2747 {
2748   LOG (GNUNET_ERROR_TYPE_DEBUG, "  allowing %s %u messages %s\n",
2749        GCC_2s (c), buffer, GC_f2s (fwd));
2750   send_ack (c, buffer, fwd, GNUNET_NO);
2751 }
2752
2753
2754 /**
2755  * Notify other peers on a connection of a broken link. Mark connections
2756  * to destroy after all traffic has been sent.
2757  *
2758  * @param c Connection on which there has been a disconnection.
2759  * @param peer Peer that disconnected.
2760  */
2761 void
2762 GCC_notify_broken (struct CadetConnection *c,
2763                    struct CadetPeer *peer)
2764 {
2765   int fwd;
2766
2767   LOG (GNUNET_ERROR_TYPE_DEBUG,
2768        " notify broken on %s due to %s disconnect\n",
2769        GCC_2s (c), GCP_2s (peer));
2770
2771   fwd = peer == get_prev_hop (c);
2772
2773   if (GNUNET_YES == GCC_is_terminal (c, fwd))
2774   {
2775     /* Local shutdown, no one to notify about this. */
2776     GCC_destroy (c);
2777     return;
2778   }
2779   if (GNUNET_NO == c->destroy)
2780     send_broken (c, &my_full_id, GCP_get_id (peer), fwd);
2781
2782   /* Connection will have at least one pending message
2783    * (the one we just scheduled), so no point in checking whether to
2784    * destroy immediately. */
2785   c->destroy = GNUNET_YES;
2786   c->state = CADET_CONNECTION_DESTROYED;
2787
2788   /**
2789    * Cancel all queues, if no message is left, connection will be destroyed.
2790    */
2791   connection_cancel_queues (c, !fwd);
2792
2793   return;
2794 }
2795
2796
2797 /**
2798  * Is this peer the first one on the connection?
2799  *
2800  * @param c Connection.
2801  * @param fwd Is this about fwd traffic?
2802  *
2803  * @return #GNUNET_YES if origin, #GNUNET_NO if relay/terminal.
2804  */
2805 int
2806 GCC_is_origin (struct CadetConnection *c, int fwd)
2807 {
2808   if (!fwd && c->path->length - 1 == c->own_pos )
2809     return GNUNET_YES;
2810   if (fwd && 0 == c->own_pos)
2811     return GNUNET_YES;
2812   return GNUNET_NO;
2813 }
2814
2815
2816 /**
2817  * Is this peer the last one on the connection?
2818  *
2819  * @param c Connection.
2820  * @param fwd Is this about fwd traffic?
2821  *            Note that the ROOT is the terminal for BCK traffic!
2822  *
2823  * @return #GNUNET_YES if terminal, #GNUNET_NO if relay/origin.
2824  */
2825 int
2826 GCC_is_terminal (struct CadetConnection *c, int fwd)
2827 {
2828   return GCC_is_origin (c, !fwd);
2829 }
2830
2831
2832 /**
2833  * See if we are allowed to send by the next hop in the given direction.
2834  *
2835  * @param c Connection.
2836  * @param fwd Is this about fwd traffic?
2837  *
2838  * @return #GNUNET_YES in case it's OK to send.
2839  */
2840 int
2841 GCC_is_sendable (struct CadetConnection *c, int fwd)
2842 {
2843   struct CadetFlowControl *fc;
2844
2845   LOG (GNUNET_ERROR_TYPE_DEBUG, " checking sendability of %s traffic on %s\n",
2846        GC_f2s (fwd), GCC_2s (c));
2847   if (NULL == c)
2848   {
2849     GNUNET_break (0);
2850     return GNUNET_YES;
2851   }
2852   fc = fwd ? &c->fwd_fc : &c->bck_fc;
2853   LOG (GNUNET_ERROR_TYPE_DEBUG,
2854        " last ack recv: %u, last pid sent: %u\n",
2855        fc->last_ack_recv, fc->last_pid_sent);
2856   if (GC_is_pid_bigger (fc->last_ack_recv, fc->last_pid_sent))
2857   {
2858     LOG (GNUNET_ERROR_TYPE_DEBUG, " sendable\n");
2859     return GNUNET_YES;
2860   }
2861   LOG (GNUNET_ERROR_TYPE_DEBUG, " not sendable\n");
2862   return GNUNET_NO;
2863 }
2864
2865
2866 /**
2867  * Check if this connection is a direct one (never trim a direct connection).
2868  *
2869  * @param c Connection.
2870  *
2871  * @return #GNUNET_YES in case it's a direct connection, #GNUNET_NO otherwise.
2872  */
2873 int
2874 GCC_is_direct (struct CadetConnection *c)
2875 {
2876   return (c->path->length == 2) ? GNUNET_YES : GNUNET_NO;
2877 }
2878
2879 /**
2880  * Sends an already built message on a connection, properly registering
2881  * all used resources.
2882  *
2883  * @param message Message to send. Function makes a copy of it.
2884  *                If message is not hop-by-hop, decrements TTL of copy.
2885  * @param payload_type Type of payload, in case the message is encrypted.
2886  * @param c Connection on which this message is transmitted.
2887  * @param fwd Is this a fwd message?
2888  * @param force Force the connection to accept the message (buffer overfill).
2889  * @param cont Continuation called once message is sent. Can be NULL.
2890  * @param cont_cls Closure for @c cont.
2891  *
2892  * @return Handle to cancel the message before it's sent.
2893  *         NULL on error or if @c cont is NULL.
2894  *         Invalid on @c cont call.
2895  */
2896 struct CadetConnectionQueue *
2897 GCC_send_prebuilt_message (const struct GNUNET_MessageHeader *message,
2898                            uint16_t payload_type, uint32_t payload_id,
2899                            struct CadetConnection *c, int fwd, int force,
2900                            GCC_sent cont, void *cont_cls)
2901 {
2902   struct CadetFlowControl *fc;
2903   struct CadetConnectionQueue *q;
2904   void *data;
2905   size_t size;
2906   uint16_t type;
2907   int droppable;
2908
2909   size = ntohs (message->size);
2910   data = GNUNET_malloc (size);
2911   memcpy (data, message, size);
2912   type = ntohs (message->type);
2913   LOG (GNUNET_ERROR_TYPE_INFO, "--> %s (%s %u) on connection %s (%u bytes)\n",
2914        GC_m2s (type), GC_m2s (payload_type), payload_id, GCC_2s (c), size);
2915
2916   fc = fwd ? &c->fwd_fc : &c->bck_fc;
2917   droppable = GNUNET_NO == force;
2918   switch (type)
2919   {
2920     struct GNUNET_CADET_Encrypted *emsg;
2921     struct GNUNET_CADET_KX        *kmsg;
2922     struct GNUNET_CADET_ACK       *amsg;
2923     struct GNUNET_CADET_Poll      *pmsg;
2924     struct GNUNET_CADET_ConnectionDestroy *dmsg;
2925     struct GNUNET_CADET_ConnectionBroken  *bmsg;
2926     uint32_t ttl;
2927
2928     case GNUNET_MESSAGE_TYPE_CADET_ENCRYPTED:
2929       emsg = (struct GNUNET_CADET_Encrypted *) data;
2930       ttl = ntohl (emsg->ttl);
2931       if (0 == ttl)
2932       {
2933         GNUNET_break_op (0);
2934         GNUNET_free (data);
2935         return NULL;
2936       }
2937       emsg->cid = c->id;
2938       emsg->ttl = htonl (ttl - 1);
2939       emsg->pid = htonl (0);
2940       LOG (GNUNET_ERROR_TYPE_DEBUG, "  Q_N+ %p %u\n", fc, fc->queue_n);
2941       LOG (GNUNET_ERROR_TYPE_DEBUG, "last pid sent %u\n", fc->last_pid_sent);
2942       LOG (GNUNET_ERROR_TYPE_DEBUG, "     ack recv %u\n", fc->last_ack_recv);
2943       if (GNUNET_YES == droppable)
2944       {
2945         fc->queue_n++;
2946       }
2947       else
2948       {
2949         LOG (GNUNET_ERROR_TYPE_DEBUG, "  not droppable, Q_N stays the same\n");
2950       }
2951       if (GC_is_pid_bigger (fc->last_pid_sent + 1, fc->last_ack_recv))
2952       {
2953         GCC_start_poll (c, fwd);
2954       }
2955       break;
2956
2957     case GNUNET_MESSAGE_TYPE_CADET_KX:
2958       kmsg = (struct GNUNET_CADET_KX *) data;
2959       kmsg->cid = c->id;
2960       break;
2961
2962     case GNUNET_MESSAGE_TYPE_CADET_ACK:
2963       amsg = (struct GNUNET_CADET_ACK *) data;
2964       amsg->cid = c->id;
2965       LOG (GNUNET_ERROR_TYPE_DEBUG, " ack %u\n", ntohl (amsg->ack));
2966       droppable = GNUNET_NO;
2967       break;
2968
2969     case GNUNET_MESSAGE_TYPE_CADET_POLL:
2970       pmsg = (struct GNUNET_CADET_Poll *) data;
2971       pmsg->cid = c->id;
2972       LOG (GNUNET_ERROR_TYPE_DEBUG, " poll %u\n", ntohl (pmsg->pid));
2973       droppable = GNUNET_NO;
2974       break;
2975
2976     case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY:
2977       dmsg = (struct GNUNET_CADET_ConnectionDestroy *) data;
2978       dmsg->cid = c->id;
2979       break;
2980
2981     case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN:
2982       bmsg = (struct GNUNET_CADET_ConnectionBroken *) data;
2983       bmsg->cid = c->id;
2984       break;
2985
2986     case GNUNET_MESSAGE_TYPE_CADET_KEEPALIVE:
2987       GNUNET_break (0);
2988       /* falltrough */
2989     case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE:
2990     case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK:
2991       break;
2992
2993     default:
2994       GNUNET_break (0);
2995       GNUNET_free (data);
2996       return NULL;
2997   }
2998
2999   if (fc->queue_n > fc->queue_max && droppable)
3000   {
3001     GNUNET_STATISTICS_update (stats, "# messages dropped (buffer full)",
3002                               1, GNUNET_NO);
3003     GNUNET_break (0);
3004     LOG (GNUNET_ERROR_TYPE_DEBUG, "queue full: %u/%u\n",
3005          fc->queue_n, fc->queue_max);
3006     if (GNUNET_MESSAGE_TYPE_CADET_ENCRYPTED == type)
3007     {
3008       fc->queue_n--;
3009     }
3010     GNUNET_free (data);
3011     return NULL; /* Drop this message */
3012   }
3013
3014   LOG (GNUNET_ERROR_TYPE_DEBUG, "  C_P+ %s %u\n",
3015        GCC_2s (c), c->pending_messages);
3016   c->pending_messages++;
3017
3018   q = GNUNET_new (struct CadetConnectionQueue);
3019   q->forced = !droppable;
3020   q->q = GCP_queue_add (get_hop (c, fwd), data, type, payload_type, payload_id,
3021                         size, c, fwd, &conn_message_sent, q);
3022   if (NULL == q->q)
3023   {
3024     LOG (GNUNET_ERROR_TYPE_DEBUG, "dropping msg on %s, NULL q\n", GCC_2s (c));
3025     GNUNET_free (data);
3026     GNUNET_free (q);
3027     return NULL;
3028   }
3029   q->cont = cont;
3030   q->cont_cls = cont_cls;
3031   return (NULL == cont) ? NULL : q;
3032 }
3033
3034
3035 /**
3036  * Cancel a previously sent message while it's in the queue.
3037  *
3038  * ONLY can be called before the continuation given to the send function
3039  * is called. Once the continuation is called, the message is no longer in the
3040  * queue.
3041  *
3042  * @param q Handle to the queue.
3043  */
3044 void
3045 GCC_cancel (struct CadetConnectionQueue *q)
3046 {
3047   LOG (GNUNET_ERROR_TYPE_DEBUG, "!  GMC cancel message\n");
3048
3049   /* queue destroy calls message_sent, which calls q->cont and frees q */
3050   GCP_queue_destroy (q->q, GNUNET_YES, GNUNET_NO, 0);
3051 }
3052
3053
3054 /**
3055  * Sends a CREATE CONNECTION message for a path to a peer.
3056  * Changes the connection and tunnel states if necessary.
3057  *
3058  * @param connection Connection to create.
3059  */
3060 void
3061 GCC_send_create (struct CadetConnection *connection)
3062 {
3063   enum CadetTunnelCState state;
3064   size_t size;
3065
3066   size = sizeof (struct GNUNET_CADET_ConnectionCreate);
3067   size += connection->path->length * sizeof (struct GNUNET_PeerIdentity);
3068
3069   LOG (GNUNET_ERROR_TYPE_INFO, "===> %s on connection %s  (%u bytes)\n",
3070        GC_m2s (GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE),
3071        GCC_2s (connection), size);
3072   LOG (GNUNET_ERROR_TYPE_DEBUG, "  C_P+ %p %u (create)\n",
3073        connection, connection->pending_messages);
3074   connection->pending_messages++;
3075
3076   connection->maintenance_q =
3077     GCP_queue_add (get_next_hop (connection), NULL,
3078                    GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE, 0, 0,
3079                    size, connection, GNUNET_YES, &conn_message_sent, NULL);
3080
3081   state = GCT_get_cstate (connection->t);
3082   if (CADET_TUNNEL_SEARCHING == state || CADET_TUNNEL_NEW == state)
3083     GCT_change_cstate (connection->t, CADET_TUNNEL_WAITING);
3084   if (CADET_CONNECTION_NEW == connection->state)
3085     connection_change_state (connection, CADET_CONNECTION_SENT);
3086 }
3087
3088
3089 /**
3090  * Send a message to all peers in this connection that the connection
3091  * is no longer valid.
3092  *
3093  * If some peer should not receive the message, it should be zero'ed out
3094  * before calling this function.
3095  *
3096  * @param c The connection whose peers to notify.
3097  */
3098 void
3099 GCC_send_destroy (struct CadetConnection *c)
3100 {
3101   struct GNUNET_CADET_ConnectionDestroy msg;
3102
3103   if (GNUNET_YES == c->destroy)
3104     return;
3105
3106   msg.header.size = htons (sizeof (msg));
3107   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY);
3108   msg.cid = c->id;
3109   LOG (GNUNET_ERROR_TYPE_DEBUG,
3110               "  sending connection destroy for connection %s\n",
3111               GCC_2s (c));
3112
3113   if (GNUNET_NO == GCC_is_terminal (c, GNUNET_YES))
3114     GNUNET_assert (NULL == GCC_send_prebuilt_message (&msg.header, 0, 0, c,
3115                                                       GNUNET_YES, GNUNET_YES,
3116                                                       NULL, NULL));
3117   if (GNUNET_NO == GCC_is_terminal (c, GNUNET_NO))
3118     GNUNET_assert (NULL == GCC_send_prebuilt_message (&msg.header, 0, 0, c,
3119                                                       GNUNET_NO, GNUNET_YES,
3120                                                       NULL, NULL));
3121   c->destroy = GNUNET_YES;
3122   c->state = CADET_CONNECTION_DESTROYED;
3123 }
3124
3125
3126 /**
3127  * @brief Start a polling timer for the connection.
3128  *
3129  * When a neighbor does not accept more traffic on the connection it could be
3130  * caused by a simple congestion or by a lost ACK. Polling enables to check
3131  * for the lastest ACK status for a connection.
3132  *
3133  * @param c Connection.
3134  * @param fwd Should we poll in the FWD direction?
3135  */
3136 void
3137 GCC_start_poll (struct CadetConnection *c, int fwd)
3138 {
3139   struct CadetFlowControl *fc;
3140
3141   fc = fwd ? &c->fwd_fc : &c->bck_fc;
3142   LOG (GNUNET_ERROR_TYPE_DEBUG, " *** POLL %s requested\n",
3143        GC_f2s (fwd));
3144   if (GNUNET_SCHEDULER_NO_TASK != fc->poll_task || NULL != fc->poll_msg)
3145   {
3146     LOG (GNUNET_ERROR_TYPE_DEBUG, " ***   not needed (%u, %p)\n",
3147          fc->poll_task, fc->poll_msg);
3148     return;
3149   }
3150   LOG (GNUNET_ERROR_TYPE_DEBUG, " *** POLL started on request\n");
3151   fc->poll_task = GNUNET_SCHEDULER_add_delayed (fc->poll_time,
3152                                                 &connection_poll,
3153                                                 fc);
3154 }
3155
3156
3157 /**
3158  * @brief Stop polling a connection for ACKs.
3159  *
3160  * Once we have enough ACKs for future traffic, polls are no longer necessary.
3161  *
3162  * @param c Connection.
3163  * @param fwd Should we stop the poll in the FWD direction?
3164  */
3165 void
3166 GCC_stop_poll (struct CadetConnection *c, int fwd)
3167 {
3168   struct CadetFlowControl *fc;
3169
3170   fc = fwd ? &c->fwd_fc : &c->bck_fc;
3171   if (GNUNET_SCHEDULER_NO_TASK != fc->poll_task)
3172   {
3173     GNUNET_SCHEDULER_cancel (fc->poll_task);
3174     fc->poll_task = GNUNET_SCHEDULER_NO_TASK;
3175   }
3176 }
3177
3178 /**
3179  * Get a (static) string for a connection.
3180  *
3181  * @param c Connection.
3182  */
3183 const char *
3184 GCC_2s (const struct CadetConnection *c)
3185 {
3186   if (NULL == c)
3187     return "NULL";
3188
3189   if (NULL != c->t)
3190   {
3191     static char buf[128];
3192
3193     sprintf (buf, "%s (->%s)",
3194              GNUNET_h2s (GC_h2hc (GCC_get_id (c))), GCT_2s (c->t));
3195     return buf;
3196   }
3197   return GNUNET_h2s (GC_h2hc (&c->id));
3198 }
3199
3200
3201 /**
3202  * Log all possible info about the connection state.
3203  *
3204  * @param c Connection to debug.
3205  * @param level Debug level to use.
3206  */
3207 void
3208 GCC_debug (const struct CadetConnection *c, enum GNUNET_ErrorType level)
3209 {
3210   int do_log;
3211   char *s;
3212
3213   do_log = GNUNET_get_log_call_status (level & (~GNUNET_ERROR_TYPE_BULK),
3214                                        "cadet-con",
3215                                        __FILE__, __FUNCTION__, __LINE__);
3216   if (0 == do_log)
3217     return;
3218
3219   if (NULL == c)
3220   {
3221     LOG2 (level, "CCC DEBUG NULL CONNECTION\n");
3222     return;
3223   }
3224
3225   LOG2 (level, "CCC DEBUG CONNECTION %s\n", GCC_2s (c));
3226   s = path_2s (c->path);
3227   LOG2 (level, "CCC  path %s, own pos: %u\n", s, c->own_pos);
3228   GNUNET_free (s);
3229   LOG2 (level, "CCC  state: %s, destroy: %u\n",
3230         GCC_state2s (c->state), c->destroy);
3231   LOG2 (level, "CCC  pending messages: %u\n", c->pending_messages);
3232   if (NULL != c->perf)
3233     LOG2 (level, "CCC  us/byte: %f\n", c->perf->avg);
3234
3235   LOG2 (level, "CCC  FWD flow control:\n");
3236   LOG2 (level, "CCC   queue: %u/%u\n", c->fwd_fc.queue_n, c->fwd_fc.queue_max);
3237   LOG2 (level, "CCC   last PID sent: %5u, recv: %5u\n",
3238         c->fwd_fc.last_pid_sent, c->fwd_fc.last_pid_recv);
3239   LOG2 (level, "CCC   last ACK sent: %5u, recv: %5u\n",
3240         c->fwd_fc.last_ack_sent, c->fwd_fc.last_ack_recv);
3241   LOG2 (level, "CCC   POLL: task %d, msg  %p, msg_ack %p)\n",
3242         c->fwd_fc.poll_task, c->fwd_fc.poll_msg, c->fwd_fc.ack_msg);
3243
3244   LOG2 (level, "CCC  BCK flow control:\n");
3245   LOG2 (level, "CCC   queue: %u/%u\n", c->bck_fc.queue_n, c->bck_fc.queue_max);
3246   LOG2 (level, "CCC   last PID sent: %5u, recv: %5u\n",
3247         c->bck_fc.last_pid_sent, c->bck_fc.last_pid_recv);
3248   LOG2 (level, "CCC   last ACK sent: %5u, recv: %5u\n",
3249         c->bck_fc.last_ack_sent, c->bck_fc.last_ack_recv);
3250   LOG2 (level, "CCC   POLL: task %d, msg  %p, msg_ack %p)\n",
3251         c->bck_fc.poll_task, c->bck_fc.poll_msg, c->bck_fc.ack_msg);
3252
3253   LOG2 (level, "CCC DEBUG CONNECTION END\n");
3254 }
3255