- fix
[oweals/gnunet.git] / src / cadet / gnunet-service-cadet_connection.c
1 /*
2      This file is part of GNUnet.
3      (C) 2001-2013 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file cadet/gnunet-service-cadet_connection.c
23  * @brief GNUnet CADET service connection handling
24  * @author Bartlomiej Polot
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29
30 #include "gnunet_statistics_service.h"
31
32 #include "cadet_path.h"
33 #include "cadet_protocol.h"
34 #include "cadet.h"
35 #include "gnunet-service-cadet_connection.h"
36 #include "gnunet-service-cadet_peer.h"
37 #include "gnunet-service-cadet_tunnel.h"
38
39
40 #define LOG(level, ...) GNUNET_log_from (level,"cadet-con",__VA_ARGS__)
41 #define LOG2(level, ...) GNUNET_log_from_nocheck(level,"cadet-con",__VA_ARGS__)
42
43
44 #define CADET_MAX_POLL_TIME      GNUNET_TIME_relative_multiply (\
45                                   GNUNET_TIME_UNIT_MINUTES,\
46                                   10)
47 #define AVG_MSGS                32
48
49
50 /******************************************************************************/
51 /********************************   STRUCTS  **********************************/
52 /******************************************************************************/
53
54 /**
55  * Struct to encapsulate all the Flow Control information to a peer to which
56  * we are directly connected (on a core level).
57  */
58 struct CadetFlowControl
59 {
60   /**
61    * Connection this controls.
62    */
63   struct CadetConnection *c;
64
65   /**
66    * How many messages are in the queue on this connection.
67    */
68   unsigned int queue_n;
69
70   /**
71    * How many messages do we accept in the queue.
72    */
73   unsigned int queue_max;
74
75   /**
76    * ID of the last packet sent towards the peer.
77    */
78   uint32_t last_pid_sent;
79
80   /**
81    * ID of the last packet received from the peer.
82    */
83   uint32_t last_pid_recv;
84
85   /**
86    * Last ACK sent to the peer (peer can't send more than this PID).
87    */
88   uint32_t last_ack_sent;
89
90   /**
91    * Last ACK sent towards the origin (for traffic towards leaf node).
92    */
93   uint32_t last_ack_recv;
94
95   /**
96    * Task to poll the peer in case of a lost ACK causes stall.
97    */
98   GNUNET_SCHEDULER_TaskIdentifier poll_task;
99
100   /**
101    * How frequently to poll for ACKs.
102    */
103   struct GNUNET_TIME_Relative poll_time;
104
105   /**
106    * Queued poll message, to cancel if not necessary anymore (got ACK).
107    */
108   struct CadetConnectionQueue *poll_msg;
109
110   /**
111    * Queued poll message, to cancel if not necessary anymore (got ACK).
112    */
113   struct CadetConnectionQueue *ack_msg;
114 };
115
116 /**
117  * Keep a record of the last messages sent on this connection.
118  */
119 struct CadetConnectionPerformance
120 {
121   /**
122    * Circular buffer for storing measurements.
123    */
124   double usecsperbyte[AVG_MSGS];
125
126   /**
127    * Running average of @c usecsperbyte.
128    */
129   double avg;
130
131   /**
132    * How many values of @c usecsperbyte are valid.
133    */
134   uint16_t size;
135
136   /**
137    * Index of the next "free" position in @c usecsperbyte.
138    */
139   uint16_t idx;
140 };
141
142
143 /**
144  * Struct containing all information regarding a connection to a peer.
145  */
146 struct CadetConnection
147 {
148   /**
149    * Tunnel this connection is part of.
150    */
151   struct CadetTunnel *t;
152
153   /**
154    * Flow control information for traffic fwd.
155    */
156   struct CadetFlowControl fwd_fc;
157
158   /**
159    * Flow control information for traffic bck.
160    */
161   struct CadetFlowControl bck_fc;
162
163   /**
164    * Measure connection performance on the endpoint.
165    */
166   struct CadetConnectionPerformance *perf;
167
168   /**
169    * ID of the connection.
170    */
171   struct GNUNET_CADET_Hash id;
172
173   /**
174    * State of the connection.
175    */
176   enum CadetConnectionState state;
177
178   /**
179    * Path being used for the tunnel. At the origin of the connection
180    * it's a pointer to the destination's path pool, otherwise just a copy.
181    */
182   struct CadetPeerPath *path;
183
184   /**
185    * Position of the local peer in the path.
186    */
187   unsigned int own_pos;
188
189   /**
190    * Task to keep the used paths alive at the owner,
191    * time tunnel out on all the other peers.
192    */
193   GNUNET_SCHEDULER_TaskIdentifier fwd_maintenance_task;
194
195   /**
196    * Task to keep the used paths alive at the destination,
197    * time tunnel out on all the other peers.
198    */
199   GNUNET_SCHEDULER_TaskIdentifier bck_maintenance_task;
200
201   /**
202    * Queue handle for maintainance traffic. One handle for FWD and BCK since
203    * one peer never needs to maintain both directions (no loopback connections).
204    */
205   struct CadetPeerQueue *maintenance_q;
206
207   /**
208    * Counter to do exponential backoff when creating a connection (max 64).
209    */
210   unsigned short create_retry;
211
212   /**
213    * Pending message count.
214    */
215   int pending_messages;
216
217   /**
218    * Destroy flag: if true, destroy on last message.
219    */
220   int destroy;
221 };
222
223 /**
224  * Handle for messages queued but not yet sent.
225  */
226 struct CadetConnectionQueue
227 {
228   /**
229    * Peer queue handle, to cancel if necessary.
230    */
231   struct CadetPeerQueue *q;
232
233   /**
234    * Was this a forced message? (Do not account for it)
235    */
236   int forced;
237
238   /**
239    * Continuation to call once sent.
240    */
241   GCC_sent cont;
242
243   /**
244    * Closure for @c cont.
245    */
246   void *cont_cls;
247 };
248
249 /******************************************************************************/
250 /*******************************   GLOBALS  ***********************************/
251 /******************************************************************************/
252
253 /**
254  * Global handle to the statistics service.
255  */
256 extern struct GNUNET_STATISTICS_Handle *stats;
257
258 /**
259  * Local peer own ID (memory efficient handle).
260  */
261 extern GNUNET_PEER_Id myid;
262
263 /**
264  * Local peer own ID (full value).
265  */
266 extern struct GNUNET_PeerIdentity my_full_id;
267
268 /**
269  * Connections known, indexed by cid (CadetConnection).
270  */
271 static struct GNUNET_CONTAINER_MultiHashMap *connections;
272
273 /**
274  * How many connections are we willing to maintain.
275  * Local connections are always allowed, even if there are more connections than max.
276  */
277 static unsigned long long max_connections;
278
279 /**
280  * How many messages *in total* are we willing to queue, divide by number of
281  * connections to get connection queue size.
282  */
283 static unsigned long long max_msgs_queue;
284
285 /**
286  * How often to send path keepalives. Paths timeout after 4 missed.
287  */
288 static struct GNUNET_TIME_Relative refresh_connection_time;
289
290 /**
291  * How often to send path create / ACKs.
292  */
293 static struct GNUNET_TIME_Relative create_connection_time;
294
295
296 /******************************************************************************/
297 /********************************   STATIC  ***********************************/
298 /******************************************************************************/
299
300 #if 0 // avoid compiler warning for unused static function
301 static void
302 fc_debug (struct CadetFlowControl *fc)
303 {
304   LOG (GNUNET_ERROR_TYPE_DEBUG, "    IN: %u/%u\n",
305               fc->last_pid_recv, fc->last_ack_sent);
306   LOG (GNUNET_ERROR_TYPE_DEBUG, "    OUT: %u/%u\n",
307               fc->last_pid_sent, fc->last_ack_recv);
308   LOG (GNUNET_ERROR_TYPE_DEBUG, "    QUEUE: %u/%u\n",
309               fc->queue_n, fc->queue_max);
310 }
311
312 static void
313 connection_debug (struct CadetConnection *c)
314 {
315   if (NULL == c)
316   {
317     LOG (GNUNET_ERROR_TYPE_DEBUG, "*** DEBUG NULL CONNECTION ***\n");
318     return;
319   }
320   LOG (GNUNET_ERROR_TYPE_DEBUG, "Connection %s:%X\n",
321               peer2s (c->t->peer), GCC_2s (c));
322   LOG (GNUNET_ERROR_TYPE_DEBUG, "  state: %u, pending msgs: %u\n",
323               c->state, c->pending_messages);
324   LOG (GNUNET_ERROR_TYPE_DEBUG, "  FWD FC\n");
325   fc_debug (&c->fwd_fc);
326   LOG (GNUNET_ERROR_TYPE_DEBUG, "  BCK FC\n");
327   fc_debug (&c->bck_fc);
328 }
329 #endif
330
331
332 /**
333  * Schedule next keepalive task, taking in consideration
334  * the connection state and number of retries.
335  *
336  * @param c Connection for which to schedule the next keepalive.
337  * @param fwd Direction for the next keepalive.
338  */
339 static void
340 schedule_next_keepalive (struct CadetConnection *c, int fwd);
341
342
343 /**
344  * Resets the connection timeout task, some other message has done the
345  * task's job.
346  * - For the first peer on the direction this means to send
347  *   a keepalive or a path confirmation message (either create or ACK).
348  * - For all other peers, this means to destroy the connection,
349  *   due to lack of activity.
350  * Starts the timeout if no timeout was running (connection just created).
351  *
352  * @param c Connection whose timeout to reset.
353  * @param fwd Is this forward?
354  */
355 static void
356 connection_reset_timeout (struct CadetConnection *c, int fwd);
357
358
359 /**
360  * Get string description for tunnel state. Reentrant.
361  *
362  * @param s Tunnel state.
363  *
364  * @return String representation.
365  */
366 static const char *
367 GCC_state2s (enum CadetConnectionState s)
368 {
369   switch (s)
370   {
371     case CADET_CONNECTION_NEW:
372       return "CADET_CONNECTION_NEW";
373     case CADET_CONNECTION_SENT:
374       return "CADET_CONNECTION_SENT";
375     case CADET_CONNECTION_ACK:
376       return "CADET_CONNECTION_ACK";
377     case CADET_CONNECTION_READY:
378       return "CADET_CONNECTION_READY";
379     case CADET_CONNECTION_DESTROYED:
380       return "CADET_CONNECTION_DESTROYED";
381     default:
382       GNUNET_break (0);
383       LOG (GNUNET_ERROR_TYPE_ERROR, " conn state %u unknown!\n", s);
384       return "CADET_CONNECTION_STATE_ERROR";
385   }
386 }
387
388
389 /**
390  * Initialize a Flow Control structure to the initial state.
391  *
392  * @param fc Flow Control structure to initialize.
393  */
394 static void
395 fc_init (struct CadetFlowControl *fc)
396 {
397   fc->last_pid_sent = (uint32_t) -1; /* Next (expected) = 0 */
398   fc->last_pid_recv = (uint32_t) -1;
399   fc->last_ack_sent = (uint32_t) 0;
400   fc->last_ack_recv = (uint32_t) 0;
401   fc->poll_task = GNUNET_SCHEDULER_NO_TASK;
402   fc->poll_time = GNUNET_TIME_UNIT_SECONDS;
403   fc->queue_n = 0;
404   fc->queue_max = (max_msgs_queue / max_connections) + 1;
405 }
406
407
408 /**
409  * Find a connection.
410  *
411  * @param cid Connection ID.
412  */
413 static struct CadetConnection *
414 connection_get (const struct GNUNET_CADET_Hash *cid)
415 {
416   return GNUNET_CONTAINER_multihashmap_get (connections, GC_h2hc (cid));
417 }
418
419
420 static void
421 connection_change_state (struct CadetConnection* c,
422                          enum CadetConnectionState state)
423 {
424   LOG (GNUNET_ERROR_TYPE_DEBUG,
425        "Connection %s state %s -> %s\n",
426        GCC_2s (c), GCC_state2s (c->state), GCC_state2s (state));
427   if (CADET_CONNECTION_DESTROYED == c->state)
428   {
429     LOG (GNUNET_ERROR_TYPE_DEBUG, "state not changing anymore\n");
430     return;
431   }
432   c->state = state;
433   if (CADET_CONNECTION_READY == state)
434     c->create_retry = 1;
435 }
436
437
438 /**
439  * Callback called when a queued ACK message is sent.
440  *
441  * @param cls Closure (FC).
442  * @param c Connection this message was on.
443  * @param q Queue handler this call invalidates.
444  * @param type Type of message sent.
445  * @param fwd Was this a FWD going message?
446  * @param size Size of the message.
447  */
448 static void
449 ack_sent (void *cls,
450           struct CadetConnection *c,
451           struct CadetConnectionQueue *q,
452           uint16_t type, int fwd, size_t size)
453 {
454   struct CadetFlowControl *fc = cls;
455
456   fc->ack_msg = NULL;
457 }
458
459
460 /**
461  * Send an ACK on the connection, informing the predecessor about
462  * the available buffer space. Should not be called in case the peer
463  * is origin (no predecessor) in the @c fwd direction.
464  *
465  * Note that for fwd ack, the FWD mean forward *traffic* (root->dest),
466  * the ACK itself goes "back" (dest->root).
467  *
468  * @param c Connection on which to send the ACK.
469  * @param buffer How much space free to advertise?
470  * @param fwd Is this FWD ACK? (Going dest -> root)
471  * @param force Don't optimize out.
472  */
473 static void
474 send_ack (struct CadetConnection *c, unsigned int buffer, int fwd, int force)
475 {
476   struct CadetFlowControl *next_fc;
477   struct CadetFlowControl *prev_fc;
478   struct GNUNET_CADET_ACK msg;
479   uint32_t ack;
480   int delta;
481
482   /* If origin, there is no connection to send ACKs. Wrong function! */
483   if (GCC_is_origin (c, fwd))
484   {
485     LOG (GNUNET_ERROR_TYPE_DEBUG, "connection %s is origin in %s\n",
486          GCC_2s (c), GC_f2s (fwd));
487     GNUNET_break (0);
488     return;
489   }
490
491   next_fc = fwd ? &c->fwd_fc : &c->bck_fc;
492   prev_fc = fwd ? &c->bck_fc : &c->fwd_fc;
493
494   LOG (GNUNET_ERROR_TYPE_DEBUG, "connection send %s ack on %s\n",
495        GC_f2s (fwd), GCC_2s (c));
496
497   /* Check if we need to transmit the ACK. */
498   delta = prev_fc->last_ack_sent - prev_fc->last_pid_recv;
499   if (3 < delta && buffer < delta && GNUNET_NO == force)
500   {
501     LOG (GNUNET_ERROR_TYPE_DEBUG, "Not sending ACK, buffer > 3\n");
502     LOG (GNUNET_ERROR_TYPE_DEBUG,
503          "  last pid recv: %u, last ack sent: %u\n",
504          prev_fc->last_pid_recv, prev_fc->last_ack_sent);
505     return;
506   }
507
508   /* Ok, ACK might be necessary, what PID to ACK? */
509   ack = prev_fc->last_pid_recv + buffer;
510   LOG (GNUNET_ERROR_TYPE_DEBUG, " ACK %u\n", ack);
511   LOG (GNUNET_ERROR_TYPE_DEBUG,
512        " last pid %u, last ack %u, qmax %u, q %u\n",
513        prev_fc->last_pid_recv, prev_fc->last_ack_sent,
514        next_fc->queue_max, next_fc->queue_n);
515   if (ack == prev_fc->last_ack_sent && GNUNET_NO == force)
516   {
517     LOG (GNUNET_ERROR_TYPE_DEBUG, "Not sending FWD ACK, not needed\n");
518     return;
519   }
520
521   /* Check if message is already in queue */
522   if (NULL != prev_fc->ack_msg)
523   {
524     if (GC_is_pid_bigger (ack, prev_fc->last_ack_sent))
525     {
526       LOG (GNUNET_ERROR_TYPE_DEBUG, " canceling old ACK\n");
527       GCC_cancel (prev_fc->ack_msg);
528       /* GCC_cancel triggers ack_sent(), which clears fc->ack_msg */
529     }
530     else
531     {
532       LOG (GNUNET_ERROR_TYPE_DEBUG, " same ACK already in queue\n");
533       return;
534     }
535   }
536
537   prev_fc->last_ack_sent = ack;
538
539   /* Build ACK message and send on connection */
540   msg.header.size = htons (sizeof (msg));
541   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_ACK);
542   msg.ack = htonl (ack);
543   msg.cid = c->id;
544
545   prev_fc->ack_msg = GCC_send_prebuilt_message (&msg.header, 0, ack, c,
546                                                 !fwd, GNUNET_YES,
547                                                 &ack_sent, prev_fc);
548   GNUNET_assert (NULL != prev_fc->ack_msg);
549 }
550
551
552 /**
553  * Callback called when a connection queued message is sent.
554  *
555  * Calculates the average time and connection packet tracking.
556  *
557  * @param cls Closure (ConnectionQueue Handle).
558  * @param c Connection this message was on.
559  * @param sent Was it really sent? (Could have been canceled)
560  * @param type Type of message sent.
561  * @param pid Packet ID, or 0 if not applicable (create, destroy, etc).
562  * @param fwd Was this a FWD going message?
563  * @param size Size of the message.
564  * @param wait Time spent waiting for core (only the time for THIS message)
565  */
566 static void
567 conn_message_sent (void *cls,
568                    struct CadetConnection *c, int sent,
569                    uint16_t type, uint32_t pid, int fwd, size_t size,
570                    struct GNUNET_TIME_Relative wait)
571 {
572   struct CadetConnectionPerformance *p;
573   struct CadetFlowControl *fc;
574   struct CadetConnectionQueue *q = cls;
575   double usecsperbyte;
576   int forced;
577
578   LOG (GNUNET_ERROR_TYPE_DEBUG, "connection message_sent\n");
579
580   fc = fwd ? &c->fwd_fc : &c->bck_fc;
581   LOG (GNUNET_ERROR_TYPE_DEBUG, " %ssent %s %s\n",
582        sent ? "" : "not ", GC_f2s (fwd), GC_m2s (type));
583   if (NULL != q)
584   {
585     forced = q->forced;
586     if (NULL != q->cont)
587     {
588       LOG (GNUNET_ERROR_TYPE_DEBUG, " calling cont\n");
589       q->cont (q->cont_cls, c, q, type, fwd, size);
590     }
591     GNUNET_free (q);
592   }
593   else if (type == GNUNET_MESSAGE_TYPE_CADET_ENCRYPTED)
594   {
595     /* If NULL == q and ENCRYPTED == type, message must have been ch_mngmnt */
596     forced = GNUNET_YES;
597   }
598   else
599   {
600     forced = GNUNET_NO;
601   }
602   if (NULL == c)
603   {
604     if (type != GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN
605         && type != GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY)
606     {
607       LOG (GNUNET_ERROR_TYPE_ERROR, "Message %s sent on NULL connection!\n",
608            GC_m2s (type));
609     }
610     return;
611   }
612   LOG (GNUNET_ERROR_TYPE_DEBUG, " C_P- %p %u\n", c, c->pending_messages);
613   c->pending_messages--;
614   if (GNUNET_YES == c->destroy && 0 == c->pending_messages)
615   {
616     LOG (GNUNET_ERROR_TYPE_DEBUG, "!  destroying connection!\n");
617     GCC_destroy (c);
618     return;
619   }
620   /* Send ACK if needed, after accounting for sent ID in fc->queue_n */
621   switch (type)
622   {
623     case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE:
624     case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK:
625       c->maintenance_q = NULL;
626       /* Don't trigger a keepalive for sent ACKs, only SYN and SYNACKs */
627       if (GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE == type || !fwd)
628         schedule_next_keepalive (c, fwd);
629       break;
630
631     case GNUNET_MESSAGE_TYPE_CADET_ENCRYPTED:
632       if (GNUNET_YES == sent)
633       {
634         GNUNET_assert (NULL != q);
635         fc->last_pid_sent = pid; // FIXME
636         GCC_send_ack (c, fwd, GNUNET_NO);
637         connection_reset_timeout (c, fwd);
638       }
639
640       LOG (GNUNET_ERROR_TYPE_DEBUG, "!  Q_N- %p %u\n", fc, fc->queue_n);
641       if (GNUNET_NO == forced)
642       {
643         fc->queue_n--;
644         LOG (GNUNET_ERROR_TYPE_DEBUG,
645             "!   accounting pid %u\n",
646             fc->last_pid_sent);
647       }
648       else
649       {
650         LOG (GNUNET_ERROR_TYPE_DEBUG,
651              "!   forced, Q_N not accounting pid %u\n",
652              fc->last_pid_sent);
653       }
654       break;
655
656     case GNUNET_MESSAGE_TYPE_CADET_KX:
657       if (GNUNET_YES == sent)
658         connection_reset_timeout (c, fwd);
659       break;
660
661     case GNUNET_MESSAGE_TYPE_CADET_POLL:
662       fc->poll_msg = NULL;
663       break;
664
665     case GNUNET_MESSAGE_TYPE_CADET_ACK:
666       fc->ack_msg = NULL;
667       break;
668
669     default:
670       break;
671   }
672   LOG (GNUNET_ERROR_TYPE_DEBUG, "!  message sent!\n");
673
674   if (NULL == c->perf)
675     return; /* Only endpoints are interested in timing. */
676
677   p = c->perf;
678   usecsperbyte = ((double) wait.rel_value_us) / size;
679   if (p->size == AVG_MSGS)
680   {
681     /* Array is full. Substract oldest value, add new one and store. */
682     p->avg -= (p->usecsperbyte[p->idx] / AVG_MSGS);
683     p->usecsperbyte[p->idx] = usecsperbyte;
684     p->avg += (p->usecsperbyte[p->idx] / AVG_MSGS);
685   }
686   else
687   {
688     /* Array not yet full. Add current value to avg and store. */
689     p->usecsperbyte[p->idx] = usecsperbyte;
690     p->avg *= p->size;
691     p->avg += p->usecsperbyte[p->idx];
692     p->size++;
693     p->avg /= p->size;
694   }
695   p->idx = (p->idx + 1) % AVG_MSGS;
696 }
697
698
699 /**
700  * Get the previous hop in a connection
701  *
702  * @param c Connection.
703  *
704  * @return Previous peer in the connection.
705  */
706 static struct CadetPeer *
707 get_prev_hop (const struct CadetConnection *c)
708 {
709   GNUNET_PEER_Id id;
710
711   LOG (GNUNET_ERROR_TYPE_DEBUG, " get prev hop %s [%u/%u]\n",
712        GCC_2s (c), c->own_pos, c->path->length);
713   if (0 == c->own_pos || c->path->length < 2)
714     id = c->path->peers[0];
715   else
716     id = c->path->peers[c->own_pos - 1];
717
718   LOG (GNUNET_ERROR_TYPE_DEBUG, "  ID: %s (%u)\n",
719        GNUNET_i2s (GNUNET_PEER_resolve2 (id)), id);
720
721   return GCP_get_short (id);
722 }
723
724
725 /**
726  * Get the next hop in a connection
727  *
728  * @param c Connection.
729  *
730  * @return Next peer in the connection.
731  */
732 static struct CadetPeer *
733 get_next_hop (const struct CadetConnection *c)
734 {
735   GNUNET_PEER_Id id;
736
737   LOG (GNUNET_ERROR_TYPE_DEBUG, " get next hop %s [%u/%u]\n",
738        GCC_2s (c), c->own_pos, c->path->length);
739   if ((c->path->length - 1) == c->own_pos || c->path->length < 2)
740     id = c->path->peers[c->path->length - 1];
741   else
742     id = c->path->peers[c->own_pos + 1];
743
744   LOG (GNUNET_ERROR_TYPE_DEBUG, "  ID: %s (%u)\n",
745        GNUNET_i2s (GNUNET_PEER_resolve2 (id)), id);
746
747   return GCP_get_short (id);
748 }
749
750
751 /**
752  * Get the hop in a connection.
753  *
754  * @param c Connection.
755  * @param fwd Next hop?
756  *
757  * @return Next peer in the connection.
758  */
759 static struct CadetPeer *
760 get_hop (struct CadetConnection *c, int fwd)
761 {
762   if (fwd)
763     return get_next_hop (c);
764   return get_prev_hop (c);
765 }
766
767
768 /**
769  * Is traffic coming from this sender 'FWD' traffic?
770  *
771  * @param c Connection to check.
772  * @param sender Peer identity of neighbor.
773  *
774  * @return #GNUNET_YES in case the sender is the 'prev' hop and therefore
775  *         the traffic is 'FWD'.
776  *         #GNUNET_NO for BCK.
777  *         #GNUNET_SYSERR for errors.
778  */
779 static int
780 is_fwd (const struct CadetConnection *c,
781         const struct GNUNET_PeerIdentity *sender)
782 {
783   GNUNET_PEER_Id id;
784
785   id = GNUNET_PEER_search (sender);
786   if (GCP_get_short_id (get_prev_hop (c)) == id)
787     return GNUNET_YES;
788
789   if (GCP_get_short_id (get_next_hop (c)) == id)
790     return GNUNET_NO;
791
792   GNUNET_break (0);
793   return GNUNET_SYSERR;
794 }
795
796
797 /**
798  * Sends a CONNECTION ACK message in reponse to a received CONNECTION_CREATE
799  * or a first CONNECTION_ACK directed to us.
800  *
801  * @param connection Connection to confirm.
802  * @param fwd Should we send it FWD? (root->dest)
803  *            (First (~SYNACK) goes BCK, second (~ACK) goes FWD)
804  */
805 static void
806 send_connection_ack (struct CadetConnection *connection, int fwd)
807 {
808   struct CadetTunnel *t;
809
810   t = connection->t;
811   LOG (GNUNET_ERROR_TYPE_INFO, "===> {%14s ACK} on connection %s\n",
812        GC_f2s (!fwd), GCC_2s (connection));
813   GCP_queue_add (get_hop (connection, fwd), NULL,
814                  GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK, 0, 0,
815                  sizeof (struct GNUNET_CADET_ConnectionACK),
816                  connection, fwd, &conn_message_sent, NULL);
817   connection->pending_messages++;
818   if (CADET_TUNNEL_NEW == GCT_get_cstate (t))
819     GCT_change_cstate (t, CADET_TUNNEL_WAITING);
820   if (CADET_CONNECTION_READY != connection->state)
821     connection_change_state (connection, CADET_CONNECTION_SENT);
822 }
823
824
825 /**
826  * Send a notification that a connection is broken.
827  *
828  * @param c Connection that is broken.
829  * @param id1 Peer that has disconnected.
830  * @param id2 Peer that has disconnected.
831  * @param fwd Direction towards which to send it.
832  */
833 static void
834 send_broken (struct CadetConnection *c,
835              const struct GNUNET_PeerIdentity *id1,
836              const struct GNUNET_PeerIdentity *id2,
837              int fwd)
838 {
839   struct GNUNET_CADET_ConnectionBroken msg;
840
841   msg.header.size = htons (sizeof (struct GNUNET_CADET_ConnectionBroken));
842   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN);
843   msg.cid = c->id;
844   msg.peer1 = *id1;
845   msg.peer2 = *id2;
846   GNUNET_assert (NULL == GCC_send_prebuilt_message (&msg.header, 0, 0, c, fwd,
847                                                     GNUNET_YES, NULL, NULL));
848 }
849
850
851 /**
852  * Send a notification that a connection is broken, when a connection
853  * isn't even known to the local peer.
854  *
855  * @param connection_id Connection ID.
856  * @param id1 Peer that has disconnected, probably local peer.
857  * @param id2 Peer that has disconnected can be NULL if unknown.
858  * @param peer Peer to notify (neighbor who sent the connection).
859  */
860 static void
861 send_broken_unknown (const struct GNUNET_CADET_Hash *connection_id,
862                      const struct GNUNET_PeerIdentity *id1,
863                      const struct GNUNET_PeerIdentity *id2,
864                      const struct GNUNET_PeerIdentity *peer_id)
865 {
866   struct GNUNET_CADET_ConnectionBroken *msg;
867   struct CadetPeer *neighbor;
868
869   LOG (GNUNET_ERROR_TYPE_INFO, "===> BROKEN on unknown connection %s\n",
870        GNUNET_h2s (GC_h2hc (connection_id)));
871
872   msg = GNUNET_new (struct GNUNET_CADET_ConnectionBroken);
873   msg->header.size = htons (sizeof (struct GNUNET_CADET_ConnectionBroken));
874   msg->header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN);
875   msg->cid = *connection_id;
876   msg->peer1 = *id1;
877   if (NULL != id2)
878     msg->peer2 = *id2;
879   else
880     memset (&msg->peer2, 0, sizeof (msg->peer2));
881   neighbor = GCP_get (peer_id);
882   GCP_queue_add (neighbor, msg, GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN,
883                  0, 2, sizeof (struct GNUNET_CADET_ConnectionBroken),
884                  NULL, GNUNET_SYSERR, /* connection, fwd */
885                  NULL, NULL); /* continuation */
886 }
887
888
889 /**
890  * Send keepalive packets for a connection.
891  *
892  * @param c Connection to keep alive..
893  * @param fwd Is this a FWD keepalive? (owner -> dest).
894  */
895 static void
896 send_connection_keepalive (struct CadetConnection *c, int fwd)
897 {
898   struct GNUNET_MessageHeader msg;
899   struct CadetFlowControl *fc;
900
901   LOG (GNUNET_ERROR_TYPE_INFO, "keepalive %s for connection %s\n",
902        GC_f2s (fwd), GCC_2s (c));
903
904   fc = fwd ? &c->fwd_fc : &c->bck_fc;
905   if (0 < fc->queue_n)
906   {
907     LOG (GNUNET_ERROR_TYPE_INFO, "not sending keepalive, traffic in queue\n");
908   }
909
910   GNUNET_STATISTICS_update (stats, "# keepalives sent", 1, GNUNET_NO);
911
912   GNUNET_assert (NULL != c->t);
913   msg.size = htons (sizeof (msg));
914   msg.type = htons (GNUNET_MESSAGE_TYPE_CADET_KEEPALIVE);
915
916   GNUNET_assert (NULL ==
917                  GCT_send_prebuilt_message (&msg, c->t, c,
918                                             GNUNET_NO, NULL, NULL));
919 }
920
921
922 /**
923  * Send CONNECTION_{CREATE/ACK} packets for a connection.
924  *
925  * @param c Connection for which to send the message.
926  * @param fwd If #GNUNET_YES, send CREATE, otherwise send ACK.
927  */
928 static void
929 connection_recreate (struct CadetConnection *c, int fwd)
930 {
931   LOG (GNUNET_ERROR_TYPE_DEBUG, "sending connection recreate\n");
932   if (fwd)
933     GCC_send_create (c);
934   else
935     send_connection_ack (c, GNUNET_NO);
936 }
937
938
939 /**
940  * Generic connection timer management.
941  * Depending on the role of the peer in the connection will send the
942  * appropriate message (build or keepalive)
943  *
944  * @param c Conncetion to maintain.
945  * @param fwd Is FWD?
946  */
947 static void
948 connection_maintain (struct CadetConnection *c, int fwd)
949 {
950   if (GNUNET_NO != c->destroy)
951   {
952     LOG (GNUNET_ERROR_TYPE_INFO, "not sending keepalive, being destroyed\n");
953     return;
954   }
955
956   if (NULL == c->t)
957   {
958     GNUNET_break (0);
959     return;
960   }
961
962   if (CADET_TUNNEL_SEARCHING == GCT_get_cstate (c->t))
963   {
964     /* If status is SEARCHING, why is there a connection? Should be WAITING */
965     GNUNET_break (0);
966     GCT_debug (c->t, GNUNET_ERROR_TYPE_ERROR);
967     LOG (GNUNET_ERROR_TYPE_INFO, "not sending keepalive, tunnel SEARCHING\n");
968     schedule_next_keepalive (c, fwd);
969     return;
970   }
971   switch (c->state)
972   {
973     case CADET_CONNECTION_NEW:
974       GNUNET_break (0);
975       /* fall-through */
976     case CADET_CONNECTION_SENT:
977       connection_recreate (c, fwd);
978       break;
979     case CADET_CONNECTION_READY:
980       send_connection_keepalive (c, fwd);
981       break;
982     default:
983       break;
984   }
985 }
986
987
988
989 /**
990  * Keep the connection alive.
991  *
992  * @param c Connection to keep alive.
993  * @param fwd Direction.
994  * @param shutdown Are we shutting down? (Don't send traffic)
995  *                 Non-zero value for true, not necessarily GNUNET_YES.
996  */
997 static void
998 connection_keepalive (struct CadetConnection *c, int fwd, int shutdown)
999 {
1000   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s keepalive for %s\n",
1001        GC_f2s (fwd), GCC_2s (c));
1002
1003   if (fwd)
1004     c->fwd_maintenance_task = GNUNET_SCHEDULER_NO_TASK;
1005   else
1006     c->bck_maintenance_task = GNUNET_SCHEDULER_NO_TASK;
1007
1008   if (GNUNET_NO != shutdown)
1009     return;
1010
1011   connection_maintain (c, fwd);
1012
1013   /* Next execution will be scheduled by message_sent or _maintain*/
1014 }
1015
1016
1017 /**
1018  * Keep the connection alive in the FWD direction.
1019  *
1020  * @param cls Closure (connection to keepalive).
1021  * @param tc TaskContext.
1022  */
1023 static void
1024 connection_fwd_keepalive (void *cls,
1025                           const struct GNUNET_SCHEDULER_TaskContext *tc)
1026 {
1027   connection_keepalive ((struct CadetConnection *) cls,
1028                         GNUNET_YES,
1029                         tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN);
1030 }
1031
1032
1033 /**
1034  * Keep the connection alive in the BCK direction.
1035  *
1036  * @param cls Closure (connection to keepalive).
1037  * @param tc TaskContext.
1038  */
1039 static void
1040 connection_bck_keepalive (void *cls,
1041                           const struct GNUNET_SCHEDULER_TaskContext *tc)
1042 {
1043   connection_keepalive ((struct CadetConnection *) cls,
1044                         GNUNET_NO,
1045                         tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN);
1046 }
1047
1048
1049 /**
1050  * Schedule next keepalive task, taking in consideration
1051  * the connection state and number of retries.
1052  *
1053  * If the peer is not the origin, do nothing.
1054  *
1055  * @param c Connection for which to schedule the next keepalive.
1056  * @param fwd Direction for the next keepalive.
1057  */
1058 static void
1059 schedule_next_keepalive (struct CadetConnection *c, int fwd)
1060 {
1061   struct GNUNET_TIME_Relative delay;
1062   GNUNET_SCHEDULER_TaskIdentifier *task_id;
1063   GNUNET_SCHEDULER_Task keepalive_task;
1064
1065   if (GNUNET_NO == GCC_is_origin (c, fwd))
1066     return;
1067
1068   /* Calculate delay to use, depending on the state of the connection */
1069   if (CADET_CONNECTION_READY == c->state)
1070   {
1071     delay = refresh_connection_time;
1072   }
1073   else
1074   {
1075     if (1 > c->create_retry)
1076       c->create_retry = 1;
1077     delay = GNUNET_TIME_relative_multiply (create_connection_time,
1078                                            c->create_retry);
1079     if (c->create_retry < 64)
1080       c->create_retry *= 2;
1081   }
1082
1083   /* Select direction-dependent parameters */
1084   if (GNUNET_YES == fwd)
1085   {
1086     task_id = &c->fwd_maintenance_task;
1087     keepalive_task = &connection_fwd_keepalive;
1088   }
1089   else
1090   {
1091     task_id = &c->bck_maintenance_task;
1092     keepalive_task = &connection_bck_keepalive;
1093   }
1094
1095   /* Check that no one scheduled it before us */
1096   if (GNUNET_SCHEDULER_NO_TASK != *task_id)
1097   {
1098     /* No need for a _break. It can happen for instance when sending a SYNACK
1099      * for a duplicate SYN: the first SYNACK scheduled the task. */
1100     GNUNET_SCHEDULER_cancel (*task_id);
1101   }
1102
1103   /* Schedule the task */
1104   *task_id = GNUNET_SCHEDULER_add_delayed (delay, keepalive_task, c);
1105   LOG (GNUNET_ERROR_TYPE_DEBUG, "next keepalive in %s\n",
1106        GNUNET_STRINGS_relative_time_to_string (delay, GNUNET_YES));
1107 }
1108
1109
1110 /**
1111  * @brief Re-initiate traffic on this connection if necessary.
1112  *
1113  * Check if there is traffic queued towards this peer
1114  * and the core transmit handle is NULL (traffic was stalled).
1115  * If so, call core tmt rdy.
1116  *
1117  * @param c Connection on which initiate traffic.
1118  * @param fwd Is this about fwd traffic?
1119  */
1120 static void
1121 connection_unlock_queue (struct CadetConnection *c, int fwd)
1122 {
1123   struct CadetPeer *peer;
1124
1125   LOG (GNUNET_ERROR_TYPE_DEBUG, "connection_unlock_queue %s on %s\n",
1126        GC_f2s (fwd), GCC_2s (c));
1127
1128   if (GCC_is_terminal (c, fwd))
1129   {
1130     LOG (GNUNET_ERROR_TYPE_DEBUG, " is terminal, can unlock!\n");
1131     return;
1132   }
1133
1134   peer = get_hop (c, fwd);
1135   GCP_queue_unlock (peer, c);
1136 }
1137
1138
1139 /**
1140  * Cancel all transmissions that belong to a certain connection.
1141  *
1142  * If the connection is scheduled for destruction and no more messages are left,
1143  * the connection will be destroyed by the continuation call.
1144  *
1145  * @param c Connection which to cancel. Might be destroyed during this call.
1146  * @param fwd Cancel fwd traffic?
1147  */
1148 static void
1149 connection_cancel_queues (struct CadetConnection *c, int fwd)
1150 {
1151   struct CadetFlowControl *fc;
1152   struct CadetPeer *peer;
1153
1154   LOG (GNUNET_ERROR_TYPE_DEBUG,
1155        " *** Cancel %s queues for connection %s\n",
1156        GC_f2s (fwd), GCC_2s (c));
1157   if (NULL == c)
1158   {
1159     GNUNET_break (0);
1160     return;
1161   }
1162
1163   fc = fwd ? &c->fwd_fc : &c->bck_fc;
1164   if (GNUNET_SCHEDULER_NO_TASK != fc->poll_task)
1165   {
1166     GNUNET_SCHEDULER_cancel (fc->poll_task);
1167     fc->poll_task = GNUNET_SCHEDULER_NO_TASK;
1168     LOG (GNUNET_ERROR_TYPE_DEBUG, " *** Cancel POLL in ccq for fc %p\n", fc);
1169   }
1170   peer = get_hop (c, fwd);
1171   GCP_queue_cancel (peer, c);
1172 }
1173
1174
1175 /**
1176  * Function called if a connection has been stalled for a while,
1177  * possibly due to a missed ACK. Poll the neighbor about its ACK status.
1178  *
1179  * @param cls Closure (poll ctx).
1180  * @param tc TaskContext.
1181  */
1182 static void
1183 connection_poll (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
1184
1185
1186 /**
1187  * Callback called when a queued POLL message is sent.
1188  *
1189  * @param cls Closure (FC).
1190  * @param c Connection this message was on.
1191  * @param q Queue handler this call invalidates.
1192  * @param type Type of message sent.
1193  * @param fwd Was this a FWD going message?
1194  * @param size Size of the message.
1195  */
1196 static void
1197 poll_sent (void *cls,
1198            struct CadetConnection *c,
1199            struct CadetConnectionQueue *q,
1200            uint16_t type, int fwd, size_t size)
1201 {
1202   struct CadetFlowControl *fc = cls;
1203
1204   if (2 == c->destroy)
1205   {
1206     LOG (GNUNET_ERROR_TYPE_DEBUG, " *** POLL canceled on shutdown\n");
1207     return;
1208   }
1209   LOG (GNUNET_ERROR_TYPE_DEBUG,
1210        " *** POLL sent for , scheduling new one!\n");
1211   fc->poll_msg = NULL;
1212   fc->poll_time = GNUNET_TIME_STD_BACKOFF (fc->poll_time);
1213   fc->poll_task = GNUNET_SCHEDULER_add_delayed (fc->poll_time,
1214                                                 &connection_poll, fc);
1215   LOG (GNUNET_ERROR_TYPE_DEBUG, " task %u\n", fc->poll_task);
1216
1217 }
1218
1219 /**
1220  * Function called if a connection has been stalled for a while,
1221  * possibly due to a missed ACK. Poll the neighbor about its ACK status.
1222  *
1223  * @param cls Closure (poll ctx).
1224  * @param tc TaskContext.
1225  */
1226 static void
1227 connection_poll (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1228 {
1229   struct CadetFlowControl *fc = cls;
1230   struct GNUNET_CADET_Poll msg;
1231   struct CadetConnection *c;
1232
1233   fc->poll_task = GNUNET_SCHEDULER_NO_TASK;
1234   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1235   {
1236     return;
1237   }
1238
1239   c = fc->c;
1240   LOG (GNUNET_ERROR_TYPE_DEBUG, " *** Polling connection %s %s\n",
1241        GCC_2s (c), fc == &c->fwd_fc ? "FWD" : "BCK");
1242
1243   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_POLL);
1244   msg.header.size = htons (sizeof (msg));
1245   msg.pid = htonl (fc->last_pid_sent);
1246   LOG (GNUNET_ERROR_TYPE_DEBUG, " *** last pid sent: %u!\n", fc->last_pid_sent);
1247   fc->poll_msg =
1248       GCC_send_prebuilt_message (&msg.header, 0, fc->last_pid_sent, c,
1249                                  fc == &c->fwd_fc, GNUNET_YES, &poll_sent, fc);
1250   GNUNET_assert (NULL != fc->poll_msg);
1251 }
1252
1253
1254 /**
1255  * Timeout function due to lack of keepalive/traffic from the owner.
1256  * Destroys connection if called.
1257  *
1258  * @param cls Closure (connection to destroy).
1259  * @param tc TaskContext.
1260  */
1261 static void
1262 connection_fwd_timeout (void *cls,
1263                         const struct GNUNET_SCHEDULER_TaskContext *tc)
1264 {
1265   struct CadetConnection *c = cls;
1266
1267   c->fwd_maintenance_task = GNUNET_SCHEDULER_NO_TASK;
1268   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1269     return;
1270
1271   LOG (GNUNET_ERROR_TYPE_DEBUG, "Connection %s FWD timed out. Destroying.\n",
1272        GCC_2s (c));
1273   if (GCC_is_origin (c, GNUNET_YES)) /* If local, leave. */
1274   {
1275     GNUNET_break (0);
1276     return;
1277   }
1278
1279   GCC_destroy (c);
1280 }
1281
1282
1283 /**
1284  * Timeout function due to lack of keepalive/traffic from the destination.
1285  * Destroys connection if called.
1286  *
1287  * @param cls Closure (connection to destroy).
1288  * @param tc TaskContext
1289  */
1290 static void
1291 connection_bck_timeout (void *cls,
1292                         const struct GNUNET_SCHEDULER_TaskContext *tc)
1293 {
1294   struct CadetConnection *c = cls;
1295
1296   c->bck_maintenance_task = GNUNET_SCHEDULER_NO_TASK;
1297   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1298     return;
1299
1300   LOG (GNUNET_ERROR_TYPE_DEBUG, "Connection %s BCK timed out. Destroying.\n",
1301        GCC_2s (c));
1302
1303   if (GCC_is_origin (c, GNUNET_NO)) /* If local, leave. */
1304   {
1305     GNUNET_break (0);
1306     return;
1307   }
1308
1309   GCC_destroy (c);
1310 }
1311
1312
1313 /**
1314  * Resets the connection timeout task, some other message has done the
1315  * task's job.
1316  * - For the first peer on the direction this means to send
1317  *   a keepalive or a path confirmation message (either create or ACK).
1318  * - For all other peers, this means to destroy the connection,
1319  *   due to lack of activity.
1320  * Starts the timeout if no timeout was running (connection just created).
1321  *
1322  * @param c Connection whose timeout to reset.
1323  * @param fwd Is this forward?
1324  *
1325  * TODO use heap to improve efficiency of scheduler.
1326  */
1327 static void
1328 connection_reset_timeout (struct CadetConnection *c, int fwd)
1329 {
1330   LOG (GNUNET_ERROR_TYPE_DEBUG, "Connection %s reset timeout\n", GC_f2s (fwd));
1331
1332   if (GCC_is_origin (c, fwd)) /* Startpoint */
1333   {
1334     schedule_next_keepalive (c, fwd);
1335   }
1336   else /* Relay, endpoint. */
1337   {
1338     struct GNUNET_TIME_Relative delay;
1339     GNUNET_SCHEDULER_TaskIdentifier *ti;
1340     GNUNET_SCHEDULER_Task f;
1341
1342     ti = fwd ? &c->fwd_maintenance_task : &c->bck_maintenance_task;
1343
1344     if (GNUNET_SCHEDULER_NO_TASK != *ti)
1345       GNUNET_SCHEDULER_cancel (*ti);
1346     delay = GNUNET_TIME_relative_multiply (refresh_connection_time, 4);
1347     f = fwd ? &connection_fwd_timeout : &connection_bck_timeout;
1348     *ti = GNUNET_SCHEDULER_add_delayed (delay, f, c);
1349   }
1350 }
1351
1352
1353 /**
1354  * Add the connection to the list of both neighbors.
1355  *
1356  * @param c Connection.
1357  *
1358  * @return #GNUNET_OK if everything went fine
1359  *         #GNUNET_SYSERR if the was an error and @c c is malformed.
1360  */
1361 static int
1362 register_neighbors (struct CadetConnection *c)
1363 {
1364   struct CadetPeer *next_peer;
1365   struct CadetPeer *prev_peer;
1366
1367   next_peer = get_next_hop (c);
1368   prev_peer = get_prev_hop (c);
1369
1370   LOG (GNUNET_ERROR_TYPE_DEBUG, "register neighbors for connection %s\n",
1371        GCC_2s (c));
1372   path_debug (c->path);
1373   LOG (GNUNET_ERROR_TYPE_DEBUG, "own pos %u\n", c->own_pos);
1374   LOG (GNUNET_ERROR_TYPE_DEBUG, "putting connection %s to next peer %p\n",
1375        GCC_2s (c), next_peer);
1376   LOG (GNUNET_ERROR_TYPE_DEBUG, "next peer %p %s\n", next_peer, GCP_2s (next_peer));
1377   LOG (GNUNET_ERROR_TYPE_DEBUG, "putting connection %s to prev peer %p\n",
1378        GCC_2s (c), prev_peer);
1379   LOG (GNUNET_ERROR_TYPE_DEBUG, "prev peer %p %s\n", prev_peer, GCP_2s (prev_peer));
1380
1381   if (GNUNET_NO == GCP_is_neighbor (next_peer)
1382       || GNUNET_NO == GCP_is_neighbor (prev_peer))
1383   {
1384     if (GCC_is_origin (c, GNUNET_YES))
1385       GNUNET_STATISTICS_update (stats, "# local bad paths", 1, GNUNET_NO);
1386     GNUNET_STATISTICS_update (stats, "# bad paths", 1, GNUNET_NO);
1387
1388     LOG (GNUNET_ERROR_TYPE_DEBUG, "  register neighbors failed\n");
1389     LOG (GNUNET_ERROR_TYPE_DEBUG, "  prev: %s, neighbor?: %d\n",
1390          GCP_2s (prev_peer), GCP_is_neighbor (prev_peer));
1391     LOG (GNUNET_ERROR_TYPE_DEBUG, "  next: %s, neighbor?: %d\n",
1392          GCP_2s (next_peer), GCP_is_neighbor (next_peer));
1393     return GNUNET_SYSERR;
1394   }
1395
1396   GCP_add_connection (next_peer, c);
1397   GCP_add_connection (prev_peer, c);
1398
1399   return GNUNET_OK;
1400 }
1401
1402
1403 /**
1404  * Remove the connection from the list of both neighbors.
1405  *
1406  * @param c Connection.
1407  */
1408 static void
1409 unregister_neighbors (struct CadetConnection *c)
1410 {
1411   struct CadetPeer *peer;
1412
1413   peer = get_next_hop (c);
1414   if (GNUNET_OK != GCP_remove_connection (peer, c))
1415   {
1416     GNUNET_assert (CADET_CONNECTION_NEW == c->state
1417                    || CADET_CONNECTION_DESTROYED == c->state);
1418     LOG (GNUNET_ERROR_TYPE_DEBUG, "  cstate: %u\n", c->state);
1419     if (NULL != c->t) GCT_debug (c->t, GNUNET_ERROR_TYPE_DEBUG);
1420   }
1421
1422   peer = get_prev_hop (c);
1423   if (GNUNET_OK != GCP_remove_connection (peer, c))
1424   {
1425     GNUNET_assert (CADET_CONNECTION_NEW == c->state
1426                    || CADET_CONNECTION_DESTROYED == c->state);
1427     LOG (GNUNET_ERROR_TYPE_DEBUG, "  cstate: %u\n", c->state);
1428     if (NULL != c->t) GCT_debug (c->t, GNUNET_ERROR_TYPE_DEBUG);
1429   }
1430 }
1431
1432
1433 /**
1434  * Bind the connection to the peer and the tunnel to that peer.
1435  *
1436  * If the peer has no tunnel, create one. Update tunnel and connection
1437  * data structres to reflect new status.
1438  *
1439  * @param c Connection.
1440  * @param peer Peer.
1441  */
1442 static void
1443 add_to_peer (struct CadetConnection *c, struct CadetPeer *peer)
1444 {
1445   GCP_add_tunnel (peer);
1446   c->t = GCP_get_tunnel (peer);
1447   GCT_add_connection (c->t, c);
1448 }
1449
1450
1451 /**
1452  * Builds a path from a PeerIdentity array.
1453  *
1454  * @param peers PeerIdentity array.
1455  * @param size Size of the @c peers array.
1456  * @param own_pos Output parameter: own position in the path.
1457  *
1458  * @return Fixed and shortened path.
1459  */
1460 static struct CadetPeerPath *
1461 build_path_from_peer_ids (struct GNUNET_PeerIdentity *peers,
1462                           unsigned int size,
1463                           unsigned int *own_pos)
1464 {
1465   struct CadetPeerPath *path;
1466   GNUNET_PEER_Id shortid;
1467   unsigned int i;
1468   unsigned int j;
1469   unsigned int offset;
1470
1471   /* Create path */
1472   LOG (GNUNET_ERROR_TYPE_DEBUG, "  Creating path...\n");
1473   path = path_new (size);
1474   *own_pos = 0;
1475   offset = 0;
1476   for (i = 0; i < size; i++)
1477   {
1478     LOG (GNUNET_ERROR_TYPE_DEBUG, "  - %u: taking %s\n",
1479          i, GNUNET_i2s (&peers[i]));
1480     shortid = GNUNET_PEER_intern (&peers[i]);
1481
1482     /* Check for loops / duplicates */
1483     for (j = 0; j < i - offset; j++)
1484     {
1485       if (path->peers[j] == shortid)
1486       {
1487         LOG (GNUNET_ERROR_TYPE_DEBUG, "    already exists at pos %u\n", j);
1488         offset = i - j;
1489         LOG (GNUNET_ERROR_TYPE_DEBUG, "    offset now %u\n", offset);
1490         GNUNET_PEER_change_rc (shortid, -1);
1491       }
1492     }
1493     LOG (GNUNET_ERROR_TYPE_DEBUG, "    storing at %u\n", i - offset);
1494     path->peers[i - offset] = shortid;
1495     if (path->peers[i - offset] == myid)
1496       *own_pos = i - offset;
1497   }
1498   path->length -= offset;
1499
1500   if (path->peers[*own_pos] != myid)
1501   {
1502     /* create path: self not found in path through self */
1503     GNUNET_break_op (0);
1504     path_destroy (path);
1505     return NULL;
1506   }
1507
1508   return path;
1509 }
1510
1511
1512 /**
1513  * Log receipt of message on stderr (INFO level).
1514  *
1515  * @param message Message received.
1516  * @param peer Peer who sent the message.
1517  * @param hash Connection ID.
1518  */
1519 static void
1520 log_message (const struct GNUNET_MessageHeader *message,
1521              const struct GNUNET_PeerIdentity *peer,
1522              const struct GNUNET_CADET_Hash *hash)
1523 {
1524   LOG (GNUNET_ERROR_TYPE_INFO, "<-- %s on connection %s from %s\n",
1525        GC_m2s (ntohs (message->type)), GNUNET_h2s (GC_h2hc (hash)),
1526        GNUNET_i2s (peer));
1527 }
1528
1529 /******************************************************************************/
1530 /********************************    API    ***********************************/
1531 /******************************************************************************/
1532
1533 /**
1534  * Core handler for connection creation.
1535  *
1536  * @param cls Closure (unused).
1537  * @param peer Sender (neighbor).
1538  * @param message Message.
1539  *
1540  * @return GNUNET_OK to keep the connection open,
1541  *         GNUNET_SYSERR to close it (signal serious error)
1542  */
1543 int
1544 GCC_handle_create (void *cls, const struct GNUNET_PeerIdentity *peer,
1545                    const struct GNUNET_MessageHeader *message)
1546 {
1547   struct GNUNET_CADET_ConnectionCreate *msg;
1548   struct GNUNET_PeerIdentity *id;
1549   struct GNUNET_CADET_Hash *cid;
1550   struct CadetPeerPath *path;
1551   struct CadetPeer *dest_peer;
1552   struct CadetPeer *orig_peer;
1553   struct CadetConnection *c;
1554   unsigned int own_pos;
1555   uint16_t size;
1556
1557   /* Check size */
1558   size = ntohs (message->size);
1559   if (size < sizeof (struct GNUNET_CADET_ConnectionCreate))
1560   {
1561     GNUNET_break_op (0);
1562     return GNUNET_OK;
1563   }
1564
1565   /* Calculate hops */
1566   size -= sizeof (struct GNUNET_CADET_ConnectionCreate);
1567   if (size % sizeof (struct GNUNET_PeerIdentity))
1568   {
1569     GNUNET_break_op (0);
1570     return GNUNET_OK;
1571   }
1572   size /= sizeof (struct GNUNET_PeerIdentity);
1573   if (1 > size)
1574   {
1575     GNUNET_break_op (0);
1576     return GNUNET_OK;
1577   }
1578   LOG (GNUNET_ERROR_TYPE_DEBUG, "    path has %u hops.\n", size);
1579
1580   /* Get parameters */
1581   msg = (struct GNUNET_CADET_ConnectionCreate *) message;
1582   cid = &msg->cid;
1583   log_message (message, peer, cid);
1584   id = (struct GNUNET_PeerIdentity *) &msg[1];
1585   LOG (GNUNET_ERROR_TYPE_DEBUG, "    origin: %s\n", GNUNET_i2s (id));
1586
1587   /* Create connection */
1588   c = connection_get (cid);
1589   if (NULL == c)
1590   {
1591     path = build_path_from_peer_ids ((struct GNUNET_PeerIdentity *) &msg[1],
1592                                      size, &own_pos);
1593     if (NULL == path)
1594       return GNUNET_OK;
1595     if (0 == own_pos)
1596     {
1597       GNUNET_break_op (0);
1598       path_destroy (path);
1599       return GNUNET_OK;
1600     }
1601     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Own position: %u\n", own_pos);
1602     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Creating connection\n");
1603     c = GCC_new (cid, NULL, path_duplicate (path), own_pos);
1604     if (NULL == c)
1605     {
1606       if (path->length - 1 == own_pos)
1607       {
1608         /* If we are destination, why did the creation fail? */
1609         GNUNET_break (0);
1610         return GNUNET_OK;
1611       }
1612       send_broken_unknown (cid, &my_full_id,
1613                            GNUNET_PEER_resolve2 (path->peers[own_pos + 1]),
1614                            peer);
1615       path_destroy (path);
1616       return GNUNET_OK;
1617     }
1618     GCP_add_path_to_all (path, GNUNET_NO);
1619     connection_reset_timeout (c, GNUNET_YES);
1620   }
1621   else
1622   {
1623     path = path_duplicate (c->path);
1624   }
1625   if (CADET_CONNECTION_NEW == c->state)
1626     connection_change_state (c, CADET_CONNECTION_SENT);
1627
1628   /* Remember peers */
1629   dest_peer = GCP_get (&id[size - 1]);
1630   orig_peer = GCP_get (&id[0]);
1631
1632   /* Is it a connection to us? */
1633   if (c->own_pos == path->length - 1)
1634   {
1635     LOG (GNUNET_ERROR_TYPE_DEBUG, "  It's for us!\n");
1636     GCP_add_path_to_origin (orig_peer, path_duplicate (path), GNUNET_YES);
1637
1638     add_to_peer (c, orig_peer);
1639     if (CADET_TUNNEL_NEW == GCT_get_cstate (c->t))
1640       GCT_change_cstate (c->t,  CADET_TUNNEL_WAITING);
1641
1642     send_connection_ack (c, GNUNET_NO);
1643     if (CADET_CONNECTION_SENT == c->state)
1644       connection_change_state (c, CADET_CONNECTION_ACK);
1645   }
1646   else
1647   {
1648     /* It's for somebody else! Retransmit. */
1649     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Retransmitting.\n");
1650     GCP_add_path (dest_peer, path_duplicate (path), GNUNET_NO);
1651     GCP_add_path_to_origin (orig_peer, path_duplicate (path), GNUNET_NO);
1652     GNUNET_assert (NULL == GCC_send_prebuilt_message (message, 0, 0, c,
1653                                                       GNUNET_YES, GNUNET_YES,
1654                                                       NULL, NULL));
1655   }
1656   path_destroy (path);
1657   return GNUNET_OK;
1658 }
1659
1660
1661 /**
1662  * Core handler for path confirmations.
1663  *
1664  * @param cls closure
1665  * @param message message
1666  * @param peer peer identity this notification is about
1667  *
1668  * @return GNUNET_OK to keep the connection open,
1669  *         GNUNET_SYSERR to close it (signal serious error)
1670  */
1671 int
1672 GCC_handle_confirm (void *cls, const struct GNUNET_PeerIdentity *peer,
1673                     const struct GNUNET_MessageHeader *message)
1674 {
1675   struct GNUNET_CADET_ConnectionACK *msg;
1676   struct CadetConnection *c;
1677   struct CadetPeerPath *p;
1678   struct CadetPeer *pi;
1679   enum CadetConnectionState oldstate;
1680   int fwd;
1681
1682   msg = (struct GNUNET_CADET_ConnectionACK *) message;
1683   log_message (message, peer, &msg->cid);
1684   c = connection_get (&msg->cid);
1685   if (NULL == c)
1686   {
1687     GNUNET_STATISTICS_update (stats, "# control on unknown connection",
1688                               1, GNUNET_NO);
1689     LOG (GNUNET_ERROR_TYPE_DEBUG, "  don't know the connection!\n");
1690     send_broken_unknown (&msg->cid, &my_full_id, NULL, peer);
1691     return GNUNET_OK;
1692   }
1693
1694   if (GNUNET_NO != c->destroy)
1695   {
1696     LOG (GNUNET_ERROR_TYPE_DEBUG, "  connection being destroyed\n");
1697     return GNUNET_OK;
1698   }
1699
1700   oldstate = c->state;
1701   LOG (GNUNET_ERROR_TYPE_DEBUG, "  via peer %s\n", GNUNET_i2s (peer));
1702   pi = GCP_get (peer);
1703   if (get_next_hop (c) == pi)
1704   {
1705     LOG (GNUNET_ERROR_TYPE_DEBUG, "  SYNACK\n");
1706     fwd = GNUNET_NO;
1707     if (CADET_CONNECTION_SENT == oldstate)
1708       connection_change_state (c, CADET_CONNECTION_ACK);
1709   }
1710   else if (get_prev_hop (c) == pi)
1711   {
1712     LOG (GNUNET_ERROR_TYPE_DEBUG, "  FINAL ACK\n");
1713     fwd = GNUNET_YES;
1714     connection_change_state (c, CADET_CONNECTION_READY);
1715   }
1716   else
1717   {
1718     GNUNET_break_op (0);
1719     return GNUNET_OK;
1720   }
1721
1722   connection_reset_timeout (c, fwd);
1723
1724   /* Add path to peers? */
1725   p = c->path;
1726   if (NULL != p)
1727   {
1728     GCP_add_path_to_all (p, GNUNET_YES);
1729   }
1730   else
1731   {
1732     GNUNET_break (0);
1733   }
1734
1735   /* Message for us as creator? */
1736   if (GCC_is_origin (c, GNUNET_YES))
1737   {
1738     if (GNUNET_NO != fwd)
1739     {
1740       GNUNET_break_op (0);
1741       return GNUNET_OK;
1742     }
1743     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Connection (SYN)ACK for us!\n");
1744
1745     /* If just created, cancel the short timeout and start a long one */
1746     if (CADET_CONNECTION_SENT == oldstate)
1747       connection_reset_timeout (c, GNUNET_YES);
1748
1749     /* Change connection state */
1750     connection_change_state (c, CADET_CONNECTION_READY);
1751     send_connection_ack (c, GNUNET_YES);
1752
1753     /* Change tunnel state, trigger KX */
1754     if (CADET_TUNNEL_WAITING == GCT_get_cstate (c->t))
1755       GCT_change_cstate (c->t, CADET_TUNNEL_READY);
1756
1757     return GNUNET_OK;
1758   }
1759
1760   /* Message for us as destination? */
1761   if (GCC_is_terminal (c, GNUNET_YES))
1762   {
1763     if (GNUNET_YES != fwd)
1764     {
1765       GNUNET_break_op (0);
1766       return GNUNET_OK;
1767     }
1768     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Connection ACK for us!\n");
1769
1770     /* If just created, cancel the short timeout and start a long one */
1771     if (CADET_CONNECTION_ACK == oldstate)
1772       connection_reset_timeout (c, GNUNET_NO);
1773
1774     /* Change tunnel state */
1775     if (CADET_TUNNEL_WAITING == GCT_get_cstate (c->t))
1776       GCT_change_cstate (c->t, CADET_TUNNEL_READY);
1777
1778     return GNUNET_OK;
1779   }
1780
1781   LOG (GNUNET_ERROR_TYPE_DEBUG, "  not for us, retransmitting...\n");
1782   GNUNET_assert (NULL == GCC_send_prebuilt_message (message, 0, 0, c, fwd,
1783                                                     GNUNET_YES, NULL, NULL));
1784   return GNUNET_OK;
1785 }
1786
1787
1788 /**
1789  * Core handler for notifications of broken connections.
1790  *
1791  * @param cls Closure (unused).
1792  * @param id Peer identity of sending neighbor.
1793  * @param message Message.
1794  *
1795  * @return GNUNET_OK to keep the connection open,
1796  *         GNUNET_SYSERR to close it (signal serious error)
1797  */
1798 int
1799 GCC_handle_broken (void* cls,
1800                    const struct GNUNET_PeerIdentity* id,
1801                    const struct GNUNET_MessageHeader* message)
1802 {
1803   struct GNUNET_CADET_ConnectionBroken *msg;
1804   struct CadetConnection *c;
1805   struct CadetTunnel *t;
1806   unsigned int del;
1807   int pending;
1808   int fwd;
1809
1810   msg = (struct GNUNET_CADET_ConnectionBroken *) message;
1811   log_message (message, id, &msg->cid);
1812   LOG (GNUNET_ERROR_TYPE_DEBUG, "  regarding %s\n",
1813               GNUNET_i2s (&msg->peer1));
1814   LOG (GNUNET_ERROR_TYPE_DEBUG, "  regarding %s\n",
1815               GNUNET_i2s (&msg->peer2));
1816   c = connection_get (&msg->cid);
1817   if (NULL == c)
1818   {
1819     LOG (GNUNET_ERROR_TYPE_DEBUG, "  duplicate CONNECTION_BROKEN\n");
1820     return GNUNET_OK;
1821   }
1822
1823   t = c->t;
1824   fwd = is_fwd (c, id);
1825   if (GCC_is_terminal (c, fwd))
1826   {
1827     struct GNUNET_MessageHeader *out_msg;
1828     struct CadetPeer *neighbor;
1829     struct CadetPeer *endpoint;
1830
1831     if (NULL == t)
1832     {
1833       /* A terminal connection should not have 't' set to NULL. */
1834       GNUNET_break (0);
1835       return GNUNET_OK;
1836     }
1837     neighbor = get_hop (c, !fwd);
1838     endpoint = GCP_get_short (c->path->peers[c->path->length - 1]);
1839     path_invalidate (c->path);
1840     GCP_notify_broken_link (endpoint, &msg->peer1, &msg->peer2);
1841     c->state = CADET_CONNECTION_DESTROYED;
1842     GCT_remove_connection (t, c);
1843     c->t = NULL;
1844     pending = c->pending_messages;
1845
1846     /* GCP_connection_pop will destroy the connection when the last message
1847      * is popped! Do not use 'c' after the call. */
1848     while (NULL != (out_msg = GCP_connection_pop (neighbor, c, &del)))
1849     {
1850       pending -= del + 1; /* Substract the deleted messages + the popped one */
1851       GCT_resend_message (out_msg, t);
1852     }
1853     /* All pending messages should have been popped,
1854      * and the connection destroyed by the continuation. */
1855     if (0 < pending)
1856     {
1857       GNUNET_break (0);
1858       GCC_destroy (c);
1859     }
1860     else
1861     {
1862       GNUNET_break (0 == pending); /* If negative: counter error! */
1863     }
1864   }
1865   else
1866   {
1867     GNUNET_assert (NULL == GCC_send_prebuilt_message (message, 0, 0, c, fwd,
1868                                                       GNUNET_YES, NULL, NULL));
1869     c->destroy = GNUNET_YES;
1870     connection_cancel_queues (c, !fwd);
1871   }
1872
1873   return GNUNET_OK;
1874 }
1875
1876
1877 /**
1878  * Core handler for tunnel destruction
1879  *
1880  * @param cls Closure (unused).
1881  * @param peer Peer identity of sending neighbor.
1882  * @param message Message.
1883  *
1884  * @return GNUNET_OK to keep the connection open,
1885  *         GNUNET_SYSERR to close it (signal serious error)
1886  */
1887 int
1888 GCC_handle_destroy (void *cls, const struct GNUNET_PeerIdentity *peer,
1889                     const struct GNUNET_MessageHeader *message)
1890 {
1891   struct GNUNET_CADET_ConnectionDestroy *msg;
1892   struct CadetConnection *c;
1893   int fwd;
1894
1895   msg = (struct GNUNET_CADET_ConnectionDestroy *) message;
1896   log_message (message, peer, &msg->cid);
1897   c = connection_get (&msg->cid);
1898   if (NULL == c)
1899   {
1900     /* Probably already got the message from another path,
1901      * destroyed the tunnel and retransmitted to children.
1902      * Safe to ignore.
1903      */
1904     GNUNET_STATISTICS_update (stats, "# control on unknown connection",
1905                               1, GNUNET_NO);
1906     LOG (GNUNET_ERROR_TYPE_DEBUG, "  connection unknown: already destroyed?\n");
1907     return GNUNET_OK;
1908   }
1909   fwd = is_fwd (c, peer);
1910   if (GNUNET_SYSERR == fwd)
1911   {
1912     GNUNET_break_op (0); /* FIXME */
1913     return GNUNET_OK;
1914   }
1915   if (GNUNET_NO == GCC_is_terminal (c, fwd))
1916     GNUNET_assert (NULL == GCC_send_prebuilt_message (message, 0, 0, c, fwd,
1917                                                       GNUNET_YES, NULL, NULL));
1918   else if (0 == c->pending_messages)
1919   {
1920     LOG (GNUNET_ERROR_TYPE_DEBUG, "  directly destroying connection!\n");
1921     GCC_destroy (c);
1922     return GNUNET_OK;
1923   }
1924   c->destroy = GNUNET_YES;
1925   c->state = CADET_CONNECTION_DESTROYED;
1926   if (NULL != c->t)
1927   {
1928     GCT_remove_connection (c->t, c);
1929     c->t = NULL;
1930   }
1931
1932   return GNUNET_OK;
1933 }
1934
1935 /**
1936  * Generic handler for cadet network encrypted traffic.
1937  *
1938  * @param peer Peer identity this notification is about.
1939  * @param msg Encrypted message.
1940  *
1941  * @return GNUNET_OK to keep the connection open,
1942  *         GNUNET_SYSERR to close it (signal serious error)
1943  */
1944 static int
1945 handle_cadet_encrypted (const struct GNUNET_PeerIdentity *peer,
1946                        const struct GNUNET_CADET_Encrypted *msg)
1947 {
1948   struct CadetConnection *c;
1949   struct CadetPeer *neighbor;
1950   struct CadetFlowControl *fc;
1951   GNUNET_PEER_Id peer_id;
1952   uint32_t pid;
1953   uint32_t ttl;
1954   size_t size;
1955   int fwd;
1956
1957   log_message (&msg->header, peer, &msg->cid);
1958
1959   /* Check size */
1960   size = ntohs (msg->header.size);
1961   if (size <
1962       sizeof (struct GNUNET_CADET_Encrypted) +
1963       sizeof (struct GNUNET_MessageHeader))
1964   {
1965     GNUNET_break_op (0);
1966     return GNUNET_OK;
1967   }
1968
1969   /* Check connection */
1970   c = connection_get (&msg->cid);
1971   if (NULL == c)
1972   {
1973     GNUNET_STATISTICS_update (stats, "# unknown connection", 1, GNUNET_NO);
1974     LOG (GNUNET_ERROR_TYPE_DEBUG, "enc on unknown connection %s\n",
1975          GNUNET_h2s (GC_h2hc (&msg->cid)));
1976     send_broken_unknown (&msg->cid, &my_full_id, NULL, peer);
1977     return GNUNET_OK;
1978   }
1979
1980   LOG (GNUNET_ERROR_TYPE_DEBUG, "  connection %s\n", GCC_2s (c));
1981
1982   /* Check if origin is as expected */
1983   neighbor = get_prev_hop (c);
1984   peer_id = GNUNET_PEER_search (peer);
1985   if (peer_id == GCP_get_short_id (neighbor))
1986   {
1987     fwd = GNUNET_YES;
1988   }
1989   else
1990   {
1991     neighbor = get_next_hop (c);
1992     if (peer_id == GCP_get_short_id (neighbor))
1993     {
1994       fwd = GNUNET_NO;
1995     }
1996     else
1997     {
1998       /* Unexpected peer sending traffic on a connection. */
1999       GNUNET_break_op (0);
2000       return GNUNET_OK;
2001     }
2002   }
2003
2004   /* Check PID */
2005   fc = fwd ? &c->bck_fc : &c->fwd_fc;
2006   pid = ntohl (msg->pid);
2007   LOG (GNUNET_ERROR_TYPE_DEBUG, " PID %u (expected %u+)\n",
2008        pid, fc->last_pid_recv + 1);
2009   if (GC_is_pid_bigger (pid, fc->last_ack_sent))
2010   {
2011     GNUNET_STATISTICS_update (stats, "# unsolicited message", 1, GNUNET_NO);
2012     GNUNET_break_op (0);
2013     LOG (GNUNET_ERROR_TYPE_WARNING,
2014          "Received PID %u, (prev %u), ACK %u\n",
2015          pid, fc->last_pid_recv, fc->last_ack_sent);
2016     return GNUNET_OK;
2017   }
2018   if (GNUNET_NO == GC_is_pid_bigger (pid, fc->last_pid_recv))
2019   {
2020     GNUNET_STATISTICS_update (stats, "# duplicate PID", 1, GNUNET_NO);
2021     LOG (GNUNET_ERROR_TYPE_DEBUG,
2022                 " PID %u not expected (%u+), dropping!\n",
2023                 pid, fc->last_pid_recv + 1);
2024     return GNUNET_OK;
2025   }
2026   if (CADET_CONNECTION_SENT == c->state || CADET_CONNECTION_ACK == c->state)
2027     connection_change_state (c, CADET_CONNECTION_READY);
2028   connection_reset_timeout (c, fwd);
2029   fc->last_pid_recv = pid;
2030
2031   /* Is this message for us? */
2032   if (GCC_is_terminal (c, fwd))
2033   {
2034     LOG (GNUNET_ERROR_TYPE_DEBUG, "  message for us!\n");
2035     GNUNET_STATISTICS_update (stats, "# messages received", 1, GNUNET_NO);
2036
2037     if (NULL == c->t)
2038     {
2039       GNUNET_break (GNUNET_NO != c->destroy);
2040       return GNUNET_OK;
2041     }
2042     fc->last_pid_recv = pid;
2043     GCT_handle_encrypted (c->t, msg);
2044     GCC_send_ack (c, fwd, GNUNET_NO);
2045     return GNUNET_OK;
2046   }
2047
2048   /* Message not for us: forward to next hop */
2049   LOG (GNUNET_ERROR_TYPE_DEBUG, "  not for us, retransmitting...\n");
2050   ttl = ntohl (msg->ttl);
2051   LOG (GNUNET_ERROR_TYPE_DEBUG, "   ttl: %u\n", ttl);
2052   if (ttl == 0)
2053   {
2054     GNUNET_STATISTICS_update (stats, "# TTL drops", 1, GNUNET_NO);
2055     LOG (GNUNET_ERROR_TYPE_WARNING, " TTL is 0, DROPPING!\n");
2056     GCC_send_ack (c, fwd, GNUNET_NO);
2057     return GNUNET_OK;
2058   }
2059
2060   GNUNET_STATISTICS_update (stats, "# messages forwarded", 1, GNUNET_NO);
2061   GNUNET_assert (NULL == GCC_send_prebuilt_message (&msg->header, 0, 0, c, fwd,
2062                                                     GNUNET_NO, NULL, NULL));
2063
2064   return GNUNET_OK;
2065 }
2066
2067 /**
2068  * Generic handler for cadet network encrypted traffic.
2069  *
2070  * @param peer Peer identity this notification is about.
2071  * @param msg Encrypted message.
2072  *
2073  * @return GNUNET_OK to keep the connection open,
2074  *         GNUNET_SYSERR to close it (signal serious error)
2075  */
2076 static int
2077 handle_cadet_kx (const struct GNUNET_PeerIdentity *peer,
2078                 const struct GNUNET_CADET_KX *msg)
2079 {
2080   struct CadetConnection *c;
2081   struct CadetPeer *neighbor;
2082   GNUNET_PEER_Id peer_id;
2083   size_t size;
2084   int fwd;
2085
2086   log_message (&msg->header, peer, &msg->cid);
2087
2088   /* Check size */
2089   size = ntohs (msg->header.size);
2090   if (size <
2091       sizeof (struct GNUNET_CADET_KX) +
2092       sizeof (struct GNUNET_MessageHeader))
2093   {
2094     GNUNET_break_op (0);
2095     return GNUNET_OK;
2096   }
2097
2098   /* Check connection */
2099   c = connection_get (&msg->cid);
2100   if (NULL == c)
2101   {
2102     GNUNET_STATISTICS_update (stats, "# unknown connection", 1, GNUNET_NO);
2103     LOG (GNUNET_ERROR_TYPE_DEBUG, "kx on unknown connection %s\n",
2104          GNUNET_h2s (GC_h2hc (&msg->cid)));
2105     send_broken_unknown (&msg->cid, &my_full_id, NULL, peer);
2106     return GNUNET_OK;
2107   }
2108   LOG (GNUNET_ERROR_TYPE_DEBUG, " on connection %s\n", GCC_2s (c));
2109
2110   /* Check if origin is as expected */
2111   neighbor = get_prev_hop (c);
2112   peer_id = GNUNET_PEER_search (peer);
2113   if (peer_id == GCP_get_short_id (neighbor))
2114   {
2115     fwd = GNUNET_YES;
2116   }
2117   else
2118   {
2119     neighbor = get_next_hop (c);
2120     if (peer_id == GCP_get_short_id (neighbor))
2121     {
2122       fwd = GNUNET_NO;
2123     }
2124     else
2125     {
2126       /* Unexpected peer sending traffic on a connection. */
2127       GNUNET_break_op (0);
2128       return GNUNET_OK;
2129     }
2130   }
2131
2132   /* Count as connection confirmation. */
2133   if (CADET_CONNECTION_SENT == c->state || CADET_CONNECTION_ACK == c->state)
2134   {
2135     connection_change_state (c, CADET_CONNECTION_READY);
2136     if (NULL != c->t)
2137     {
2138       if (CADET_TUNNEL_WAITING == GCT_get_cstate (c->t))
2139         GCT_change_cstate (c->t, CADET_TUNNEL_READY);
2140     }
2141   }
2142   connection_reset_timeout (c, fwd);
2143
2144   /* Is this message for us? */
2145   if (GCC_is_terminal (c, fwd))
2146   {
2147     LOG (GNUNET_ERROR_TYPE_DEBUG, "  message for us!\n");
2148     GNUNET_STATISTICS_update (stats, "# messages received", 1, GNUNET_NO);
2149     if (NULL == c->t)
2150     {
2151       GNUNET_break (0);
2152       return GNUNET_OK;
2153     }
2154     GCT_handle_kx (c->t, &msg[1].header);
2155     return GNUNET_OK;
2156   }
2157
2158   /* Message not for us: forward to next hop */
2159   LOG (GNUNET_ERROR_TYPE_DEBUG, "  not for us, retransmitting...\n");
2160   GNUNET_STATISTICS_update (stats, "# messages forwarded", 1, GNUNET_NO);
2161   GNUNET_assert (NULL == GCC_send_prebuilt_message (&msg->header, 0, 0, c, fwd,
2162                                                     GNUNET_NO, NULL, NULL));
2163
2164   return GNUNET_OK;
2165 }
2166
2167
2168 /**
2169  * Core handler for encrypted cadet network traffic (channel mgmt, data).
2170  *
2171  * @param cls Closure (unused).
2172  * @param message Message received.
2173  * @param peer Peer who sent the message.
2174  *
2175  * @return GNUNET_OK to keep the connection open,
2176  *         GNUNET_SYSERR to close it (signal serious error)
2177  */
2178 int
2179 GCC_handle_encrypted (void *cls, const struct GNUNET_PeerIdentity *peer,
2180                       const struct GNUNET_MessageHeader *message)
2181 {
2182   return handle_cadet_encrypted (peer,
2183                                 (struct GNUNET_CADET_Encrypted *)message);
2184 }
2185
2186
2187 /**
2188  * Core handler for key exchange traffic (ephemeral key, ping, pong).
2189  *
2190  * @param cls Closure (unused).
2191  * @param message Message received.
2192  * @param peer Peer who sent the message.
2193  *
2194  * @return GNUNET_OK to keep the connection open,
2195  *         GNUNET_SYSERR to close it (signal serious error)
2196  */
2197 int
2198 GCC_handle_kx (void *cls, const struct GNUNET_PeerIdentity *peer,
2199                const struct GNUNET_MessageHeader *message)
2200 {
2201   return handle_cadet_kx (peer,
2202                          (struct GNUNET_CADET_KX *) message);
2203 }
2204
2205
2206 /**
2207  * Core handler for cadet network traffic point-to-point acks.
2208  *
2209  * @param cls closure
2210  * @param message message
2211  * @param peer peer identity this notification is about
2212  *
2213  * @return GNUNET_OK to keep the connection open,
2214  *         GNUNET_SYSERR to close it (signal serious error)
2215  */
2216 int
2217 GCC_handle_ack (void *cls, const struct GNUNET_PeerIdentity *peer,
2218                 const struct GNUNET_MessageHeader *message)
2219 {
2220   struct GNUNET_CADET_ACK *msg;
2221   struct CadetConnection *c;
2222   struct CadetFlowControl *fc;
2223   GNUNET_PEER_Id id;
2224   uint32_t ack;
2225   int fwd;
2226
2227   msg = (struct GNUNET_CADET_ACK *) message;
2228   log_message (message, peer, &msg->cid);
2229   c = connection_get (&msg->cid);
2230   if (NULL == c)
2231   {
2232     GNUNET_STATISTICS_update (stats, "# ack on unknown connection", 1,
2233                               GNUNET_NO);
2234     send_broken_unknown (&msg->cid, &my_full_id, NULL, peer);
2235     return GNUNET_OK;
2236   }
2237
2238   /* Is this a forward or backward ACK? */
2239   id = GNUNET_PEER_search (peer);
2240   if (GCP_get_short_id (get_next_hop (c)) == id)
2241   {
2242     LOG (GNUNET_ERROR_TYPE_DEBUG, "  FWD ACK\n");
2243     fc = &c->fwd_fc;
2244     fwd = GNUNET_YES;
2245   }
2246   else if (GCP_get_short_id (get_prev_hop (c)) == id)
2247   {
2248     LOG (GNUNET_ERROR_TYPE_DEBUG, "  BCK ACK\n");
2249     fc = &c->bck_fc;
2250     fwd = GNUNET_NO;
2251   }
2252   else
2253   {
2254     GNUNET_break_op (0);
2255     return GNUNET_OK;
2256   }
2257
2258   ack = ntohl (msg->ack);
2259   LOG (GNUNET_ERROR_TYPE_DEBUG, "  ACK %u (was %u)\n",
2260               ack, fc->last_ack_recv);
2261   if (GC_is_pid_bigger (ack, fc->last_ack_recv))
2262     fc->last_ack_recv = ack;
2263
2264   /* Cancel polling if the ACK is big enough. */
2265   if (GNUNET_SCHEDULER_NO_TASK != fc->poll_task &&
2266       GC_is_pid_bigger (fc->last_ack_recv, fc->last_pid_sent))
2267   {
2268     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Cancel poll\n");
2269     GNUNET_SCHEDULER_cancel (fc->poll_task);
2270     fc->poll_task = GNUNET_SCHEDULER_NO_TASK;
2271     fc->poll_time = GNUNET_TIME_UNIT_SECONDS;
2272   }
2273
2274   connection_unlock_queue (c, fwd);
2275
2276   return GNUNET_OK;
2277 }
2278
2279
2280 /**
2281  * Core handler for cadet network traffic point-to-point ack polls.
2282  *
2283  * @param cls closure
2284  * @param message message
2285  * @param peer peer identity this notification is about
2286  *
2287  * @return GNUNET_OK to keep the connection open,
2288  *         GNUNET_SYSERR to close it (signal serious error)
2289  */
2290 int
2291 GCC_handle_poll (void *cls, const struct GNUNET_PeerIdentity *peer,
2292                  const struct GNUNET_MessageHeader *message)
2293 {
2294   struct GNUNET_CADET_Poll *msg;
2295   struct CadetConnection *c;
2296   struct CadetFlowControl *fc;
2297   GNUNET_PEER_Id id;
2298   uint32_t pid;
2299   int fwd;
2300
2301   msg = (struct GNUNET_CADET_Poll *) message;
2302   log_message (message, peer, &msg->cid);
2303   c = connection_get (&msg->cid);
2304   if (NULL == c)
2305   {
2306     GNUNET_STATISTICS_update (stats, "# poll on unknown connection", 1,
2307                               GNUNET_NO);
2308     LOG (GNUNET_ERROR_TYPE_DEBUG, "POLL message on unknown connection %s!\n",
2309          GNUNET_h2s (GC_h2hc (&msg->cid)));
2310     send_broken_unknown (&msg->cid, &my_full_id, NULL, peer);
2311     return GNUNET_OK;
2312   }
2313
2314   /* Is this a forward or backward ACK?
2315    * Note: a poll should never be needed in a loopback case,
2316    * since there is no possiblility of packet loss there, so
2317    * this way of discerining FWD/BCK should not be a problem.
2318    */
2319   id = GNUNET_PEER_search (peer);
2320   if (GCP_get_short_id (get_next_hop (c)) == id)
2321   {
2322     LOG (GNUNET_ERROR_TYPE_DEBUG, "  FWD FC\n");
2323     fc = &c->fwd_fc;
2324   }
2325   else if (GCP_get_short_id (get_prev_hop (c)) == id)
2326   {
2327     LOG (GNUNET_ERROR_TYPE_DEBUG, "  BCK FC\n");
2328     fc = &c->bck_fc;
2329   }
2330   else
2331   {
2332     GNUNET_break_op (0);
2333     return GNUNET_OK;
2334   }
2335
2336   pid = ntohl (msg->pid);
2337   LOG (GNUNET_ERROR_TYPE_DEBUG, "  PID %u, OLD %u\n", pid, fc->last_pid_recv);
2338   fc->last_pid_recv = pid;
2339   fwd = fc == &c->bck_fc;
2340   GCC_send_ack (c, fwd, GNUNET_YES);
2341
2342   return GNUNET_OK;
2343 }
2344
2345
2346 /**
2347  * Send an ACK on the appropriate connection/channel, depending on
2348  * the direction and the position of the peer.
2349  *
2350  * @param c Which connection to send the hop-by-hop ACK.
2351  * @param fwd Is this a fwd ACK? (will go dest->root).
2352  * @param force Send the ACK even if suboptimal (e.g. requested by POLL).
2353  */
2354 void
2355 GCC_send_ack (struct CadetConnection *c, int fwd, int force)
2356 {
2357   unsigned int buffer;
2358
2359   LOG (GNUNET_ERROR_TYPE_DEBUG,
2360        "GMC send %s ACK on %s\n",
2361        GC_f2s (fwd), GCC_2s (c));
2362
2363   if (NULL == c)
2364   {
2365     GNUNET_break (0);
2366     return;
2367   }
2368
2369   if (GNUNET_NO != c->destroy)
2370   {
2371     LOG (GNUNET_ERROR_TYPE_DEBUG, "  being destroyed, why bother...\n");
2372     return;
2373   }
2374
2375   /* Get available buffer space */
2376   if (GCC_is_terminal (c, fwd))
2377   {
2378     LOG (GNUNET_ERROR_TYPE_DEBUG, "  getting from all channels\n");
2379     buffer = GCT_get_channels_buffer (c->t);
2380   }
2381   else
2382   {
2383     LOG (GNUNET_ERROR_TYPE_DEBUG, "  getting from one connection\n");
2384     buffer = GCC_get_buffer (c, fwd);
2385   }
2386   LOG (GNUNET_ERROR_TYPE_DEBUG, "  buffer available: %u\n", buffer);
2387   if (0 == buffer && GNUNET_NO == force)
2388     return;
2389
2390   /* Send available buffer space */
2391   if (GCC_is_origin (c, fwd))
2392   {
2393     GNUNET_assert (NULL != c->t);
2394     LOG (GNUNET_ERROR_TYPE_DEBUG, "  sending on channels...\n");
2395     GCT_unchoke_channels (c->t);
2396   }
2397   else
2398   {
2399     LOG (GNUNET_ERROR_TYPE_DEBUG, "  sending on connection\n");
2400     send_ack (c, buffer, fwd, force);
2401   }
2402 }
2403
2404
2405 /**
2406  * Initialize the connections subsystem
2407  *
2408  * @param c Configuration handle.
2409  */
2410 void
2411 GCC_init (const struct GNUNET_CONFIGURATION_Handle *c)
2412 {
2413   LOG (GNUNET_ERROR_TYPE_DEBUG, "init\n");
2414   if (GNUNET_OK !=
2415       GNUNET_CONFIGURATION_get_value_number (c, "CADET", "MAX_MSGS_QUEUE",
2416                                              &max_msgs_queue))
2417   {
2418     GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
2419                                "CADET", "MAX_MSGS_QUEUE", "MISSING");
2420     GNUNET_SCHEDULER_shutdown ();
2421     return;
2422   }
2423
2424   if (GNUNET_OK !=
2425       GNUNET_CONFIGURATION_get_value_number (c, "CADET", "MAX_CONNECTIONS",
2426                                              &max_connections))
2427   {
2428     GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
2429                                "CADET", "MAX_CONNECTIONS", "MISSING");
2430     GNUNET_SCHEDULER_shutdown ();
2431     return;
2432   }
2433
2434   if (GNUNET_OK !=
2435       GNUNET_CONFIGURATION_get_value_time (c, "CADET", "REFRESH_CONNECTION_TIME",
2436                                            &refresh_connection_time))
2437   {
2438     GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
2439                                "CADET", "REFRESH_CONNECTION_TIME", "MISSING");
2440     GNUNET_SCHEDULER_shutdown ();
2441     return;
2442   }
2443   create_connection_time = GNUNET_TIME_UNIT_SECONDS;
2444   connections = GNUNET_CONTAINER_multihashmap_create (1024, GNUNET_NO);
2445 }
2446
2447
2448 /**
2449  * Destroy each connection on shutdown.
2450  *
2451  * @param cls Closure (unused).
2452  * @param key Current key code (CID, unused).
2453  * @param value Value in the hash map (connection)
2454  *
2455  * @return #GNUNET_YES, because we should continue to iterate,
2456  */
2457 static int
2458 shutdown_iterator (void *cls,
2459                    const struct GNUNET_HashCode *key,
2460                    void *value)
2461 {
2462   struct CadetConnection *c = value;
2463
2464   GCC_destroy (c);
2465   return GNUNET_YES;
2466 }
2467
2468
2469 /**
2470  * Shut down the connections subsystem.
2471  */
2472 void
2473 GCC_shutdown (void)
2474 {
2475   GNUNET_CONTAINER_multihashmap_iterate (connections, &shutdown_iterator, NULL);
2476   GNUNET_CONTAINER_multihashmap_destroy (connections);
2477   connections = NULL;
2478 }
2479
2480
2481 struct CadetConnection *
2482 GCC_new (const struct GNUNET_CADET_Hash *cid,
2483          struct CadetTunnel *t,
2484          struct CadetPeerPath *p,
2485          unsigned int own_pos)
2486 {
2487   struct CadetConnection *c;
2488
2489   c = GNUNET_new (struct CadetConnection);
2490   c->id = *cid;
2491   GNUNET_assert (GNUNET_OK ==
2492                  GNUNET_CONTAINER_multihashmap_put (connections,
2493                                                     GCC_get_h (c), c,
2494                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
2495   fc_init (&c->fwd_fc);
2496   fc_init (&c->bck_fc);
2497   c->fwd_fc.c = c;
2498   c->bck_fc.c = c;
2499
2500   c->t = t;
2501   GNUNET_assert (own_pos <= p->length - 1);
2502   c->own_pos = own_pos;
2503   c->path = p;
2504   p->c = c;
2505
2506   if (GNUNET_OK != register_neighbors (c))
2507   {
2508     if (0 == own_pos)
2509     {
2510       path_invalidate (c->path);
2511       c->t = NULL;
2512       c->path = NULL;
2513     }
2514     GCC_destroy (c);
2515     return NULL;
2516   }
2517
2518   return c;
2519 }
2520
2521
2522 void
2523 GCC_destroy (struct CadetConnection *c)
2524 {
2525   if (NULL == c)
2526   {
2527     GNUNET_break (0);
2528     return;
2529   }
2530
2531   if (2 == c->destroy) /* cancel queues -> GCP_queue_cancel -> q_destroy -> */
2532     return;            /* -> message_sent -> GCC_destroy. Don't loop. */
2533   c->destroy = 2;
2534
2535   LOG (GNUNET_ERROR_TYPE_DEBUG, "destroying connection %s\n", GCC_2s (c));
2536   LOG (GNUNET_ERROR_TYPE_DEBUG, " fc's f: %p, b: %p\n",
2537        &c->fwd_fc, &c->bck_fc);
2538   LOG (GNUNET_ERROR_TYPE_DEBUG, " fc tasks f: %u, b: %u\n",
2539        c->fwd_fc.poll_task, c->bck_fc.poll_task);
2540
2541   /* Cancel all traffic */
2542   if (NULL != c->path)
2543   {
2544     connection_cancel_queues (c, GNUNET_YES);
2545     connection_cancel_queues (c, GNUNET_NO);
2546     unregister_neighbors (c);
2547   }
2548
2549   LOG (GNUNET_ERROR_TYPE_DEBUG, " fc tasks f: %u, b: %u\n",
2550        c->fwd_fc.poll_task, c->bck_fc.poll_task);
2551
2552   /* Cancel maintainance task (keepalive/timeout) */
2553   if (NULL != c->fwd_fc.poll_msg)
2554   {
2555     GCC_cancel (c->fwd_fc.poll_msg);
2556     LOG (GNUNET_ERROR_TYPE_DEBUG, " *** POLL msg FWD canceled\n");
2557   }
2558   if (NULL != c->bck_fc.poll_msg)
2559   {
2560     GCC_cancel (c->bck_fc.poll_msg);
2561     LOG (GNUNET_ERROR_TYPE_DEBUG, " *** POLL msg BCK canceled\n");
2562   }
2563
2564   /* Delete from tunnel */
2565   if (NULL != c->t)
2566     GCT_remove_connection (c->t, c);
2567
2568   if (GNUNET_NO == GCC_is_origin (c, GNUNET_YES) && NULL != c->path)
2569     path_destroy (c->path);
2570   if (GNUNET_SCHEDULER_NO_TASK != c->fwd_maintenance_task)
2571     GNUNET_SCHEDULER_cancel (c->fwd_maintenance_task);
2572   if (GNUNET_SCHEDULER_NO_TASK != c->bck_maintenance_task)
2573     GNUNET_SCHEDULER_cancel (c->bck_maintenance_task);
2574   if (GNUNET_SCHEDULER_NO_TASK != c->fwd_fc.poll_task)
2575   {
2576     GNUNET_SCHEDULER_cancel (c->fwd_fc.poll_task);
2577     LOG (GNUNET_ERROR_TYPE_DEBUG, " *** POLL FWD canceled\n");
2578   }
2579   if (GNUNET_SCHEDULER_NO_TASK != c->bck_fc.poll_task)
2580   {
2581     GNUNET_SCHEDULER_cancel (c->bck_fc.poll_task);
2582     LOG (GNUNET_ERROR_TYPE_DEBUG, " *** POLL BCK canceled\n");
2583   }
2584
2585   GNUNET_break (GNUNET_YES ==
2586                 GNUNET_CONTAINER_multihashmap_remove (connections,
2587                                                       GCC_get_h (c), c));
2588
2589   GNUNET_STATISTICS_update (stats, "# connections", -1, GNUNET_NO);
2590   GNUNET_free (c);
2591 }
2592
2593 /**
2594  * Get the connection ID.
2595  *
2596  * @param c Connection to get the ID from.
2597  *
2598  * @return ID of the connection.
2599  */
2600 const struct GNUNET_CADET_Hash *
2601 GCC_get_id (const struct CadetConnection *c)
2602 {
2603   return &c->id;
2604 }
2605
2606
2607 /**
2608  * Get the connection ID.
2609  *
2610  * @param c Connection to get the ID from.
2611  *
2612  * @return ID of the connection.
2613  */
2614 const struct GNUNET_HashCode *
2615 GCC_get_h (const struct CadetConnection *c)
2616 {
2617   return GC_h2hc (&c->id);
2618 }
2619
2620
2621 /**
2622  * Get the connection path.
2623  *
2624  * @param c Connection to get the path from.
2625  *
2626  * @return path used by the connection.
2627  */
2628 const struct CadetPeerPath *
2629 GCC_get_path (const struct CadetConnection *c)
2630 {
2631   if (GNUNET_NO == c->destroy)
2632     return c->path;
2633   return NULL;
2634 }
2635
2636
2637 /**
2638  * Get the connection state.
2639  *
2640  * @param c Connection to get the state from.
2641  *
2642  * @return state of the connection.
2643  */
2644 enum CadetConnectionState
2645 GCC_get_state (const struct CadetConnection *c)
2646 {
2647   return c->state;
2648 }
2649
2650 /**
2651  * Get the connection tunnel.
2652  *
2653  * @param c Connection to get the tunnel from.
2654  *
2655  * @return tunnel of the connection.
2656  */
2657 struct CadetTunnel *
2658 GCC_get_tunnel (const struct CadetConnection *c)
2659 {
2660   return c->t;
2661 }
2662
2663
2664 /**
2665  * Get free buffer space in a connection.
2666  *
2667  * @param c Connection.
2668  * @param fwd Is query about FWD traffic?
2669  *
2670  * @return Free buffer space [0 - max_msgs_queue/max_connections]
2671  */
2672 unsigned int
2673 GCC_get_buffer (struct CadetConnection *c, int fwd)
2674 {
2675   struct CadetFlowControl *fc;
2676
2677   fc = fwd ? &c->fwd_fc : &c->bck_fc;
2678
2679   return (fc->queue_max - fc->queue_n);
2680 }
2681
2682 /**
2683  * Get how many messages have we allowed to send to us from a direction.
2684  *
2685  * @param c Connection.
2686  * @param fwd Are we asking about traffic from FWD (BCK messages)?
2687  *
2688  * @return last_ack_sent - last_pid_recv
2689  */
2690 unsigned int
2691 GCC_get_allowed (struct CadetConnection *c, int fwd)
2692 {
2693   struct CadetFlowControl *fc;
2694
2695   fc = fwd ? &c->fwd_fc : &c->bck_fc;
2696   if (GC_is_pid_bigger (fc->last_pid_recv, fc->last_ack_sent))
2697   {
2698     return 0;
2699   }
2700   return (fc->last_ack_sent - fc->last_pid_recv);
2701 }
2702
2703 /**
2704  * Get messages queued in a connection.
2705  *
2706  * @param c Connection.
2707  * @param fwd Is query about FWD traffic?
2708  *
2709  * @return Number of messages queued.
2710  */
2711 unsigned int
2712 GCC_get_qn (struct CadetConnection *c, int fwd)
2713 {
2714   struct CadetFlowControl *fc;
2715
2716   fc = fwd ? &c->fwd_fc : &c->bck_fc;
2717
2718   return fc->queue_n;
2719 }
2720
2721
2722 /**
2723  * Get next PID to use.
2724  *
2725  * @param c Connection.
2726  * @param fwd Is query about FWD traffic?
2727  *
2728  * @return Last PID used + 1.
2729  */
2730 unsigned int
2731 GCC_get_pid (struct CadetConnection *c, int fwd)
2732 {
2733   struct CadetFlowControl *fc;
2734
2735   fc = fwd ? &c->fwd_fc : &c->bck_fc;
2736
2737   return fc->last_pid_sent + 1;
2738 }
2739
2740
2741 /**
2742  * Allow the connection to advertise a buffer of the given size.
2743  *
2744  * The connection will send an @c fwd ACK message (so: in direction !fwd)
2745  * allowing up to last_pid_recv + buffer.
2746  *
2747  * @param c Connection.
2748  * @param buffer How many more messages the connection can accept.
2749  * @param fwd Is this about FWD traffic? (The ack will go dest->root).
2750  */
2751 void
2752 GCC_allow (struct CadetConnection *c, unsigned int buffer, int fwd)
2753 {
2754   LOG (GNUNET_ERROR_TYPE_DEBUG, "  allowing %s %u messages %s\n",
2755        GCC_2s (c), buffer, GC_f2s (fwd));
2756   send_ack (c, buffer, fwd, GNUNET_NO);
2757 }
2758
2759
2760 /**
2761  * Notify other peers on a connection of a broken link. Mark connections
2762  * to destroy after all traffic has been sent.
2763  *
2764  * @param c Connection on which there has been a disconnection.
2765  * @param peer Peer that disconnected.
2766  */
2767 void
2768 GCC_notify_broken (struct CadetConnection *c,
2769                    struct CadetPeer *peer)
2770 {
2771   int fwd;
2772
2773   LOG (GNUNET_ERROR_TYPE_DEBUG,
2774        " notify broken on %s due to %s disconnect\n",
2775        GCC_2s (c), GCP_2s (peer));
2776
2777   fwd = peer == get_prev_hop (c);
2778
2779   if (GNUNET_YES == GCC_is_terminal (c, fwd))
2780   {
2781     /* Local shutdown, no one to notify about this. */
2782     GCC_destroy (c);
2783     return;
2784   }
2785   if (GNUNET_NO == c->destroy)
2786     send_broken (c, &my_full_id, GCP_get_id (peer), fwd);
2787
2788   /* Connection will have at least one pending message
2789    * (the one we just scheduled), so no point in checking whether to
2790    * destroy immediately. */
2791   c->destroy = GNUNET_YES;
2792   c->state = CADET_CONNECTION_DESTROYED;
2793
2794   /**
2795    * Cancel all queues, if no message is left, connection will be destroyed.
2796    */
2797   connection_cancel_queues (c, !fwd);
2798
2799   return;
2800 }
2801
2802
2803 /**
2804  * Is this peer the first one on the connection?
2805  *
2806  * @param c Connection.
2807  * @param fwd Is this about fwd traffic?
2808  *
2809  * @return #GNUNET_YES if origin, #GNUNET_NO if relay/terminal.
2810  */
2811 int
2812 GCC_is_origin (struct CadetConnection *c, int fwd)
2813 {
2814   if (!fwd && c->path->length - 1 == c->own_pos )
2815     return GNUNET_YES;
2816   if (fwd && 0 == c->own_pos)
2817     return GNUNET_YES;
2818   return GNUNET_NO;
2819 }
2820
2821
2822 /**
2823  * Is this peer the last one on the connection?
2824  *
2825  * @param c Connection.
2826  * @param fwd Is this about fwd traffic?
2827  *            Note that the ROOT is the terminal for BCK traffic!
2828  *
2829  * @return #GNUNET_YES if terminal, #GNUNET_NO if relay/origin.
2830  */
2831 int
2832 GCC_is_terminal (struct CadetConnection *c, int fwd)
2833 {
2834   return GCC_is_origin (c, !fwd);
2835 }
2836
2837
2838 /**
2839  * See if we are allowed to send by the next hop in the given direction.
2840  *
2841  * @param c Connection.
2842  * @param fwd Is this about fwd traffic?
2843  *
2844  * @return #GNUNET_YES in case it's OK to send.
2845  */
2846 int
2847 GCC_is_sendable (struct CadetConnection *c, int fwd)
2848 {
2849   struct CadetFlowControl *fc;
2850
2851   LOG (GNUNET_ERROR_TYPE_DEBUG, " checking sendability of %s traffic on %s\n",
2852        GC_f2s (fwd), GCC_2s (c));
2853   if (NULL == c)
2854   {
2855     GNUNET_break (0);
2856     return GNUNET_YES;
2857   }
2858   fc = fwd ? &c->fwd_fc : &c->bck_fc;
2859   LOG (GNUNET_ERROR_TYPE_DEBUG,
2860        " last ack recv: %u, last pid sent: %u\n",
2861        fc->last_ack_recv, fc->last_pid_sent);
2862   if (GC_is_pid_bigger (fc->last_ack_recv, fc->last_pid_sent))
2863   {
2864     LOG (GNUNET_ERROR_TYPE_DEBUG, " sendable\n");
2865     return GNUNET_YES;
2866   }
2867   LOG (GNUNET_ERROR_TYPE_DEBUG, " not sendable\n");
2868   return GNUNET_NO;
2869 }
2870
2871
2872 /**
2873  * Check if this connection is a direct one (never trim a direct connection).
2874  *
2875  * @param c Connection.
2876  *
2877  * @return #GNUNET_YES in case it's a direct connection, #GNUNET_NO otherwise.
2878  */
2879 int
2880 GCC_is_direct (struct CadetConnection *c)
2881 {
2882   return (c->path->length == 2) ? GNUNET_YES : GNUNET_NO;
2883 }
2884
2885 /**
2886  * Sends an already built message on a connection, properly registering
2887  * all used resources.
2888  *
2889  * @param message Message to send. Function makes a copy of it.
2890  *                If message is not hop-by-hop, decrements TTL of copy.
2891  * @param payload_type Type of payload, in case the message is encrypted.
2892  * @param c Connection on which this message is transmitted.
2893  * @param fwd Is this a fwd message?
2894  * @param force Force the connection to accept the message (buffer overfill).
2895  * @param cont Continuation called once message is sent. Can be NULL.
2896  * @param cont_cls Closure for @c cont.
2897  *
2898  * @return Handle to cancel the message before it's sent.
2899  *         NULL on error or if @c cont is NULL.
2900  *         Invalid on @c cont call.
2901  */
2902 struct CadetConnectionQueue *
2903 GCC_send_prebuilt_message (const struct GNUNET_MessageHeader *message,
2904                            uint16_t payload_type, uint32_t payload_id,
2905                            struct CadetConnection *c, int fwd, int force,
2906                            GCC_sent cont, void *cont_cls)
2907 {
2908   struct CadetFlowControl *fc;
2909   struct CadetConnectionQueue *q;
2910   void *data;
2911   size_t size;
2912   uint16_t type;
2913   int droppable;
2914
2915   size = ntohs (message->size);
2916   data = GNUNET_malloc (size);
2917   memcpy (data, message, size);
2918   type = ntohs (message->type);
2919   LOG (GNUNET_ERROR_TYPE_INFO, "--> %s (%s %u) on connection %s (%u bytes)\n",
2920        GC_m2s (type), GC_m2s (payload_type), payload_id, GCC_2s (c), size);
2921
2922   fc = fwd ? &c->fwd_fc : &c->bck_fc;
2923   droppable = GNUNET_NO == force;
2924   switch (type)
2925   {
2926     struct GNUNET_CADET_Encrypted *emsg;
2927     struct GNUNET_CADET_KX        *kmsg;
2928     struct GNUNET_CADET_ACK       *amsg;
2929     struct GNUNET_CADET_Poll      *pmsg;
2930     struct GNUNET_CADET_ConnectionDestroy *dmsg;
2931     struct GNUNET_CADET_ConnectionBroken  *bmsg;
2932     uint32_t ttl;
2933
2934     case GNUNET_MESSAGE_TYPE_CADET_ENCRYPTED:
2935       emsg = (struct GNUNET_CADET_Encrypted *) data;
2936       ttl = ntohl (emsg->ttl);
2937       if (0 == ttl)
2938       {
2939         GNUNET_break_op (0);
2940         GNUNET_free (data);
2941         return NULL;
2942       }
2943       emsg->cid = c->id;
2944       emsg->ttl = htonl (ttl - 1);
2945       emsg->pid = htonl (0);
2946       LOG (GNUNET_ERROR_TYPE_DEBUG, "  Q_N+ %p %u\n", fc, fc->queue_n);
2947       LOG (GNUNET_ERROR_TYPE_DEBUG, "last pid sent %u\n", fc->last_pid_sent);
2948       LOG (GNUNET_ERROR_TYPE_DEBUG, "     ack recv %u\n", fc->last_ack_recv);
2949       if (GNUNET_YES == droppable)
2950       {
2951         fc->queue_n++;
2952       }
2953       else
2954       {
2955         LOG (GNUNET_ERROR_TYPE_DEBUG, "  not droppable, Q_N stays the same\n");
2956       }
2957       if (GC_is_pid_bigger (fc->last_pid_sent + 1, fc->last_ack_recv))
2958       {
2959         GCC_start_poll (c, fwd);
2960       }
2961       break;
2962
2963     case GNUNET_MESSAGE_TYPE_CADET_KX:
2964       kmsg = (struct GNUNET_CADET_KX *) data;
2965       kmsg->cid = c->id;
2966       break;
2967
2968     case GNUNET_MESSAGE_TYPE_CADET_ACK:
2969       amsg = (struct GNUNET_CADET_ACK *) data;
2970       amsg->cid = c->id;
2971       LOG (GNUNET_ERROR_TYPE_DEBUG, " ack %u\n", ntohl (amsg->ack));
2972       droppable = GNUNET_NO;
2973       break;
2974
2975     case GNUNET_MESSAGE_TYPE_CADET_POLL:
2976       pmsg = (struct GNUNET_CADET_Poll *) data;
2977       pmsg->cid = c->id;
2978       LOG (GNUNET_ERROR_TYPE_DEBUG, " poll %u\n", ntohl (pmsg->pid));
2979       droppable = GNUNET_NO;
2980       break;
2981
2982     case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY:
2983       dmsg = (struct GNUNET_CADET_ConnectionDestroy *) data;
2984       dmsg->cid = c->id;
2985       break;
2986
2987     case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN:
2988       bmsg = (struct GNUNET_CADET_ConnectionBroken *) data;
2989       bmsg->cid = c->id;
2990       break;
2991
2992     case GNUNET_MESSAGE_TYPE_CADET_KEEPALIVE:
2993       GNUNET_break (0);
2994       /* falltrough */
2995     case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE:
2996     case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK:
2997       break;
2998
2999     default:
3000       GNUNET_break (0);
3001       GNUNET_free (data);
3002       return NULL;
3003   }
3004
3005   if (fc->queue_n > fc->queue_max && droppable)
3006   {
3007     GNUNET_STATISTICS_update (stats, "# messages dropped (buffer full)",
3008                               1, GNUNET_NO);
3009     GNUNET_break (0);
3010     LOG (GNUNET_ERROR_TYPE_DEBUG, "queue full: %u/%u\n",
3011          fc->queue_n, fc->queue_max);
3012     if (GNUNET_MESSAGE_TYPE_CADET_ENCRYPTED == type)
3013     {
3014       fc->queue_n--;
3015     }
3016     GNUNET_free (data);
3017     return NULL; /* Drop this message */
3018   }
3019
3020   LOG (GNUNET_ERROR_TYPE_DEBUG, "  C_P+ %s %u\n",
3021        GCC_2s (c), c->pending_messages);
3022   c->pending_messages++;
3023
3024   q = GNUNET_new (struct CadetConnectionQueue);
3025   q->forced = !droppable;
3026   q->q = GCP_queue_add (get_hop (c, fwd), data, type, payload_type, payload_id,
3027                         size, c, fwd, &conn_message_sent, q);
3028   if (NULL == q->q)
3029   {
3030     LOG (GNUNET_ERROR_TYPE_DEBUG, "dropping msg on %s, NULL q\n", GCC_2s (c));
3031     GNUNET_free (data);
3032     GNUNET_free (q);
3033     return NULL;
3034   }
3035   q->cont = cont;
3036   q->cont_cls = cont_cls;
3037   return (NULL == cont) ? NULL : q;
3038 }
3039
3040
3041 /**
3042  * Cancel a previously sent message while it's in the queue.
3043  *
3044  * ONLY can be called before the continuation given to the send function
3045  * is called. Once the continuation is called, the message is no longer in the
3046  * queue.
3047  *
3048  * @param q Handle to the queue.
3049  */
3050 void
3051 GCC_cancel (struct CadetConnectionQueue *q)
3052 {
3053   LOG (GNUNET_ERROR_TYPE_DEBUG, "!  GMC cancel message\n");
3054
3055   /* queue destroy calls message_sent, which calls q->cont and frees q */
3056   GCP_queue_destroy (q->q, GNUNET_YES, GNUNET_NO, 0);
3057 }
3058
3059
3060 /**
3061  * Sends a CREATE CONNECTION message for a path to a peer.
3062  * Changes the connection and tunnel states if necessary.
3063  *
3064  * @param connection Connection to create.
3065  */
3066 void
3067 GCC_send_create (struct CadetConnection *connection)
3068 {
3069   enum CadetTunnelCState state;
3070   size_t size;
3071
3072   size = sizeof (struct GNUNET_CADET_ConnectionCreate);
3073   size += connection->path->length * sizeof (struct GNUNET_PeerIdentity);
3074
3075   LOG (GNUNET_ERROR_TYPE_INFO, "===> %s on connection %s  (%u bytes)\n",
3076        GC_m2s (GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE),
3077        GCC_2s (connection), size);
3078   LOG (GNUNET_ERROR_TYPE_DEBUG, "  C_P+ %p %u (create)\n",
3079        connection, connection->pending_messages);
3080   connection->pending_messages++;
3081
3082   connection->maintenance_q =
3083     GCP_queue_add (get_next_hop (connection), NULL,
3084                    GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE, 0, 0,
3085                    size, connection, GNUNET_YES, &conn_message_sent, NULL);
3086
3087   state = GCT_get_cstate (connection->t);
3088   if (CADET_TUNNEL_SEARCHING == state || CADET_TUNNEL_NEW == state)
3089     GCT_change_cstate (connection->t, CADET_TUNNEL_WAITING);
3090   if (CADET_CONNECTION_NEW == connection->state)
3091     connection_change_state (connection, CADET_CONNECTION_SENT);
3092 }
3093
3094
3095 /**
3096  * Send a message to all peers in this connection that the connection
3097  * is no longer valid.
3098  *
3099  * If some peer should not receive the message, it should be zero'ed out
3100  * before calling this function.
3101  *
3102  * @param c The connection whose peers to notify.
3103  */
3104 void
3105 GCC_send_destroy (struct CadetConnection *c)
3106 {
3107   struct GNUNET_CADET_ConnectionDestroy msg;
3108
3109   if (GNUNET_YES == c->destroy)
3110     return;
3111
3112   msg.header.size = htons (sizeof (msg));
3113   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY);
3114   msg.cid = c->id;
3115   LOG (GNUNET_ERROR_TYPE_DEBUG,
3116               "  sending connection destroy for connection %s\n",
3117               GCC_2s (c));
3118
3119   if (GNUNET_NO == GCC_is_terminal (c, GNUNET_YES))
3120     GNUNET_assert (NULL == GCC_send_prebuilt_message (&msg.header, 0, 0, c,
3121                                                       GNUNET_YES, GNUNET_YES,
3122                                                       NULL, NULL));
3123   if (GNUNET_NO == GCC_is_terminal (c, GNUNET_NO))
3124     GNUNET_assert (NULL == GCC_send_prebuilt_message (&msg.header, 0, 0, c,
3125                                                       GNUNET_NO, GNUNET_YES,
3126                                                       NULL, NULL));
3127   c->destroy = GNUNET_YES;
3128   c->state = CADET_CONNECTION_DESTROYED;
3129 }
3130
3131
3132 /**
3133  * @brief Start a polling timer for the connection.
3134  *
3135  * When a neighbor does not accept more traffic on the connection it could be
3136  * caused by a simple congestion or by a lost ACK. Polling enables to check
3137  * for the lastest ACK status for a connection.
3138  *
3139  * @param c Connection.
3140  * @param fwd Should we poll in the FWD direction?
3141  */
3142 void
3143 GCC_start_poll (struct CadetConnection *c, int fwd)
3144 {
3145   struct CadetFlowControl *fc;
3146
3147   fc = fwd ? &c->fwd_fc : &c->bck_fc;
3148   LOG (GNUNET_ERROR_TYPE_DEBUG, " *** POLL %s requested\n",
3149        GC_f2s (fwd));
3150   if (GNUNET_SCHEDULER_NO_TASK != fc->poll_task || NULL != fc->poll_msg)
3151   {
3152     LOG (GNUNET_ERROR_TYPE_DEBUG, " ***   not needed (%u, %p)\n",
3153          fc->poll_task, fc->poll_msg);
3154     return;
3155   }
3156   LOG (GNUNET_ERROR_TYPE_DEBUG, " *** POLL started on request\n");
3157   fc->poll_task = GNUNET_SCHEDULER_add_delayed (fc->poll_time,
3158                                                 &connection_poll,
3159                                                 fc);
3160 }
3161
3162
3163 /**
3164  * @brief Stop polling a connection for ACKs.
3165  *
3166  * Once we have enough ACKs for future traffic, polls are no longer necessary.
3167  *
3168  * @param c Connection.
3169  * @param fwd Should we stop the poll in the FWD direction?
3170  */
3171 void
3172 GCC_stop_poll (struct CadetConnection *c, int fwd)
3173 {
3174   struct CadetFlowControl *fc;
3175
3176   fc = fwd ? &c->fwd_fc : &c->bck_fc;
3177   if (GNUNET_SCHEDULER_NO_TASK != fc->poll_task)
3178   {
3179     GNUNET_SCHEDULER_cancel (fc->poll_task);
3180     fc->poll_task = GNUNET_SCHEDULER_NO_TASK;
3181   }
3182 }
3183
3184 /**
3185  * Get a (static) string for a connection.
3186  *
3187  * @param c Connection.
3188  */
3189 const char *
3190 GCC_2s (const struct CadetConnection *c)
3191 {
3192   if (NULL == c)
3193     return "NULL";
3194
3195   if (NULL != c->t)
3196   {
3197     static char buf[128];
3198
3199     sprintf (buf, "%s (->%s)",
3200              GNUNET_h2s (GC_h2hc (GCC_get_id (c))), GCT_2s (c->t));
3201     return buf;
3202   }
3203   return GNUNET_h2s (GC_h2hc (&c->id));
3204 }
3205
3206
3207 /**
3208  * Log all possible info about the connection state.
3209  *
3210  * @param c Connection to debug.
3211  * @param level Debug level to use.
3212  */
3213 void
3214 GCC_debug (const struct CadetConnection *c, enum GNUNET_ErrorType level)
3215 {
3216   int do_log;
3217   char *s;
3218
3219   do_log = GNUNET_get_log_call_status (level & (~GNUNET_ERROR_TYPE_BULK),
3220                                        "cadet-con",
3221                                        __FILE__, __FUNCTION__, __LINE__);
3222   if (0 == do_log)
3223     return;
3224
3225   if (NULL == c)
3226   {
3227     LOG2 (level, "CCC DEBUG NULL CONNECTION\n");
3228     return;
3229   }
3230
3231   LOG2 (level, "CCC DEBUG CONNECTION %s\n", GCC_2s (c));
3232   s = path_2s (c->path);
3233   LOG2 (level, "CCC  path %s, own pos: %u\n", s, c->own_pos);
3234   GNUNET_free (s);
3235   LOG2 (level, "CCC  state: %s, destroy: %u\n",
3236         GCC_state2s (c->state), c->destroy);
3237   LOG2 (level, "CCC  pending messages: %u\n", c->pending_messages);
3238   if (NULL != c->perf)
3239     LOG2 (level, "CCC  us/byte: %f\n", c->perf->avg);
3240
3241   LOG2 (level, "CCC  FWD flow control:\n");
3242   LOG2 (level, "CCC   queue: %u/%u\n", c->fwd_fc.queue_n, c->fwd_fc.queue_max);
3243   LOG2 (level, "CCC   last PID sent: %5u, recv: %5u\n",
3244         c->fwd_fc.last_pid_sent, c->fwd_fc.last_pid_recv);
3245   LOG2 (level, "CCC   last ACK sent: %5u, recv: %5u\n",
3246         c->fwd_fc.last_ack_sent, c->fwd_fc.last_ack_recv);
3247   LOG2 (level, "CCC   POLL: task %d, msg  %p, msg_ack %p)\n",
3248         c->fwd_fc.poll_task, c->fwd_fc.poll_msg, c->fwd_fc.ack_msg);
3249
3250   LOG2 (level, "CCC  BCK flow control:\n");
3251   LOG2 (level, "CCC   queue: %u/%u\n", c->bck_fc.queue_n, c->bck_fc.queue_max);
3252   LOG2 (level, "CCC   last PID sent: %5u, recv: %5u\n",
3253         c->bck_fc.last_pid_sent, c->bck_fc.last_pid_recv);
3254   LOG2 (level, "CCC   last ACK sent: %5u, recv: %5u\n",
3255         c->bck_fc.last_ack_sent, c->bck_fc.last_ack_recv);
3256   LOG2 (level, "CCC   POLL: task %d, msg  %p, msg_ack %p)\n",
3257         c->bck_fc.poll_task, c->bck_fc.poll_msg, c->bck_fc.ack_msg);
3258
3259   LOG2 (level, "CCC DEBUG CONNECTION END\n");
3260 }
3261