5c28f9e4c65935348c44d66b2bf66290f4d62d7d
[oweals/gnunet.git] / src / cadet / gnunet-service-cadet_connection.c
1 /*
2      This file is part of GNUnet.
3      (C) 2001-2013 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file cadet/gnunet-service-cadet_connection.c
23  * @brief GNUnet CADET service connection handling
24  * @author Bartlomiej Polot
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29
30 #include "gnunet_statistics_service.h"
31
32 #include "cadet_path.h"
33 #include "cadet_protocol.h"
34 #include "cadet.h"
35 #include "gnunet-service-cadet_connection.h"
36 #include "gnunet-service-cadet_peer.h"
37 #include "gnunet-service-cadet_tunnel.h"
38
39
40 #define LOG(level, ...) GNUNET_log_from (level,"cadet-con",__VA_ARGS__)
41 #define LOG2(level, ...) GNUNET_log_from_nocheck(level,"cadet-con",__VA_ARGS__)
42
43
44 #define CADET_MAX_POLL_TIME      GNUNET_TIME_relative_multiply (\
45                                   GNUNET_TIME_UNIT_MINUTES,\
46                                   10)
47 #define AVG_MSGS                32
48
49
50 /******************************************************************************/
51 /********************************   STRUCTS  **********************************/
52 /******************************************************************************/
53
54 /**
55  * Struct to encapsulate all the Flow Control information to a peer to which
56  * we are directly connected (on a core level).
57  */
58 struct CadetFlowControl
59 {
60   /**
61    * Connection this controls.
62    */
63   struct CadetConnection *c;
64
65   /**
66    * How many messages are in the queue on this connection.
67    */
68   unsigned int queue_n;
69
70   /**
71    * How many messages do we accept in the queue.
72    */
73   unsigned int queue_max;
74
75   /**
76    * ID of the last packet sent towards the peer.
77    */
78   uint32_t last_pid_sent;
79
80   /**
81    * ID of the last packet received from the peer.
82    */
83   uint32_t last_pid_recv;
84
85   /**
86    * Last ACK sent to the peer (peer can't send more than this PID).
87    */
88   uint32_t last_ack_sent;
89
90   /**
91    * Last ACK sent towards the origin (for traffic towards leaf node).
92    */
93   uint32_t last_ack_recv;
94
95   /**
96    * Task to poll the peer in case of a lost ACK causes stall.
97    */
98   GNUNET_SCHEDULER_TaskIdentifier poll_task;
99
100   /**
101    * How frequently to poll for ACKs.
102    */
103   struct GNUNET_TIME_Relative poll_time;
104
105   /**
106    * Queued poll message, to cancel if not necessary anymore (got ACK).
107    */
108   struct CadetConnectionQueue *poll_msg;
109
110   /**
111    * Queued poll message, to cancel if not necessary anymore (got ACK).
112    */
113   struct CadetConnectionQueue *ack_msg;
114 };
115
116 /**
117  * Keep a record of the last messages sent on this connection.
118  */
119 struct CadetConnectionPerformance
120 {
121   /**
122    * Circular buffer for storing measurements.
123    */
124   double usecsperbyte[AVG_MSGS];
125
126   /**
127    * Running average of @c usecsperbyte.
128    */
129   double avg;
130
131   /**
132    * How many values of @c usecsperbyte are valid.
133    */
134   uint16_t size;
135
136   /**
137    * Index of the next "free" position in @c usecsperbyte.
138    */
139   uint16_t idx;
140 };
141
142
143 /**
144  * Struct containing all information regarding a connection to a peer.
145  */
146 struct CadetConnection
147 {
148   /**
149    * Tunnel this connection is part of.
150    */
151   struct CadetTunnel *t;
152
153   /**
154    * Flow control information for traffic fwd.
155    */
156   struct CadetFlowControl fwd_fc;
157
158   /**
159    * Flow control information for traffic bck.
160    */
161   struct CadetFlowControl bck_fc;
162
163   /**
164    * Measure connection performance on the endpoint.
165    */
166   struct CadetConnectionPerformance *perf;
167
168   /**
169    * ID of the connection.
170    */
171   struct GNUNET_CADET_Hash id;
172
173   /**
174    * State of the connection.
175    */
176   enum CadetConnectionState state;
177
178   /**
179    * Path being used for the tunnel. At the origin of the connection
180    * it's a pointer to the destination's path pool, otherwise just a copy.
181    */
182   struct CadetPeerPath *path;
183
184   /**
185    * Position of the local peer in the path.
186    */
187   unsigned int own_pos;
188
189   /**
190    * Task to keep the used paths alive at the owner,
191    * time tunnel out on all the other peers.
192    */
193   GNUNET_SCHEDULER_TaskIdentifier fwd_maintenance_task;
194
195   /**
196    * Task to keep the used paths alive at the destination,
197    * time tunnel out on all the other peers.
198    */
199   GNUNET_SCHEDULER_TaskIdentifier bck_maintenance_task;
200
201   /**
202    * Queue handle for maintainance traffic. One handle for FWD and BCK since
203    * one peer never needs to maintain both directions (no loopback connections).
204    */
205   struct CadetPeerQueue *maintenance_q;
206
207   /**
208    * Counter to do exponential backoff when creating a connection (max 64).
209    */
210   unsigned short create_retry;
211
212   /**
213    * Pending message count.
214    */
215   int pending_messages;
216
217   /**
218    * Destroy flag: if true, destroy on last message.
219    */
220   int destroy;
221 };
222
223 /**
224  * Handle for messages queued but not yet sent.
225  */
226 struct CadetConnectionQueue
227 {
228   /**
229    * Peer queue handle, to cancel if necessary.
230    */
231   struct CadetPeerQueue *q;
232
233   /**
234    * Was this a forced message? (Do not account for it)
235    */
236   int forced;
237
238   /**
239    * Continuation to call once sent.
240    */
241   GCC_sent cont;
242
243   /**
244    * Closure for @c cont.
245    */
246   void *cont_cls;
247 };
248
249 /******************************************************************************/
250 /*******************************   GLOBALS  ***********************************/
251 /******************************************************************************/
252
253 /**
254  * Global handle to the statistics service.
255  */
256 extern struct GNUNET_STATISTICS_Handle *stats;
257
258 /**
259  * Local peer own ID (memory efficient handle).
260  */
261 extern GNUNET_PEER_Id myid;
262
263 /**
264  * Local peer own ID (full value).
265  */
266 extern struct GNUNET_PeerIdentity my_full_id;
267
268 /**
269  * Connections known, indexed by cid (CadetConnection).
270  */
271 static struct GNUNET_CONTAINER_MultiHashMap *connections;
272
273 /**
274  * How many connections are we willing to maintain.
275  * Local connections are always allowed, even if there are more connections than max.
276  */
277 static unsigned long long max_connections;
278
279 /**
280  * How many messages *in total* are we willing to queue, divide by number of
281  * connections to get connection queue size.
282  */
283 static unsigned long long max_msgs_queue;
284
285 /**
286  * How often to send path keepalives. Paths timeout after 4 missed.
287  */
288 static struct GNUNET_TIME_Relative refresh_connection_time;
289
290 /**
291  * How often to send path create / ACKs.
292  */
293 static struct GNUNET_TIME_Relative create_connection_time;
294
295
296 /******************************************************************************/
297 /********************************   STATIC  ***********************************/
298 /******************************************************************************/
299
300 #if 0 // avoid compiler warning for unused static function
301 static void
302 fc_debug (struct CadetFlowControl *fc)
303 {
304   LOG (GNUNET_ERROR_TYPE_DEBUG, "    IN: %u/%u\n",
305               fc->last_pid_recv, fc->last_ack_sent);
306   LOG (GNUNET_ERROR_TYPE_DEBUG, "    OUT: %u/%u\n",
307               fc->last_pid_sent, fc->last_ack_recv);
308   LOG (GNUNET_ERROR_TYPE_DEBUG, "    QUEUE: %u/%u\n",
309               fc->queue_n, fc->queue_max);
310 }
311
312 static void
313 connection_debug (struct CadetConnection *c)
314 {
315   if (NULL == c)
316   {
317     LOG (GNUNET_ERROR_TYPE_DEBUG, "*** DEBUG NULL CONNECTION ***\n");
318     return;
319   }
320   LOG (GNUNET_ERROR_TYPE_DEBUG, "Connection %s:%X\n",
321               peer2s (c->t->peer), GCC_2s (c));
322   LOG (GNUNET_ERROR_TYPE_DEBUG, "  state: %u, pending msgs: %u\n",
323               c->state, c->pending_messages);
324   LOG (GNUNET_ERROR_TYPE_DEBUG, "  FWD FC\n");
325   fc_debug (&c->fwd_fc);
326   LOG (GNUNET_ERROR_TYPE_DEBUG, "  BCK FC\n");
327   fc_debug (&c->bck_fc);
328 }
329 #endif
330
331
332 /**
333  * Schedule next keepalive task, taking in consideration
334  * the connection state and number of retries.
335  *
336  * @param c Connection for which to schedule the next keepalive.
337  * @param fwd Direction for the next keepalive.
338  */
339 static void
340 schedule_next_keepalive (struct CadetConnection *c, int fwd);
341
342
343 /**
344  * Resets the connection timeout task, some other message has done the
345  * task's job.
346  * - For the first peer on the direction this means to send
347  *   a keepalive or a path confirmation message (either create or ACK).
348  * - For all other peers, this means to destroy the connection,
349  *   due to lack of activity.
350  * Starts the timeout if no timeout was running (connection just created).
351  *
352  * @param c Connection whose timeout to reset.
353  * @param fwd Is this forward?
354  */
355 static void
356 connection_reset_timeout (struct CadetConnection *c, int fwd);
357
358
359 /**
360  * Get string description for tunnel state. Reentrant.
361  *
362  * @param s Tunnel state.
363  *
364  * @return String representation.
365  */
366 static const char *
367 GCC_state2s (enum CadetConnectionState s)
368 {
369   switch (s)
370   {
371     case CADET_CONNECTION_NEW:
372       return "CADET_CONNECTION_NEW";
373     case CADET_CONNECTION_SENT:
374       return "CADET_CONNECTION_SENT";
375     case CADET_CONNECTION_ACK:
376       return "CADET_CONNECTION_ACK";
377     case CADET_CONNECTION_READY:
378       return "CADET_CONNECTION_READY";
379     case CADET_CONNECTION_DESTROYED:
380       return "CADET_CONNECTION_DESTROYED";
381     case CADET_CONNECTION_BROKEN:
382       return "CADET_CONNECTION_BROKEN";
383     default:
384       GNUNET_break (0);
385       LOG (GNUNET_ERROR_TYPE_ERROR, " conn state %u unknown!\n", s);
386       return "CADET_CONNECTION_STATE_ERROR";
387   }
388 }
389
390
391 /**
392  * Initialize a Flow Control structure to the initial state.
393  *
394  * @param fc Flow Control structure to initialize.
395  */
396 static void
397 fc_init (struct CadetFlowControl *fc)
398 {
399   fc->last_pid_sent = (uint32_t) -1; /* Next (expected) = 0 */
400   fc->last_pid_recv = (uint32_t) -1;
401   fc->last_ack_sent = (uint32_t) 0;
402   fc->last_ack_recv = (uint32_t) 0;
403   fc->poll_task = GNUNET_SCHEDULER_NO_TASK;
404   fc->poll_time = GNUNET_TIME_UNIT_SECONDS;
405   fc->queue_n = 0;
406   fc->queue_max = (max_msgs_queue / max_connections) + 1;
407 }
408
409
410 /**
411  * Find a connection.
412  *
413  * @param cid Connection ID.
414  */
415 static struct CadetConnection *
416 connection_get (const struct GNUNET_CADET_Hash *cid)
417 {
418   return GNUNET_CONTAINER_multihashmap_get (connections, GC_h2hc (cid));
419 }
420
421
422 static void
423 connection_change_state (struct CadetConnection* c,
424                          enum CadetConnectionState state)
425 {
426   LOG (GNUNET_ERROR_TYPE_DEBUG,
427        "Connection %s state %s -> %s\n",
428        GCC_2s (c), GCC_state2s (c->state), GCC_state2s (state));
429   if (CADET_CONNECTION_DESTROYED <= c->state) /* Destroyed or broken. */
430   {
431     LOG (GNUNET_ERROR_TYPE_DEBUG, "state not changing anymore\n");
432     return;
433   }
434   c->state = state;
435   if (CADET_CONNECTION_READY == state)
436     c->create_retry = 1;
437 }
438
439
440 /**
441  * Callback called when a queued ACK message is sent.
442  *
443  * @param cls Closure (FC).
444  * @param c Connection this message was on.
445  * @param q Queue handler this call invalidates.
446  * @param type Type of message sent.
447  * @param fwd Was this a FWD going message?
448  * @param size Size of the message.
449  */
450 static void
451 ack_sent (void *cls,
452           struct CadetConnection *c,
453           struct CadetConnectionQueue *q,
454           uint16_t type, int fwd, size_t size)
455 {
456   struct CadetFlowControl *fc = cls;
457
458   fc->ack_msg = NULL;
459 }
460
461
462 /**
463  * Send an ACK on the connection, informing the predecessor about
464  * the available buffer space. Should not be called in case the peer
465  * is origin (no predecessor) in the @c fwd direction.
466  *
467  * Note that for fwd ack, the FWD mean forward *traffic* (root->dest),
468  * the ACK itself goes "back" (dest->root).
469  *
470  * @param c Connection on which to send the ACK.
471  * @param buffer How much space free to advertise?
472  * @param fwd Is this FWD ACK? (Going dest -> root)
473  * @param force Don't optimize out.
474  */
475 static void
476 send_ack (struct CadetConnection *c, unsigned int buffer, int fwd, int force)
477 {
478   struct CadetFlowControl *next_fc;
479   struct CadetFlowControl *prev_fc;
480   struct GNUNET_CADET_ACK msg;
481   uint32_t ack;
482   int delta;
483
484   /* If origin, there is no connection to send ACKs. Wrong function! */
485   if (GCC_is_origin (c, fwd))
486   {
487     LOG (GNUNET_ERROR_TYPE_DEBUG, "connection %s is origin in %s\n",
488          GCC_2s (c), GC_f2s (fwd));
489     GNUNET_break (0);
490     return;
491   }
492
493   next_fc = fwd ? &c->fwd_fc : &c->bck_fc;
494   prev_fc = fwd ? &c->bck_fc : &c->fwd_fc;
495
496   LOG (GNUNET_ERROR_TYPE_DEBUG, "connection send %s ack on %s\n",
497        GC_f2s (fwd), GCC_2s (c));
498
499   /* Check if we need to transmit the ACK. */
500   delta = prev_fc->last_ack_sent - prev_fc->last_pid_recv;
501   if (3 < delta && buffer < delta && GNUNET_NO == force)
502   {
503     LOG (GNUNET_ERROR_TYPE_DEBUG, "Not sending ACK, buffer > 3\n");
504     LOG (GNUNET_ERROR_TYPE_DEBUG,
505          "  last pid recv: %u, last ack sent: %u\n",
506          prev_fc->last_pid_recv, prev_fc->last_ack_sent);
507     return;
508   }
509
510   /* Ok, ACK might be necessary, what PID to ACK? */
511   ack = prev_fc->last_pid_recv + buffer;
512   LOG (GNUNET_ERROR_TYPE_DEBUG, " ACK %u\n", ack);
513   LOG (GNUNET_ERROR_TYPE_DEBUG,
514        " last pid %u, last ack %u, qmax %u, q %u\n",
515        prev_fc->last_pid_recv, prev_fc->last_ack_sent,
516        next_fc->queue_max, next_fc->queue_n);
517   if (ack == prev_fc->last_ack_sent && GNUNET_NO == force)
518   {
519     LOG (GNUNET_ERROR_TYPE_DEBUG, "Not sending FWD ACK, not needed\n");
520     return;
521   }
522
523   /* Check if message is already in queue */
524   if (NULL != prev_fc->ack_msg)
525   {
526     if (GC_is_pid_bigger (ack, prev_fc->last_ack_sent))
527     {
528       LOG (GNUNET_ERROR_TYPE_DEBUG, " canceling old ACK\n");
529       GCC_cancel (prev_fc->ack_msg);
530       /* GCC_cancel triggers ack_sent(), which clears fc->ack_msg */
531     }
532     else
533     {
534       LOG (GNUNET_ERROR_TYPE_DEBUG, " same ACK already in queue\n");
535       return;
536     }
537   }
538
539   prev_fc->last_ack_sent = ack;
540
541   /* Build ACK message and send on connection */
542   msg.header.size = htons (sizeof (msg));
543   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_ACK);
544   msg.ack = htonl (ack);
545   msg.cid = c->id;
546
547   prev_fc->ack_msg = GCC_send_prebuilt_message (&msg.header, 0, ack, c,
548                                                 !fwd, GNUNET_YES,
549                                                 &ack_sent, prev_fc);
550   GNUNET_assert (NULL != prev_fc->ack_msg);
551 }
552
553
554 /**
555  * Callback called when a connection queued message is sent.
556  *
557  * Calculates the average time and connection packet tracking.
558  *
559  * @param cls Closure (ConnectionQueue Handle).
560  * @param c Connection this message was on.
561  * @param sent Was it really sent? (Could have been canceled)
562  * @param type Type of message sent.
563  * @param pid Packet ID, or 0 if not applicable (create, destroy, etc).
564  * @param fwd Was this a FWD going message?
565  * @param size Size of the message.
566  * @param wait Time spent waiting for core (only the time for THIS message)
567  *
568  * @return #GNUNET_YES if connection was destroyed, #GNUNET_NO otherwise.
569  */
570 static int
571 conn_message_sent (void *cls,
572                    struct CadetConnection *c, int sent,
573                    uint16_t type, uint32_t pid, int fwd, size_t size,
574                    struct GNUNET_TIME_Relative wait)
575 {
576   struct CadetConnectionPerformance *p;
577   struct CadetFlowControl *fc;
578   struct CadetConnectionQueue *q = cls;
579   double usecsperbyte;
580   int forced;
581
582   LOG (GNUNET_ERROR_TYPE_DEBUG, "connection message_sent\n");
583
584   fc = fwd ? &c->fwd_fc : &c->bck_fc;
585   LOG (GNUNET_ERROR_TYPE_DEBUG, " %ssent %s %s\n",
586        sent ? "" : "not ", GC_f2s (fwd), GC_m2s (type));
587   if (NULL != q)
588   {
589     forced = q->forced;
590     if (NULL != q->cont)
591     {
592       LOG (GNUNET_ERROR_TYPE_DEBUG, " calling cont\n");
593       q->cont (q->cont_cls, c, q, type, fwd, size);
594     }
595     GNUNET_free (q);
596   }
597   else if (type == GNUNET_MESSAGE_TYPE_CADET_ENCRYPTED)
598   {
599     /* If NULL == q and ENCRYPTED == type, message must have been ch_mngmnt */
600     forced = GNUNET_YES;
601   }
602   else
603   {
604     forced = GNUNET_NO;
605   }
606   if (NULL == c)
607   {
608     if (type != GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN
609         && type != GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY)
610     {
611       LOG (GNUNET_ERROR_TYPE_ERROR, "Message %s sent on NULL connection!\n",
612            GC_m2s (type));
613     }
614     return GNUNET_NO;
615   }
616   LOG (GNUNET_ERROR_TYPE_DEBUG, " C_P- %p %u\n", c, c->pending_messages);
617   c->pending_messages--;
618   if (GNUNET_YES == c->destroy && 0 == c->pending_messages)
619   {
620     LOG (GNUNET_ERROR_TYPE_DEBUG, "!  destroying connection!\n");
621     GCC_destroy (c);
622     return GNUNET_YES;
623   }
624   /* Send ACK if needed, after accounting for sent ID in fc->queue_n */
625   switch (type)
626   {
627     case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE:
628     case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK:
629       c->maintenance_q = NULL;
630       /* Don't trigger a keepalive for sent ACKs, only SYN and SYNACKs */
631       if (GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE == type || !fwd)
632         schedule_next_keepalive (c, fwd);
633       break;
634
635     case GNUNET_MESSAGE_TYPE_CADET_ENCRYPTED:
636       if (GNUNET_YES == sent)
637       {
638         GNUNET_assert (NULL != q);
639         fc->last_pid_sent = pid; // FIXME
640         GCC_send_ack (c, fwd, GNUNET_NO);
641         connection_reset_timeout (c, fwd);
642       }
643
644       LOG (GNUNET_ERROR_TYPE_DEBUG, "!  Q_N- %p %u\n", fc, fc->queue_n);
645       if (GNUNET_NO == forced)
646       {
647         fc->queue_n--;
648         LOG (GNUNET_ERROR_TYPE_DEBUG,
649             "!   accounting pid %u\n",
650             fc->last_pid_sent);
651       }
652       else
653       {
654         LOG (GNUNET_ERROR_TYPE_DEBUG,
655              "!   forced, Q_N not accounting pid %u\n",
656              fc->last_pid_sent);
657       }
658       break;
659
660     case GNUNET_MESSAGE_TYPE_CADET_KX:
661       if (GNUNET_YES == sent)
662         connection_reset_timeout (c, fwd);
663       break;
664
665     case GNUNET_MESSAGE_TYPE_CADET_POLL:
666       fc->poll_msg = NULL;
667       break;
668
669     case GNUNET_MESSAGE_TYPE_CADET_ACK:
670       fc->ack_msg = NULL;
671       break;
672
673     default:
674       break;
675   }
676   LOG (GNUNET_ERROR_TYPE_DEBUG, "!  message sent!\n");
677
678   if (NULL == c->perf)
679     return GNUNET_NO; /* Only endpoints are interested in timing. */
680
681   p = c->perf;
682   usecsperbyte = ((double) wait.rel_value_us) / size;
683   if (p->size == AVG_MSGS)
684   {
685     /* Array is full. Substract oldest value, add new one and store. */
686     p->avg -= (p->usecsperbyte[p->idx] / AVG_MSGS);
687     p->usecsperbyte[p->idx] = usecsperbyte;
688     p->avg += (p->usecsperbyte[p->idx] / AVG_MSGS);
689   }
690   else
691   {
692     /* Array not yet full. Add current value to avg and store. */
693     p->usecsperbyte[p->idx] = usecsperbyte;
694     p->avg *= p->size;
695     p->avg += p->usecsperbyte[p->idx];
696     p->size++;
697     p->avg /= p->size;
698   }
699   p->idx = (p->idx + 1) % AVG_MSGS;
700   return GNUNET_NO;
701 }
702
703
704 /**
705  * Get the previous hop in a connection
706  *
707  * @param c Connection.
708  *
709  * @return Previous peer in the connection.
710  */
711 static struct CadetPeer *
712 get_prev_hop (const struct CadetConnection *c)
713 {
714   GNUNET_PEER_Id id;
715
716   LOG (GNUNET_ERROR_TYPE_DEBUG, " get prev hop %s [%u/%u]\n",
717        GCC_2s (c), c->own_pos, c->path->length);
718   if (0 == c->own_pos || c->path->length < 2)
719     id = c->path->peers[0];
720   else
721     id = c->path->peers[c->own_pos - 1];
722
723   LOG (GNUNET_ERROR_TYPE_DEBUG, "  ID: %s (%u)\n",
724        GNUNET_i2s (GNUNET_PEER_resolve2 (id)), id);
725
726   return GCP_get_short (id);
727 }
728
729
730 /**
731  * Get the next hop in a connection
732  *
733  * @param c Connection.
734  *
735  * @return Next peer in the connection.
736  */
737 static struct CadetPeer *
738 get_next_hop (const struct CadetConnection *c)
739 {
740   GNUNET_PEER_Id id;
741
742   LOG (GNUNET_ERROR_TYPE_DEBUG, " get next hop %s [%u/%u]\n",
743        GCC_2s (c), c->own_pos, c->path->length);
744   if ((c->path->length - 1) == c->own_pos || c->path->length < 2)
745     id = c->path->peers[c->path->length - 1];
746   else
747     id = c->path->peers[c->own_pos + 1];
748
749   LOG (GNUNET_ERROR_TYPE_DEBUG, "  ID: %s (%u)\n",
750        GNUNET_i2s (GNUNET_PEER_resolve2 (id)), id);
751
752   return GCP_get_short (id);
753 }
754
755
756 /**
757  * Get the hop in a connection.
758  *
759  * @param c Connection.
760  * @param fwd Next hop?
761  *
762  * @return Next peer in the connection.
763  */
764 static struct CadetPeer *
765 get_hop (struct CadetConnection *c, int fwd)
766 {
767   if (fwd)
768     return get_next_hop (c);
769   return get_prev_hop (c);
770 }
771
772
773 /**
774  * Is traffic coming from this sender 'FWD' traffic?
775  *
776  * @param c Connection to check.
777  * @param sender Peer identity of neighbor.
778  *
779  * @return #GNUNET_YES in case the sender is the 'prev' hop and therefore
780  *         the traffic is 'FWD'.
781  *         #GNUNET_NO for BCK.
782  *         #GNUNET_SYSERR for errors.
783  */
784 static int
785 is_fwd (const struct CadetConnection *c,
786         const struct GNUNET_PeerIdentity *sender)
787 {
788   GNUNET_PEER_Id id;
789
790   id = GNUNET_PEER_search (sender);
791   if (GCP_get_short_id (get_prev_hop (c)) == id)
792     return GNUNET_YES;
793
794   if (GCP_get_short_id (get_next_hop (c)) == id)
795     return GNUNET_NO;
796
797   GNUNET_break (0);
798   return GNUNET_SYSERR;
799 }
800
801
802 /**
803  * Sends a CONNECTION ACK message in reponse to a received CONNECTION_CREATE
804  * or a first CONNECTION_ACK directed to us.
805  *
806  * @param connection Connection to confirm.
807  * @param fwd Should we send it FWD? (root->dest)
808  *            (First (~SYNACK) goes BCK, second (~ACK) goes FWD)
809  */
810 static void
811 send_connection_ack (struct CadetConnection *connection, int fwd)
812 {
813   struct CadetTunnel *t;
814
815   t = connection->t;
816   LOG (GNUNET_ERROR_TYPE_INFO, "===> {%14s ACK} on connection %s\n",
817        GC_f2s (!fwd), GCC_2s (connection));
818   GCP_queue_add (get_hop (connection, fwd), NULL,
819                  GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK, 0, 0,
820                  sizeof (struct GNUNET_CADET_ConnectionACK),
821                  connection, fwd, &conn_message_sent, NULL);
822   connection->pending_messages++;
823   if (CADET_TUNNEL_NEW == GCT_get_cstate (t))
824     GCT_change_cstate (t, CADET_TUNNEL_WAITING);
825   if (CADET_CONNECTION_READY != connection->state)
826     connection_change_state (connection, CADET_CONNECTION_SENT);
827 }
828
829
830 /**
831  * Send a notification that a connection is broken.
832  *
833  * @param c Connection that is broken.
834  * @param id1 Peer that has disconnected.
835  * @param id2 Peer that has disconnected.
836  * @param fwd Direction towards which to send it.
837  */
838 static void
839 send_broken (struct CadetConnection *c,
840              const struct GNUNET_PeerIdentity *id1,
841              const struct GNUNET_PeerIdentity *id2,
842              int fwd)
843 {
844   struct GNUNET_CADET_ConnectionBroken msg;
845
846   msg.header.size = htons (sizeof (struct GNUNET_CADET_ConnectionBroken));
847   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN);
848   msg.cid = c->id;
849   msg.peer1 = *id1;
850   msg.peer2 = *id2;
851   GNUNET_assert (NULL == GCC_send_prebuilt_message (&msg.header, 0, 0, c, fwd,
852                                                     GNUNET_YES, NULL, NULL));
853 }
854
855
856 /**
857  * Send a notification that a connection is broken, when a connection
858  * isn't even known to the local peer.
859  *
860  * @param connection_id Connection ID.
861  * @param id1 Peer that has disconnected, probably local peer.
862  * @param id2 Peer that has disconnected can be NULL if unknown.
863  * @param peer Peer to notify (neighbor who sent the connection).
864  */
865 static void
866 send_broken_unknown (const struct GNUNET_CADET_Hash *connection_id,
867                      const struct GNUNET_PeerIdentity *id1,
868                      const struct GNUNET_PeerIdentity *id2,
869                      const struct GNUNET_PeerIdentity *peer_id)
870 {
871   struct GNUNET_CADET_ConnectionBroken *msg;
872   struct CadetPeer *neighbor;
873
874   LOG (GNUNET_ERROR_TYPE_INFO, "===> BROKEN on unknown connection %s\n",
875        GNUNET_h2s (GC_h2hc (connection_id)));
876
877   msg = GNUNET_new (struct GNUNET_CADET_ConnectionBroken);
878   msg->header.size = htons (sizeof (struct GNUNET_CADET_ConnectionBroken));
879   msg->header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN);
880   msg->cid = *connection_id;
881   msg->peer1 = *id1;
882   if (NULL != id2)
883     msg->peer2 = *id2;
884   else
885     memset (&msg->peer2, 0, sizeof (msg->peer2));
886   neighbor = GCP_get (peer_id);
887   GCP_queue_add (neighbor, msg, GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN,
888                  0, 2, sizeof (struct GNUNET_CADET_ConnectionBroken),
889                  NULL, GNUNET_SYSERR, /* connection, fwd */
890                  NULL, NULL); /* continuation */
891 }
892
893
894 /**
895  * Send keepalive packets for a connection.
896  *
897  * @param c Connection to keep alive..
898  * @param fwd Is this a FWD keepalive? (owner -> dest).
899  */
900 static void
901 send_connection_keepalive (struct CadetConnection *c, int fwd)
902 {
903   struct GNUNET_MessageHeader msg;
904   struct CadetFlowControl *fc;
905
906   LOG (GNUNET_ERROR_TYPE_INFO, "keepalive %s for connection %s\n",
907        GC_f2s (fwd), GCC_2s (c));
908
909   fc = fwd ? &c->fwd_fc : &c->bck_fc;
910   if (0 < fc->queue_n)
911   {
912     LOG (GNUNET_ERROR_TYPE_INFO, "not sending keepalive, traffic in queue\n");
913   }
914
915   GNUNET_STATISTICS_update (stats, "# keepalives sent", 1, GNUNET_NO);
916
917   GNUNET_assert (NULL != c->t);
918   msg.size = htons (sizeof (msg));
919   msg.type = htons (GNUNET_MESSAGE_TYPE_CADET_KEEPALIVE);
920
921   GNUNET_assert (NULL ==
922                  GCT_send_prebuilt_message (&msg, c->t, c,
923                                             GNUNET_NO, NULL, NULL));
924 }
925
926
927 /**
928  * Send CONNECTION_{CREATE/ACK} packets for a connection.
929  *
930  * @param c Connection for which to send the message.
931  * @param fwd If #GNUNET_YES, send CREATE, otherwise send ACK.
932  */
933 static void
934 connection_recreate (struct CadetConnection *c, int fwd)
935 {
936   LOG (GNUNET_ERROR_TYPE_DEBUG, "sending connection recreate\n");
937   if (fwd)
938     GCC_send_create (c);
939   else
940     send_connection_ack (c, GNUNET_NO);
941 }
942
943
944 /**
945  * Generic connection timer management.
946  * Depending on the role of the peer in the connection will send the
947  * appropriate message (build or keepalive)
948  *
949  * @param c Conncetion to maintain.
950  * @param fwd Is FWD?
951  */
952 static void
953 connection_maintain (struct CadetConnection *c, int fwd)
954 {
955   if (GNUNET_NO != c->destroy)
956   {
957     LOG (GNUNET_ERROR_TYPE_INFO, "not sending keepalive, being destroyed\n");
958     return;
959   }
960
961   if (NULL == c->t)
962   {
963     GNUNET_break (0);
964     GCC_debug (c, GNUNET_ERROR_TYPE_ERROR);
965     return;
966   }
967
968   if (CADET_TUNNEL_SEARCHING == GCT_get_cstate (c->t))
969   {
970     /* If status is SEARCHING, why is there a connection? Should be WAITING */
971     GNUNET_break (0);
972     GCT_debug (c->t, GNUNET_ERROR_TYPE_ERROR);
973     LOG (GNUNET_ERROR_TYPE_INFO, "not sending keepalive, tunnel SEARCHING\n");
974     schedule_next_keepalive (c, fwd);
975     return;
976   }
977   switch (c->state)
978   {
979     case CADET_CONNECTION_NEW:
980       GNUNET_break (0);
981       /* fall-through */
982     case CADET_CONNECTION_SENT:
983       connection_recreate (c, fwd);
984       break;
985     case CADET_CONNECTION_READY:
986       send_connection_keepalive (c, fwd);
987       break;
988     default:
989       break;
990   }
991 }
992
993
994
995 /**
996  * Keep the connection alive.
997  *
998  * @param c Connection to keep alive.
999  * @param fwd Direction.
1000  * @param shutdown Are we shutting down? (Don't send traffic)
1001  *                 Non-zero value for true, not necessarily GNUNET_YES.
1002  */
1003 static void
1004 connection_keepalive (struct CadetConnection *c, int fwd, int shutdown)
1005 {
1006   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s keepalive for %s\n",
1007        GC_f2s (fwd), GCC_2s (c));
1008
1009   if (fwd)
1010     c->fwd_maintenance_task = GNUNET_SCHEDULER_NO_TASK;
1011   else
1012     c->bck_maintenance_task = GNUNET_SCHEDULER_NO_TASK;
1013
1014   if (GNUNET_NO != shutdown)
1015     return;
1016
1017   connection_maintain (c, fwd);
1018
1019   /* Next execution will be scheduled by message_sent or _maintain*/
1020 }
1021
1022
1023 /**
1024  * Keep the connection alive in the FWD direction.
1025  *
1026  * @param cls Closure (connection to keepalive).
1027  * @param tc TaskContext.
1028  */
1029 static void
1030 connection_fwd_keepalive (void *cls,
1031                           const struct GNUNET_SCHEDULER_TaskContext *tc)
1032 {
1033   connection_keepalive ((struct CadetConnection *) cls,
1034                         GNUNET_YES,
1035                         tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN);
1036 }
1037
1038
1039 /**
1040  * Keep the connection alive in the BCK direction.
1041  *
1042  * @param cls Closure (connection to keepalive).
1043  * @param tc TaskContext.
1044  */
1045 static void
1046 connection_bck_keepalive (void *cls,
1047                           const struct GNUNET_SCHEDULER_TaskContext *tc)
1048 {
1049   connection_keepalive ((struct CadetConnection *) cls,
1050                         GNUNET_NO,
1051                         tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN);
1052 }
1053
1054
1055 /**
1056  * Schedule next keepalive task, taking in consideration
1057  * the connection state and number of retries.
1058  *
1059  * If the peer is not the origin, do nothing.
1060  *
1061  * @param c Connection for which to schedule the next keepalive.
1062  * @param fwd Direction for the next keepalive.
1063  */
1064 static void
1065 schedule_next_keepalive (struct CadetConnection *c, int fwd)
1066 {
1067   struct GNUNET_TIME_Relative delay;
1068   GNUNET_SCHEDULER_TaskIdentifier *task_id;
1069   GNUNET_SCHEDULER_Task keepalive_task;
1070
1071   if (GNUNET_NO == GCC_is_origin (c, fwd))
1072     return;
1073
1074   /* Calculate delay to use, depending on the state of the connection */
1075   if (CADET_CONNECTION_READY == c->state)
1076   {
1077     delay = refresh_connection_time;
1078   }
1079   else
1080   {
1081     if (1 > c->create_retry)
1082       c->create_retry = 1;
1083     delay = GNUNET_TIME_relative_multiply (create_connection_time,
1084                                            c->create_retry);
1085     if (c->create_retry < 64)
1086       c->create_retry *= 2;
1087   }
1088
1089   /* Select direction-dependent parameters */
1090   if (GNUNET_YES == fwd)
1091   {
1092     task_id = &c->fwd_maintenance_task;
1093     keepalive_task = &connection_fwd_keepalive;
1094   }
1095   else
1096   {
1097     task_id = &c->bck_maintenance_task;
1098     keepalive_task = &connection_bck_keepalive;
1099   }
1100
1101   /* Check that no one scheduled it before us */
1102   if (GNUNET_SCHEDULER_NO_TASK != *task_id)
1103   {
1104     /* No need for a _break. It can happen for instance when sending a SYNACK
1105      * for a duplicate SYN: the first SYNACK scheduled the task. */
1106     GNUNET_SCHEDULER_cancel (*task_id);
1107   }
1108
1109   /* Schedule the task */
1110   *task_id = GNUNET_SCHEDULER_add_delayed (delay, keepalive_task, c);
1111   LOG (GNUNET_ERROR_TYPE_DEBUG, "next keepalive in %s\n",
1112        GNUNET_STRINGS_relative_time_to_string (delay, GNUNET_YES));
1113 }
1114
1115
1116 /**
1117  * @brief Re-initiate traffic on this connection if necessary.
1118  *
1119  * Check if there is traffic queued towards this peer
1120  * and the core transmit handle is NULL (traffic was stalled).
1121  * If so, call core tmt rdy.
1122  *
1123  * @param c Connection on which initiate traffic.
1124  * @param fwd Is this about fwd traffic?
1125  */
1126 static void
1127 connection_unlock_queue (struct CadetConnection *c, int fwd)
1128 {
1129   struct CadetPeer *peer;
1130
1131   LOG (GNUNET_ERROR_TYPE_DEBUG, "connection_unlock_queue %s on %s\n",
1132        GC_f2s (fwd), GCC_2s (c));
1133
1134   if (GCC_is_terminal (c, fwd))
1135   {
1136     LOG (GNUNET_ERROR_TYPE_DEBUG, " is terminal, can unlock!\n");
1137     return;
1138   }
1139
1140   peer = get_hop (c, fwd);
1141   GCP_queue_unlock (peer, c);
1142 }
1143
1144
1145 /**
1146  * Cancel all transmissions that belong to a certain connection.
1147  *
1148  * If the connection is scheduled for destruction and no more messages are left,
1149  * the connection will be destroyed by the continuation call.
1150  *
1151  * @param c Connection which to cancel. Might be destroyed during this call.
1152  * @param fwd Cancel fwd traffic?
1153  */
1154 static void
1155 connection_cancel_queues (struct CadetConnection *c, int fwd)
1156 {
1157   struct CadetFlowControl *fc;
1158   struct CadetPeer *peer;
1159
1160   LOG (GNUNET_ERROR_TYPE_DEBUG,
1161        " *** Cancel %s queues for connection %s\n",
1162        GC_f2s (fwd), GCC_2s (c));
1163   if (NULL == c)
1164   {
1165     GNUNET_break (0);
1166     return;
1167   }
1168
1169   fc = fwd ? &c->fwd_fc : &c->bck_fc;
1170   if (GNUNET_SCHEDULER_NO_TASK != fc->poll_task)
1171   {
1172     GNUNET_SCHEDULER_cancel (fc->poll_task);
1173     fc->poll_task = GNUNET_SCHEDULER_NO_TASK;
1174     LOG (GNUNET_ERROR_TYPE_DEBUG, " *** Cancel POLL in ccq for fc %p\n", fc);
1175   }
1176   peer = get_hop (c, fwd);
1177   GCP_queue_cancel (peer, c);
1178 }
1179
1180
1181 /**
1182  * Function called if a connection has been stalled for a while,
1183  * possibly due to a missed ACK. Poll the neighbor about its ACK status.
1184  *
1185  * @param cls Closure (poll ctx).
1186  * @param tc TaskContext.
1187  */
1188 static void
1189 connection_poll (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
1190
1191
1192 /**
1193  * Callback called when a queued POLL message is sent.
1194  *
1195  * @param cls Closure (FC).
1196  * @param c Connection this message was on.
1197  * @param q Queue handler this call invalidates.
1198  * @param type Type of message sent.
1199  * @param fwd Was this a FWD going message?
1200  * @param size Size of the message.
1201  */
1202 static void
1203 poll_sent (void *cls,
1204            struct CadetConnection *c,
1205            struct CadetConnectionQueue *q,
1206            uint16_t type, int fwd, size_t size)
1207 {
1208   struct CadetFlowControl *fc = cls;
1209
1210   if (2 == c->destroy)
1211   {
1212     LOG (GNUNET_ERROR_TYPE_DEBUG, " *** POLL canceled on shutdown\n");
1213     return;
1214   }
1215   LOG (GNUNET_ERROR_TYPE_DEBUG, " *** POLL sent for , scheduling new one!\n");
1216   fc->poll_msg = NULL;
1217   fc->poll_time = GNUNET_TIME_STD_BACKOFF (fc->poll_time);
1218   fc->poll_task = GNUNET_SCHEDULER_add_delayed (fc->poll_time,
1219                                                 &connection_poll, fc);
1220   LOG (GNUNET_ERROR_TYPE_DEBUG, " task %u\n", fc->poll_task);
1221
1222 }
1223
1224 /**
1225  * Function called if a connection has been stalled for a while,
1226  * possibly due to a missed ACK. Poll the neighbor about its ACK status.
1227  *
1228  * @param cls Closure (poll ctx).
1229  * @param tc TaskContext.
1230  */
1231 static void
1232 connection_poll (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1233 {
1234   struct CadetFlowControl *fc = cls;
1235   struct GNUNET_CADET_Poll msg;
1236   struct CadetConnection *c;
1237
1238   fc->poll_task = GNUNET_SCHEDULER_NO_TASK;
1239   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1240   {
1241     return;
1242   }
1243
1244   c = fc->c;
1245   LOG (GNUNET_ERROR_TYPE_DEBUG, " *** Polling connection %s %s\n",
1246        GCC_2s (c), fc == &c->fwd_fc ? "FWD" : "BCK");
1247
1248   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_POLL);
1249   msg.header.size = htons (sizeof (msg));
1250   msg.pid = htonl (fc->last_pid_sent);
1251   LOG (GNUNET_ERROR_TYPE_DEBUG, " *** last pid sent: %u!\n", fc->last_pid_sent);
1252   fc->poll_msg =
1253       GCC_send_prebuilt_message (&msg.header, 0, fc->last_pid_sent, c,
1254                                  fc == &c->fwd_fc, GNUNET_YES, &poll_sent, fc);
1255   GNUNET_assert (NULL != fc->poll_msg);
1256 }
1257
1258
1259 /**
1260  * Timeout function due to lack of keepalive/traffic from the owner.
1261  * Destroys connection if called.
1262  *
1263  * @param cls Closure (connection to destroy).
1264  * @param tc TaskContext.
1265  */
1266 static void
1267 connection_fwd_timeout (void *cls,
1268                         const struct GNUNET_SCHEDULER_TaskContext *tc)
1269 {
1270   struct CadetConnection *c = cls;
1271
1272   c->fwd_maintenance_task = GNUNET_SCHEDULER_NO_TASK;
1273   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1274     return;
1275
1276   LOG (GNUNET_ERROR_TYPE_DEBUG, "Connection %s FWD timed out. Destroying.\n",
1277        GCC_2s (c));
1278   if (GCC_is_origin (c, GNUNET_YES)) /* If local, leave. */
1279   {
1280     GNUNET_break (0);
1281     return;
1282   }
1283
1284   GCC_destroy (c);
1285 }
1286
1287
1288 /**
1289  * Timeout function due to lack of keepalive/traffic from the destination.
1290  * Destroys connection if called.
1291  *
1292  * @param cls Closure (connection to destroy).
1293  * @param tc TaskContext
1294  */
1295 static void
1296 connection_bck_timeout (void *cls,
1297                         const struct GNUNET_SCHEDULER_TaskContext *tc)
1298 {
1299   struct CadetConnection *c = cls;
1300
1301   c->bck_maintenance_task = GNUNET_SCHEDULER_NO_TASK;
1302   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1303     return;
1304
1305   LOG (GNUNET_ERROR_TYPE_DEBUG, "Connection %s BCK timed out. Destroying.\n",
1306        GCC_2s (c));
1307
1308   if (GCC_is_origin (c, GNUNET_NO)) /* If local, leave. */
1309   {
1310     GNUNET_break (0);
1311     return;
1312   }
1313
1314   GCC_destroy (c);
1315 }
1316
1317
1318 /**
1319  * Resets the connection timeout task, some other message has done the
1320  * task's job.
1321  * - For the first peer on the direction this means to send
1322  *   a keepalive or a path confirmation message (either create or ACK).
1323  * - For all other peers, this means to destroy the connection,
1324  *   due to lack of activity.
1325  * Starts the timeout if no timeout was running (connection just created).
1326  *
1327  * @param c Connection whose timeout to reset.
1328  * @param fwd Is this forward?
1329  *
1330  * TODO use heap to improve efficiency of scheduler.
1331  */
1332 static void
1333 connection_reset_timeout (struct CadetConnection *c, int fwd)
1334 {
1335   LOG (GNUNET_ERROR_TYPE_DEBUG, "Connection %s reset timeout\n", GC_f2s (fwd));
1336
1337   if (GCC_is_origin (c, fwd)) /* Startpoint */
1338   {
1339     schedule_next_keepalive (c, fwd);
1340   }
1341   else /* Relay, endpoint. */
1342   {
1343     struct GNUNET_TIME_Relative delay;
1344     GNUNET_SCHEDULER_TaskIdentifier *ti;
1345     GNUNET_SCHEDULER_Task f;
1346
1347     ti = fwd ? &c->fwd_maintenance_task : &c->bck_maintenance_task;
1348
1349     if (GNUNET_SCHEDULER_NO_TASK != *ti)
1350       GNUNET_SCHEDULER_cancel (*ti);
1351     delay = GNUNET_TIME_relative_multiply (refresh_connection_time, 4);
1352     f = fwd ? &connection_fwd_timeout : &connection_bck_timeout;
1353     *ti = GNUNET_SCHEDULER_add_delayed (delay, f, c);
1354   }
1355 }
1356
1357
1358 /**
1359  * Add the connection to the list of both neighbors.
1360  *
1361  * @param c Connection.
1362  *
1363  * @return #GNUNET_OK if everything went fine
1364  *         #GNUNET_SYSERR if the was an error and @c c is malformed.
1365  */
1366 static int
1367 register_neighbors (struct CadetConnection *c)
1368 {
1369   struct CadetPeer *next_peer;
1370   struct CadetPeer *prev_peer;
1371
1372   next_peer = get_next_hop (c);
1373   prev_peer = get_prev_hop (c);
1374
1375   LOG (GNUNET_ERROR_TYPE_DEBUG, "register neighbors for connection %s\n",
1376        GCC_2s (c));
1377   path_debug (c->path);
1378   LOG (GNUNET_ERROR_TYPE_DEBUG, "own pos %u\n", c->own_pos);
1379   LOG (GNUNET_ERROR_TYPE_DEBUG, "putting connection %s to next peer %p\n",
1380        GCC_2s (c), next_peer);
1381   LOG (GNUNET_ERROR_TYPE_DEBUG, "next peer %p %s\n", next_peer, GCP_2s (next_peer));
1382   LOG (GNUNET_ERROR_TYPE_DEBUG, "putting connection %s to prev peer %p\n",
1383        GCC_2s (c), prev_peer);
1384   LOG (GNUNET_ERROR_TYPE_DEBUG, "prev peer %p %s\n", prev_peer, GCP_2s (prev_peer));
1385
1386   if (GNUNET_NO == GCP_is_neighbor (next_peer)
1387       || GNUNET_NO == GCP_is_neighbor (prev_peer))
1388   {
1389     if (GCC_is_origin (c, GNUNET_YES))
1390       GNUNET_STATISTICS_update (stats, "# local bad paths", 1, GNUNET_NO);
1391     GNUNET_STATISTICS_update (stats, "# bad paths", 1, GNUNET_NO);
1392
1393     LOG (GNUNET_ERROR_TYPE_DEBUG, "  register neighbors failed\n");
1394     LOG (GNUNET_ERROR_TYPE_DEBUG, "  prev: %s, neighbor?: %d\n",
1395          GCP_2s (prev_peer), GCP_is_neighbor (prev_peer));
1396     LOG (GNUNET_ERROR_TYPE_DEBUG, "  next: %s, neighbor?: %d\n",
1397          GCP_2s (next_peer), GCP_is_neighbor (next_peer));
1398     return GNUNET_SYSERR;
1399   }
1400
1401   GCP_add_connection (next_peer, c);
1402   GCP_add_connection (prev_peer, c);
1403
1404   return GNUNET_OK;
1405 }
1406
1407
1408 /**
1409  * Remove the connection from the list of both neighbors.
1410  *
1411  * @param c Connection.
1412  */
1413 static void
1414 unregister_neighbors (struct CadetConnection *c)
1415 {
1416   struct CadetPeer *peer;
1417
1418   peer = get_next_hop (c);
1419   if (GNUNET_OK != GCP_remove_connection (peer, c))
1420   {
1421     GNUNET_assert (CADET_CONNECTION_NEW == c->state
1422                    || CADET_CONNECTION_DESTROYED <= c->state);
1423     LOG (GNUNET_ERROR_TYPE_DEBUG, "  cstate: %u\n", c->state);
1424     if (NULL != c->t) GCT_debug (c->t, GNUNET_ERROR_TYPE_DEBUG);
1425   }
1426
1427   peer = get_prev_hop (c);
1428   if (GNUNET_OK != GCP_remove_connection (peer, c))
1429   {
1430     GNUNET_assert (CADET_CONNECTION_NEW == c->state
1431                    || CADET_CONNECTION_DESTROYED <= c->state);
1432     LOG (GNUNET_ERROR_TYPE_DEBUG, "  cstate: %u\n", c->state);
1433     if (NULL != c->t) GCT_debug (c->t, GNUNET_ERROR_TYPE_DEBUG);
1434   }
1435 }
1436
1437
1438 /**
1439  * Bind the connection to the peer and the tunnel to that peer.
1440  *
1441  * If the peer has no tunnel, create one. Update tunnel and connection
1442  * data structres to reflect new status.
1443  *
1444  * @param c Connection.
1445  * @param peer Peer.
1446  */
1447 static void
1448 add_to_peer (struct CadetConnection *c, struct CadetPeer *peer)
1449 {
1450   GCP_add_tunnel (peer);
1451   c->t = GCP_get_tunnel (peer);
1452   GCT_add_connection (c->t, c);
1453 }
1454
1455
1456 /**
1457  * Builds a path from a PeerIdentity array.
1458  *
1459  * @param peers PeerIdentity array.
1460  * @param size Size of the @c peers array.
1461  * @param own_pos Output parameter: own position in the path.
1462  *
1463  * @return Fixed and shortened path.
1464  */
1465 static struct CadetPeerPath *
1466 build_path_from_peer_ids (struct GNUNET_PeerIdentity *peers,
1467                           unsigned int size,
1468                           unsigned int *own_pos)
1469 {
1470   struct CadetPeerPath *path;
1471   GNUNET_PEER_Id shortid;
1472   unsigned int i;
1473   unsigned int j;
1474   unsigned int offset;
1475
1476   /* Create path */
1477   LOG (GNUNET_ERROR_TYPE_DEBUG, "  Creating path...\n");
1478   path = path_new (size);
1479   *own_pos = 0;
1480   offset = 0;
1481   for (i = 0; i < size; i++)
1482   {
1483     LOG (GNUNET_ERROR_TYPE_DEBUG, "  - %u: taking %s\n",
1484          i, GNUNET_i2s (&peers[i]));
1485     shortid = GNUNET_PEER_intern (&peers[i]);
1486
1487     /* Check for loops / duplicates */
1488     for (j = 0; j < i - offset; j++)
1489     {
1490       if (path->peers[j] == shortid)
1491       {
1492         LOG (GNUNET_ERROR_TYPE_DEBUG, "    already exists at pos %u\n", j);
1493         offset = i - j;
1494         LOG (GNUNET_ERROR_TYPE_DEBUG, "    offset now %u\n", offset);
1495         GNUNET_PEER_change_rc (shortid, -1);
1496       }
1497     }
1498     LOG (GNUNET_ERROR_TYPE_DEBUG, "    storing at %u\n", i - offset);
1499     path->peers[i - offset] = shortid;
1500     if (path->peers[i - offset] == myid)
1501       *own_pos = i - offset;
1502   }
1503   path->length -= offset;
1504
1505   if (path->peers[*own_pos] != myid)
1506   {
1507     /* create path: self not found in path through self */
1508     GNUNET_break_op (0);
1509     path_destroy (path);
1510     return NULL;
1511   }
1512
1513   return path;
1514 }
1515
1516
1517 /**
1518  * Log receipt of message on stderr (INFO level).
1519  *
1520  * @param message Message received.
1521  * @param peer Peer who sent the message.
1522  * @param hash Connection ID.
1523  */
1524 static void
1525 log_message (const struct GNUNET_MessageHeader *message,
1526              const struct GNUNET_PeerIdentity *peer,
1527              const struct GNUNET_CADET_Hash *hash)
1528 {
1529   LOG (GNUNET_ERROR_TYPE_INFO, "<-- %s on connection %s from %s\n",
1530        GC_m2s (ntohs (message->type)), GNUNET_h2s (GC_h2hc (hash)),
1531        GNUNET_i2s (peer));
1532 }
1533
1534 /******************************************************************************/
1535 /********************************    API    ***********************************/
1536 /******************************************************************************/
1537
1538 /**
1539  * Core handler for connection creation.
1540  *
1541  * @param cls Closure (unused).
1542  * @param peer Sender (neighbor).
1543  * @param message Message.
1544  *
1545  * @return GNUNET_OK to keep the connection open,
1546  *         GNUNET_SYSERR to close it (signal serious error)
1547  */
1548 int
1549 GCC_handle_create (void *cls, const struct GNUNET_PeerIdentity *peer,
1550                    const struct GNUNET_MessageHeader *message)
1551 {
1552   struct GNUNET_CADET_ConnectionCreate *msg;
1553   struct GNUNET_PeerIdentity *id;
1554   struct GNUNET_CADET_Hash *cid;
1555   struct CadetPeerPath *path;
1556   struct CadetPeer *dest_peer;
1557   struct CadetPeer *orig_peer;
1558   struct CadetConnection *c;
1559   unsigned int own_pos;
1560   uint16_t size;
1561
1562   /* Check size */
1563   size = ntohs (message->size);
1564   if (size < sizeof (struct GNUNET_CADET_ConnectionCreate))
1565   {
1566     GNUNET_break_op (0);
1567     return GNUNET_OK;
1568   }
1569
1570   /* Calculate hops */
1571   size -= sizeof (struct GNUNET_CADET_ConnectionCreate);
1572   if (size % sizeof (struct GNUNET_PeerIdentity))
1573   {
1574     GNUNET_break_op (0);
1575     return GNUNET_OK;
1576   }
1577   size /= sizeof (struct GNUNET_PeerIdentity);
1578   if (1 > size)
1579   {
1580     GNUNET_break_op (0);
1581     return GNUNET_OK;
1582   }
1583   LOG (GNUNET_ERROR_TYPE_DEBUG, "    path has %u hops.\n", size);
1584
1585   /* Get parameters */
1586   msg = (struct GNUNET_CADET_ConnectionCreate *) message;
1587   cid = &msg->cid;
1588   log_message (message, peer, cid);
1589   id = (struct GNUNET_PeerIdentity *) &msg[1];
1590   LOG (GNUNET_ERROR_TYPE_DEBUG, "    origin: %s\n", GNUNET_i2s (id));
1591
1592   /* Create connection */
1593   c = connection_get (cid);
1594   if (NULL == c)
1595   {
1596     path = build_path_from_peer_ids ((struct GNUNET_PeerIdentity *) &msg[1],
1597                                      size, &own_pos);
1598     if (NULL == path)
1599       return GNUNET_OK;
1600     if (0 == own_pos)
1601     {
1602       GNUNET_break_op (0);
1603       path_destroy (path);
1604       return GNUNET_OK;
1605     }
1606     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Own position: %u\n", own_pos);
1607     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Creating connection\n");
1608     c = GCC_new (cid, NULL, path_duplicate (path), own_pos);
1609     if (NULL == c)
1610     {
1611       if (path->length - 1 == own_pos)
1612       {
1613         /* If we are destination, why did the creation fail? */
1614         GNUNET_break (0);
1615         return GNUNET_OK;
1616       }
1617       send_broken_unknown (cid, &my_full_id,
1618                            GNUNET_PEER_resolve2 (path->peers[own_pos + 1]),
1619                            peer);
1620       path_destroy (path);
1621       return GNUNET_OK;
1622     }
1623     GCP_add_path_to_all (path, GNUNET_NO);
1624     connection_reset_timeout (c, GNUNET_YES);
1625   }
1626   else
1627   {
1628     path = path_duplicate (c->path);
1629   }
1630   if (CADET_CONNECTION_NEW == c->state)
1631     connection_change_state (c, CADET_CONNECTION_SENT);
1632
1633   /* Remember peers */
1634   dest_peer = GCP_get (&id[size - 1]);
1635   orig_peer = GCP_get (&id[0]);
1636
1637   /* Is it a connection to us? */
1638   if (c->own_pos == path->length - 1)
1639   {
1640     LOG (GNUNET_ERROR_TYPE_DEBUG, "  It's for us!\n");
1641     GCP_add_path_to_origin (orig_peer, path_duplicate (path), GNUNET_YES);
1642
1643     add_to_peer (c, orig_peer);
1644     if (CADET_TUNNEL_NEW == GCT_get_cstate (c->t))
1645       GCT_change_cstate (c->t,  CADET_TUNNEL_WAITING);
1646
1647     send_connection_ack (c, GNUNET_NO);
1648     if (CADET_CONNECTION_SENT == c->state)
1649       connection_change_state (c, CADET_CONNECTION_ACK);
1650   }
1651   else
1652   {
1653     /* It's for somebody else! Retransmit. */
1654     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Retransmitting.\n");
1655     GCP_add_path (dest_peer, path_duplicate (path), GNUNET_NO);
1656     GCP_add_path_to_origin (orig_peer, path_duplicate (path), GNUNET_NO);
1657     GNUNET_assert (NULL == GCC_send_prebuilt_message (message, 0, 0, c,
1658                                                       GNUNET_YES, GNUNET_YES,
1659                                                       NULL, NULL));
1660   }
1661   path_destroy (path);
1662   return GNUNET_OK;
1663 }
1664
1665
1666 /**
1667  * Core handler for path confirmations.
1668  *
1669  * @param cls closure
1670  * @param message message
1671  * @param peer peer identity this notification is about
1672  *
1673  * @return GNUNET_OK to keep the connection open,
1674  *         GNUNET_SYSERR to close it (signal serious error)
1675  */
1676 int
1677 GCC_handle_confirm (void *cls, const struct GNUNET_PeerIdentity *peer,
1678                     const struct GNUNET_MessageHeader *message)
1679 {
1680   struct GNUNET_CADET_ConnectionACK *msg;
1681   struct CadetConnection *c;
1682   struct CadetPeerPath *p;
1683   struct CadetPeer *pi;
1684   enum CadetConnectionState oldstate;
1685   int fwd;
1686
1687   msg = (struct GNUNET_CADET_ConnectionACK *) message;
1688   log_message (message, peer, &msg->cid);
1689   c = connection_get (&msg->cid);
1690   if (NULL == c)
1691   {
1692     GNUNET_STATISTICS_update (stats, "# control on unknown connection",
1693                               1, GNUNET_NO);
1694     LOG (GNUNET_ERROR_TYPE_DEBUG, "  don't know the connection!\n");
1695     send_broken_unknown (&msg->cid, &my_full_id, NULL, peer);
1696     return GNUNET_OK;
1697   }
1698
1699   if (GNUNET_NO != c->destroy)
1700   {
1701     LOG (GNUNET_ERROR_TYPE_DEBUG, "  connection being destroyed\n");
1702     return GNUNET_OK;
1703   }
1704
1705   oldstate = c->state;
1706   LOG (GNUNET_ERROR_TYPE_DEBUG, "  via peer %s\n", GNUNET_i2s (peer));
1707   pi = GCP_get (peer);
1708   if (get_next_hop (c) == pi)
1709   {
1710     LOG (GNUNET_ERROR_TYPE_DEBUG, "  SYNACK\n");
1711     fwd = GNUNET_NO;
1712     if (CADET_CONNECTION_SENT == oldstate)
1713       connection_change_state (c, CADET_CONNECTION_ACK);
1714   }
1715   else if (get_prev_hop (c) == pi)
1716   {
1717     LOG (GNUNET_ERROR_TYPE_DEBUG, "  FINAL ACK\n");
1718     fwd = GNUNET_YES;
1719     connection_change_state (c, CADET_CONNECTION_READY);
1720   }
1721   else
1722   {
1723     GNUNET_break_op (0);
1724     return GNUNET_OK;
1725   }
1726
1727   connection_reset_timeout (c, fwd);
1728
1729   /* Add path to peers? */
1730   p = c->path;
1731   if (NULL != p)
1732   {
1733     GCP_add_path_to_all (p, GNUNET_YES);
1734   }
1735   else
1736   {
1737     GNUNET_break (0);
1738   }
1739
1740   /* Message for us as creator? */
1741   if (GCC_is_origin (c, GNUNET_YES))
1742   {
1743     if (GNUNET_NO != fwd)
1744     {
1745       GNUNET_break_op (0);
1746       return GNUNET_OK;
1747     }
1748     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Connection (SYN)ACK for us!\n");
1749
1750     /* If just created, cancel the short timeout and start a long one */
1751     if (CADET_CONNECTION_SENT == oldstate)
1752       connection_reset_timeout (c, GNUNET_YES);
1753
1754     /* Change connection state */
1755     connection_change_state (c, CADET_CONNECTION_READY);
1756     send_connection_ack (c, GNUNET_YES);
1757
1758     /* Change tunnel state, trigger KX */
1759     if (CADET_TUNNEL_WAITING == GCT_get_cstate (c->t))
1760       GCT_change_cstate (c->t, CADET_TUNNEL_READY);
1761
1762     return GNUNET_OK;
1763   }
1764
1765   /* Message for us as destination? */
1766   if (GCC_is_terminal (c, GNUNET_YES))
1767   {
1768     if (GNUNET_YES != fwd)
1769     {
1770       GNUNET_break_op (0);
1771       return GNUNET_OK;
1772     }
1773     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Connection ACK for us!\n");
1774
1775     /* If just created, cancel the short timeout and start a long one */
1776     if (CADET_CONNECTION_ACK == oldstate)
1777       connection_reset_timeout (c, GNUNET_NO);
1778
1779     /* Change tunnel state */
1780     if (CADET_TUNNEL_WAITING == GCT_get_cstate (c->t))
1781       GCT_change_cstate (c->t, CADET_TUNNEL_READY);
1782
1783     return GNUNET_OK;
1784   }
1785
1786   LOG (GNUNET_ERROR_TYPE_DEBUG, "  not for us, retransmitting...\n");
1787   GNUNET_assert (NULL == GCC_send_prebuilt_message (message, 0, 0, c, fwd,
1788                                                     GNUNET_YES, NULL, NULL));
1789   return GNUNET_OK;
1790 }
1791
1792
1793 /**
1794  * Core handler for notifications of broken connections.
1795  *
1796  * @param cls Closure (unused).
1797  * @param id Peer identity of sending neighbor.
1798  * @param message Message.
1799  *
1800  * @return GNUNET_OK to keep the connection open,
1801  *         GNUNET_SYSERR to close it (signal serious error)
1802  */
1803 int
1804 GCC_handle_broken (void* cls,
1805                    const struct GNUNET_PeerIdentity* id,
1806                    const struct GNUNET_MessageHeader* message)
1807 {
1808   struct GNUNET_CADET_ConnectionBroken *msg;
1809   struct CadetConnection *c;
1810   struct CadetTunnel *t;
1811   int destroyed;
1812   int fwd;
1813
1814   msg = (struct GNUNET_CADET_ConnectionBroken *) message;
1815   log_message (message, id, &msg->cid);
1816   LOG (GNUNET_ERROR_TYPE_DEBUG, "  regarding %s\n",
1817               GNUNET_i2s (&msg->peer1));
1818   LOG (GNUNET_ERROR_TYPE_DEBUG, "  regarding %s\n",
1819               GNUNET_i2s (&msg->peer2));
1820   c = connection_get (&msg->cid);
1821   if (NULL == c)
1822   {
1823     LOG (GNUNET_ERROR_TYPE_DEBUG, "  duplicate CONNECTION_BROKEN\n");
1824     return GNUNET_OK;
1825   }
1826
1827   t = c->t;
1828   fwd = is_fwd (c, id);
1829   if (GCC_is_terminal (c, fwd))
1830   {
1831     struct GNUNET_MessageHeader *out_msg;
1832     struct CadetPeer *neighbor;
1833     struct CadetPeer *endpoint;
1834
1835     if (NULL == t)
1836     {
1837       /* A terminal connection should not have 't' set to NULL. */
1838       GNUNET_break (0);
1839       GCC_debug (c, GNUNET_ERROR_TYPE_ERROR);
1840       return GNUNET_OK;
1841     }
1842     neighbor = get_hop (c, !fwd);
1843     endpoint = GCP_get_short (c->path->peers[c->path->length - 1]);
1844     path_invalidate (c->path);
1845     GCP_notify_broken_link (endpoint, &msg->peer1, &msg->peer2);
1846     c->state = CADET_CONNECTION_BROKEN;
1847     GCT_remove_connection (t, c);
1848     c->t = NULL;
1849     destroyed = GNUNET_NO;
1850
1851     /* GCP_connection_pop could destroy the connection! */
1852     while (NULL != (out_msg = GCP_connection_pop (neighbor, c, &destroyed)))
1853     {
1854       GCT_resend_message (out_msg, t);
1855     }
1856     /* All pending messages should have been popped,
1857      * and the connection destroyed by the continuation.
1858      */
1859     if (GNUNET_YES != destroyed)
1860     {
1861       GNUNET_break (0);
1862       GCC_destroy (c);
1863     }
1864   }
1865   else
1866   {
1867     GNUNET_assert (NULL == GCC_send_prebuilt_message (message, 0, 0, c, fwd,
1868                                                       GNUNET_YES, NULL, NULL));
1869     c->destroy = GNUNET_YES;
1870     connection_cancel_queues (c, !fwd);
1871   }
1872
1873   return GNUNET_OK;
1874 }
1875
1876
1877 /**
1878  * Core handler for tunnel destruction
1879  *
1880  * @param cls Closure (unused).
1881  * @param peer Peer identity of sending neighbor.
1882  * @param message Message.
1883  *
1884  * @return GNUNET_OK to keep the connection open,
1885  *         GNUNET_SYSERR to close it (signal serious error)
1886  */
1887 int
1888 GCC_handle_destroy (void *cls, const struct GNUNET_PeerIdentity *peer,
1889                     const struct GNUNET_MessageHeader *message)
1890 {
1891   struct GNUNET_CADET_ConnectionDestroy *msg;
1892   struct CadetConnection *c;
1893   int fwd;
1894
1895   msg = (struct GNUNET_CADET_ConnectionDestroy *) message;
1896   log_message (message, peer, &msg->cid);
1897   c = connection_get (&msg->cid);
1898   if (NULL == c)
1899   {
1900     /* Probably already got the message from another path,
1901      * destroyed the tunnel and retransmitted to children.
1902      * Safe to ignore.
1903      */
1904     GNUNET_STATISTICS_update (stats, "# control on unknown connection",
1905                               1, GNUNET_NO);
1906     LOG (GNUNET_ERROR_TYPE_DEBUG, "  connection unknown: already destroyed?\n");
1907     return GNUNET_OK;
1908   }
1909   fwd = is_fwd (c, peer);
1910   if (GNUNET_SYSERR == fwd)
1911   {
1912     GNUNET_break_op (0); /* FIXME */
1913     return GNUNET_OK;
1914   }
1915   if (GNUNET_NO == GCC_is_terminal (c, fwd))
1916     GNUNET_assert (NULL == GCC_send_prebuilt_message (message, 0, 0, c, fwd,
1917                                                       GNUNET_YES, NULL, NULL));
1918   else if (0 == c->pending_messages)
1919   {
1920     LOG (GNUNET_ERROR_TYPE_DEBUG, "  directly destroying connection!\n");
1921     GCC_destroy (c);
1922     return GNUNET_OK;
1923   }
1924   c->destroy = GNUNET_YES;
1925   c->state = CADET_CONNECTION_DESTROYED;
1926   if (NULL != c->t)
1927   {
1928     GCT_remove_connection (c->t, c);
1929     c->t = NULL;
1930   }
1931
1932   return GNUNET_OK;
1933 }
1934
1935 /**
1936  * Generic handler for cadet network encrypted traffic.
1937  *
1938  * @param peer Peer identity this notification is about.
1939  * @param msg Encrypted message.
1940  *
1941  * @return GNUNET_OK to keep the connection open,
1942  *         GNUNET_SYSERR to close it (signal serious error)
1943  */
1944 static int
1945 handle_cadet_encrypted (const struct GNUNET_PeerIdentity *peer,
1946                        const struct GNUNET_CADET_Encrypted *msg)
1947 {
1948   struct CadetConnection *c;
1949   struct CadetPeer *neighbor;
1950   struct CadetFlowControl *fc;
1951   GNUNET_PEER_Id peer_id;
1952   uint32_t pid;
1953   uint32_t ttl;
1954   size_t size;
1955   int fwd;
1956
1957   log_message (&msg->header, peer, &msg->cid);
1958
1959   /* Check size */
1960   size = ntohs (msg->header.size);
1961   if (size <
1962       sizeof (struct GNUNET_CADET_Encrypted) +
1963       sizeof (struct GNUNET_MessageHeader))
1964   {
1965     GNUNET_break_op (0);
1966     return GNUNET_OK;
1967   }
1968
1969   /* Check connection */
1970   c = connection_get (&msg->cid);
1971   if (NULL == c)
1972   {
1973     GNUNET_STATISTICS_update (stats, "# unknown connection", 1, GNUNET_NO);
1974     LOG (GNUNET_ERROR_TYPE_DEBUG, "enc on unknown connection %s\n",
1975          GNUNET_h2s (GC_h2hc (&msg->cid)));
1976     send_broken_unknown (&msg->cid, &my_full_id, NULL, peer);
1977     return GNUNET_OK;
1978   }
1979
1980   LOG (GNUNET_ERROR_TYPE_DEBUG, "  connection %s\n", GCC_2s (c));
1981
1982   /* Check if origin is as expected */
1983   neighbor = get_prev_hop (c);
1984   peer_id = GNUNET_PEER_search (peer);
1985   if (peer_id == GCP_get_short_id (neighbor))
1986   {
1987     fwd = GNUNET_YES;
1988   }
1989   else
1990   {
1991     neighbor = get_next_hop (c);
1992     if (peer_id == GCP_get_short_id (neighbor))
1993     {
1994       fwd = GNUNET_NO;
1995     }
1996     else
1997     {
1998       /* Unexpected peer sending traffic on a connection. */
1999       GNUNET_break_op (0);
2000       return GNUNET_OK;
2001     }
2002   }
2003
2004   /* Check PID */
2005   fc = fwd ? &c->bck_fc : &c->fwd_fc;
2006   pid = ntohl (msg->pid);
2007   LOG (GNUNET_ERROR_TYPE_DEBUG, " PID %u (expected %u+)\n",
2008        pid, fc->last_pid_recv + 1);
2009   if (GC_is_pid_bigger (pid, fc->last_ack_sent))
2010   {
2011     GNUNET_STATISTICS_update (stats, "# unsolicited message", 1, GNUNET_NO);
2012     GNUNET_break_op (0);
2013     LOG (GNUNET_ERROR_TYPE_WARNING,
2014          "Received PID %u, (prev %u), ACK %u\n",
2015          pid, fc->last_pid_recv, fc->last_ack_sent);
2016     return GNUNET_OK;
2017   }
2018   if (GNUNET_NO == GC_is_pid_bigger (pid, fc->last_pid_recv))
2019   {
2020     GNUNET_STATISTICS_update (stats, "# duplicate PID", 1, GNUNET_NO);
2021     LOG (GNUNET_ERROR_TYPE_DEBUG,
2022                 " PID %u not expected (%u+), dropping!\n",
2023                 pid, fc->last_pid_recv + 1);
2024     return GNUNET_OK;
2025   }
2026   if (CADET_CONNECTION_SENT == c->state || CADET_CONNECTION_ACK == c->state)
2027     connection_change_state (c, CADET_CONNECTION_READY);
2028   connection_reset_timeout (c, fwd);
2029   fc->last_pid_recv = pid;
2030
2031   /* Is this message for us? */
2032   if (GCC_is_terminal (c, fwd))
2033   {
2034     LOG (GNUNET_ERROR_TYPE_DEBUG, "  message for us!\n");
2035     GNUNET_STATISTICS_update (stats, "# messages received", 1, GNUNET_NO);
2036
2037     if (NULL == c->t)
2038     {
2039       GNUNET_break (GNUNET_NO != c->destroy);
2040       return GNUNET_OK;
2041     }
2042     fc->last_pid_recv = pid;
2043     GCT_handle_encrypted (c->t, msg);
2044     GCC_send_ack (c, fwd, GNUNET_NO);
2045     return GNUNET_OK;
2046   }
2047
2048   /* Message not for us: forward to next hop */
2049   LOG (GNUNET_ERROR_TYPE_DEBUG, "  not for us, retransmitting...\n");
2050   ttl = ntohl (msg->ttl);
2051   LOG (GNUNET_ERROR_TYPE_DEBUG, "   ttl: %u\n", ttl);
2052   if (ttl == 0)
2053   {
2054     GNUNET_STATISTICS_update (stats, "# TTL drops", 1, GNUNET_NO);
2055     LOG (GNUNET_ERROR_TYPE_WARNING, " TTL is 0, DROPPING!\n");
2056     GCC_send_ack (c, fwd, GNUNET_NO);
2057     return GNUNET_OK;
2058   }
2059
2060   GNUNET_STATISTICS_update (stats, "# messages forwarded", 1, GNUNET_NO);
2061   GNUNET_assert (NULL == GCC_send_prebuilt_message (&msg->header, 0, 0, c, fwd,
2062                                                     GNUNET_NO, NULL, NULL));
2063
2064   return GNUNET_OK;
2065 }
2066
2067 /**
2068  * Generic handler for cadet network encrypted traffic.
2069  *
2070  * @param peer Peer identity this notification is about.
2071  * @param msg Encrypted message.
2072  *
2073  * @return GNUNET_OK to keep the connection open,
2074  *         GNUNET_SYSERR to close it (signal serious error)
2075  */
2076 static int
2077 handle_cadet_kx (const struct GNUNET_PeerIdentity *peer,
2078                 const struct GNUNET_CADET_KX *msg)
2079 {
2080   struct CadetConnection *c;
2081   struct CadetPeer *neighbor;
2082   GNUNET_PEER_Id peer_id;
2083   size_t size;
2084   int fwd;
2085
2086   log_message (&msg->header, peer, &msg->cid);
2087
2088   /* Check size */
2089   size = ntohs (msg->header.size);
2090   if (size <
2091       sizeof (struct GNUNET_CADET_KX) +
2092       sizeof (struct GNUNET_MessageHeader))
2093   {
2094     GNUNET_break_op (0);
2095     return GNUNET_OK;
2096   }
2097
2098   /* Check connection */
2099   c = connection_get (&msg->cid);
2100   if (NULL == c)
2101   {
2102     GNUNET_STATISTICS_update (stats, "# unknown connection", 1, GNUNET_NO);
2103     LOG (GNUNET_ERROR_TYPE_DEBUG, "kx on unknown connection %s\n",
2104          GNUNET_h2s (GC_h2hc (&msg->cid)));
2105     send_broken_unknown (&msg->cid, &my_full_id, NULL, peer);
2106     return GNUNET_OK;
2107   }
2108   LOG (GNUNET_ERROR_TYPE_DEBUG, " on connection %s\n", GCC_2s (c));
2109
2110   /* Check if origin is as expected */
2111   neighbor = get_prev_hop (c);
2112   peer_id = GNUNET_PEER_search (peer);
2113   if (peer_id == GCP_get_short_id (neighbor))
2114   {
2115     fwd = GNUNET_YES;
2116   }
2117   else
2118   {
2119     neighbor = get_next_hop (c);
2120     if (peer_id == GCP_get_short_id (neighbor))
2121     {
2122       fwd = GNUNET_NO;
2123     }
2124     else
2125     {
2126       /* Unexpected peer sending traffic on a connection. */
2127       GNUNET_break_op (0);
2128       return GNUNET_OK;
2129     }
2130   }
2131
2132   /* Count as connection confirmation. */
2133   if (CADET_CONNECTION_SENT == c->state || CADET_CONNECTION_ACK == c->state)
2134   {
2135     connection_change_state (c, CADET_CONNECTION_READY);
2136     if (NULL != c->t)
2137     {
2138       if (CADET_TUNNEL_WAITING == GCT_get_cstate (c->t))
2139         GCT_change_cstate (c->t, CADET_TUNNEL_READY);
2140     }
2141   }
2142   connection_reset_timeout (c, fwd);
2143
2144   /* Is this message for us? */
2145   if (GCC_is_terminal (c, fwd))
2146   {
2147     LOG (GNUNET_ERROR_TYPE_DEBUG, "  message for us!\n");
2148     GNUNET_STATISTICS_update (stats, "# messages received", 1, GNUNET_NO);
2149     if (NULL == c->t)
2150     {
2151       GNUNET_break (0);
2152       return GNUNET_OK;
2153     }
2154     GCT_handle_kx (c->t, &msg[1].header);
2155     return GNUNET_OK;
2156   }
2157
2158   /* Message not for us: forward to next hop */
2159   LOG (GNUNET_ERROR_TYPE_DEBUG, "  not for us, retransmitting...\n");
2160   GNUNET_STATISTICS_update (stats, "# messages forwarded", 1, GNUNET_NO);
2161   GNUNET_assert (NULL == GCC_send_prebuilt_message (&msg->header, 0, 0, c, fwd,
2162                                                     GNUNET_NO, NULL, NULL));
2163
2164   return GNUNET_OK;
2165 }
2166
2167
2168 /**
2169  * Core handler for encrypted cadet network traffic (channel mgmt, data).
2170  *
2171  * @param cls Closure (unused).
2172  * @param message Message received.
2173  * @param peer Peer who sent the message.
2174  *
2175  * @return GNUNET_OK to keep the connection open,
2176  *         GNUNET_SYSERR to close it (signal serious error)
2177  */
2178 int
2179 GCC_handle_encrypted (void *cls, const struct GNUNET_PeerIdentity *peer,
2180                       const struct GNUNET_MessageHeader *message)
2181 {
2182   return handle_cadet_encrypted (peer,
2183                                 (struct GNUNET_CADET_Encrypted *)message);
2184 }
2185
2186
2187 /**
2188  * Core handler for key exchange traffic (ephemeral key, ping, pong).
2189  *
2190  * @param cls Closure (unused).
2191  * @param message Message received.
2192  * @param peer Peer who sent the message.
2193  *
2194  * @return GNUNET_OK to keep the connection open,
2195  *         GNUNET_SYSERR to close it (signal serious error)
2196  */
2197 int
2198 GCC_handle_kx (void *cls, const struct GNUNET_PeerIdentity *peer,
2199                const struct GNUNET_MessageHeader *message)
2200 {
2201   return handle_cadet_kx (peer,
2202                          (struct GNUNET_CADET_KX *) message);
2203 }
2204
2205
2206 /**
2207  * Core handler for cadet network traffic point-to-point acks.
2208  *
2209  * @param cls closure
2210  * @param message message
2211  * @param peer peer identity this notification is about
2212  *
2213  * @return GNUNET_OK to keep the connection open,
2214  *         GNUNET_SYSERR to close it (signal serious error)
2215  */
2216 int
2217 GCC_handle_ack (void *cls, const struct GNUNET_PeerIdentity *peer,
2218                 const struct GNUNET_MessageHeader *message)
2219 {
2220   struct GNUNET_CADET_ACK *msg;
2221   struct CadetConnection *c;
2222   struct CadetFlowControl *fc;
2223   GNUNET_PEER_Id id;
2224   uint32_t ack;
2225   int fwd;
2226
2227   msg = (struct GNUNET_CADET_ACK *) message;
2228   log_message (message, peer, &msg->cid);
2229   c = connection_get (&msg->cid);
2230   if (NULL == c)
2231   {
2232     GNUNET_STATISTICS_update (stats, "# ack on unknown connection", 1,
2233                               GNUNET_NO);
2234     send_broken_unknown (&msg->cid, &my_full_id, NULL, peer);
2235     return GNUNET_OK;
2236   }
2237
2238   /* Is this a forward or backward ACK? */
2239   id = GNUNET_PEER_search (peer);
2240   if (GCP_get_short_id (get_next_hop (c)) == id)
2241   {
2242     LOG (GNUNET_ERROR_TYPE_DEBUG, "  FWD ACK\n");
2243     fc = &c->fwd_fc;
2244     fwd = GNUNET_YES;
2245   }
2246   else if (GCP_get_short_id (get_prev_hop (c)) == id)
2247   {
2248     LOG (GNUNET_ERROR_TYPE_DEBUG, "  BCK ACK\n");
2249     fc = &c->bck_fc;
2250     fwd = GNUNET_NO;
2251   }
2252   else
2253   {
2254     GNUNET_break_op (0);
2255     return GNUNET_OK;
2256   }
2257
2258   ack = ntohl (msg->ack);
2259   LOG (GNUNET_ERROR_TYPE_DEBUG, "  ACK %u (was %u)\n",
2260               ack, fc->last_ack_recv);
2261   if (GC_is_pid_bigger (ack, fc->last_ack_recv))
2262     fc->last_ack_recv = ack;
2263
2264   /* Cancel polling if the ACK is big enough. */
2265   if (GNUNET_SCHEDULER_NO_TASK != fc->poll_task &&
2266       GC_is_pid_bigger (fc->last_ack_recv, fc->last_pid_sent))
2267   {
2268     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Cancel poll\n");
2269     GNUNET_SCHEDULER_cancel (fc->poll_task);
2270     fc->poll_task = GNUNET_SCHEDULER_NO_TASK;
2271     fc->poll_time = GNUNET_TIME_UNIT_SECONDS;
2272   }
2273
2274   connection_unlock_queue (c, fwd);
2275
2276   return GNUNET_OK;
2277 }
2278
2279
2280 /**
2281  * Core handler for cadet network traffic point-to-point ack polls.
2282  *
2283  * @param cls closure
2284  * @param message message
2285  * @param peer peer identity this notification is about
2286  *
2287  * @return GNUNET_OK to keep the connection open,
2288  *         GNUNET_SYSERR to close it (signal serious error)
2289  */
2290 int
2291 GCC_handle_poll (void *cls, const struct GNUNET_PeerIdentity *peer,
2292                  const struct GNUNET_MessageHeader *message)
2293 {
2294   struct GNUNET_CADET_Poll *msg;
2295   struct CadetConnection *c;
2296   struct CadetFlowControl *fc;
2297   GNUNET_PEER_Id id;
2298   uint32_t pid;
2299   int fwd;
2300
2301   msg = (struct GNUNET_CADET_Poll *) message;
2302   log_message (message, peer, &msg->cid);
2303   c = connection_get (&msg->cid);
2304   if (NULL == c)
2305   {
2306     GNUNET_STATISTICS_update (stats, "# poll on unknown connection", 1,
2307                               GNUNET_NO);
2308     LOG (GNUNET_ERROR_TYPE_DEBUG, "POLL message on unknown connection %s!\n",
2309          GNUNET_h2s (GC_h2hc (&msg->cid)));
2310     send_broken_unknown (&msg->cid, &my_full_id, NULL, peer);
2311     return GNUNET_OK;
2312   }
2313
2314   /* Is this a forward or backward ACK?
2315    * Note: a poll should never be needed in a loopback case,
2316    * since there is no possiblility of packet loss there, so
2317    * this way of discerining FWD/BCK should not be a problem.
2318    */
2319   id = GNUNET_PEER_search (peer);
2320   if (GCP_get_short_id (get_next_hop (c)) == id)
2321   {
2322     LOG (GNUNET_ERROR_TYPE_DEBUG, "  FWD FC\n");
2323     fc = &c->fwd_fc;
2324   }
2325   else if (GCP_get_short_id (get_prev_hop (c)) == id)
2326   {
2327     LOG (GNUNET_ERROR_TYPE_DEBUG, "  BCK FC\n");
2328     fc = &c->bck_fc;
2329   }
2330   else
2331   {
2332     GNUNET_break_op (0);
2333     return GNUNET_OK;
2334   }
2335
2336   pid = ntohl (msg->pid);
2337   LOG (GNUNET_ERROR_TYPE_DEBUG, "  PID %u, OLD %u\n", pid, fc->last_pid_recv);
2338   fc->last_pid_recv = pid;
2339   fwd = fc == &c->bck_fc;
2340   GCC_send_ack (c, fwd, GNUNET_YES);
2341
2342   return GNUNET_OK;
2343 }
2344
2345
2346 /**
2347  * Send an ACK on the appropriate connection/channel, depending on
2348  * the direction and the position of the peer.
2349  *
2350  * @param c Which connection to send the hop-by-hop ACK.
2351  * @param fwd Is this a fwd ACK? (will go dest->root).
2352  * @param force Send the ACK even if suboptimal (e.g. requested by POLL).
2353  */
2354 void
2355 GCC_send_ack (struct CadetConnection *c, int fwd, int force)
2356 {
2357   unsigned int buffer;
2358
2359   LOG (GNUNET_ERROR_TYPE_DEBUG,
2360        "GMC send %s ACK on %s\n",
2361        GC_f2s (fwd), GCC_2s (c));
2362
2363   if (NULL == c)
2364   {
2365     GNUNET_break (0);
2366     return;
2367   }
2368
2369   if (GNUNET_NO != c->destroy)
2370   {
2371     LOG (GNUNET_ERROR_TYPE_DEBUG, "  being destroyed, why bother...\n");
2372     return;
2373   }
2374
2375   /* Get available buffer space */
2376   if (GCC_is_terminal (c, fwd))
2377   {
2378     LOG (GNUNET_ERROR_TYPE_DEBUG, "  getting from all channels\n");
2379     buffer = GCT_get_channels_buffer (c->t);
2380   }
2381   else
2382   {
2383     LOG (GNUNET_ERROR_TYPE_DEBUG, "  getting from one connection\n");
2384     buffer = GCC_get_buffer (c, fwd);
2385   }
2386   LOG (GNUNET_ERROR_TYPE_DEBUG, "  buffer available: %u\n", buffer);
2387   if (0 == buffer && GNUNET_NO == force)
2388     return;
2389
2390   /* Send available buffer space */
2391   if (GCC_is_origin (c, fwd))
2392   {
2393     GNUNET_assert (NULL != c->t);
2394     LOG (GNUNET_ERROR_TYPE_DEBUG, "  sending on channels...\n");
2395     GCT_unchoke_channels (c->t);
2396   }
2397   else
2398   {
2399     LOG (GNUNET_ERROR_TYPE_DEBUG, "  sending on connection\n");
2400     send_ack (c, buffer, fwd, force);
2401   }
2402 }
2403
2404
2405 /**
2406  * Initialize the connections subsystem
2407  *
2408  * @param c Configuration handle.
2409  */
2410 void
2411 GCC_init (const struct GNUNET_CONFIGURATION_Handle *c)
2412 {
2413   LOG (GNUNET_ERROR_TYPE_DEBUG, "init\n");
2414   if (GNUNET_OK !=
2415       GNUNET_CONFIGURATION_get_value_number (c, "CADET", "MAX_MSGS_QUEUE",
2416                                              &max_msgs_queue))
2417   {
2418     GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
2419                                "CADET", "MAX_MSGS_QUEUE", "MISSING");
2420     GNUNET_SCHEDULER_shutdown ();
2421     return;
2422   }
2423
2424   if (GNUNET_OK !=
2425       GNUNET_CONFIGURATION_get_value_number (c, "CADET", "MAX_CONNECTIONS",
2426                                              &max_connections))
2427   {
2428     GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
2429                                "CADET", "MAX_CONNECTIONS", "MISSING");
2430     GNUNET_SCHEDULER_shutdown ();
2431     return;
2432   }
2433
2434   if (GNUNET_OK !=
2435       GNUNET_CONFIGURATION_get_value_time (c, "CADET", "REFRESH_CONNECTION_TIME",
2436                                            &refresh_connection_time))
2437   {
2438     GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
2439                                "CADET", "REFRESH_CONNECTION_TIME", "MISSING");
2440     GNUNET_SCHEDULER_shutdown ();
2441     return;
2442   }
2443   create_connection_time = GNUNET_TIME_UNIT_SECONDS;
2444   connections = GNUNET_CONTAINER_multihashmap_create (1024, GNUNET_NO);
2445 }
2446
2447
2448 /**
2449  * Destroy each connection on shutdown.
2450  *
2451  * @param cls Closure (unused).
2452  * @param key Current key code (CID, unused).
2453  * @param value Value in the hash map (connection)
2454  *
2455  * @return #GNUNET_YES, because we should continue to iterate,
2456  */
2457 static int
2458 shutdown_iterator (void *cls,
2459                    const struct GNUNET_HashCode *key,
2460                    void *value)
2461 {
2462   struct CadetConnection *c = value;
2463
2464   GCC_destroy (c);
2465   return GNUNET_YES;
2466 }
2467
2468
2469 /**
2470  * Shut down the connections subsystem.
2471  */
2472 void
2473 GCC_shutdown (void)
2474 {
2475   GNUNET_CONTAINER_multihashmap_iterate (connections, &shutdown_iterator, NULL);
2476   GNUNET_CONTAINER_multihashmap_destroy (connections);
2477   connections = NULL;
2478 }
2479
2480
2481 struct CadetConnection *
2482 GCC_new (const struct GNUNET_CADET_Hash *cid,
2483          struct CadetTunnel *t,
2484          struct CadetPeerPath *p,
2485          unsigned int own_pos)
2486 {
2487   struct CadetConnection *c;
2488
2489   c = GNUNET_new (struct CadetConnection);
2490   c->id = *cid;
2491   GNUNET_assert (GNUNET_OK ==
2492                  GNUNET_CONTAINER_multihashmap_put (connections,
2493                                                     GCC_get_h (c), c,
2494                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
2495   fc_init (&c->fwd_fc);
2496   fc_init (&c->bck_fc);
2497   c->fwd_fc.c = c;
2498   c->bck_fc.c = c;
2499
2500   c->t = t;
2501   GNUNET_assert (own_pos <= p->length - 1);
2502   c->own_pos = own_pos;
2503   c->path = p;
2504   p->c = c;
2505
2506   if (GNUNET_OK != register_neighbors (c))
2507   {
2508     if (0 == own_pos)
2509     {
2510       path_invalidate (c->path);
2511       c->t = NULL;
2512       c->path = NULL;
2513     }
2514     GCC_destroy (c);
2515     return NULL;
2516   }
2517
2518   return c;
2519 }
2520
2521
2522 void
2523 GCC_destroy (struct CadetConnection *c)
2524 {
2525   if (NULL == c)
2526   {
2527     GNUNET_break (0);
2528     return;
2529   }
2530
2531   if (2 == c->destroy) /* cancel queues -> GCP_queue_cancel -> q_destroy -> */
2532     return;            /* -> message_sent -> GCC_destroy. Don't loop. */
2533   c->destroy = 2;
2534
2535   LOG (GNUNET_ERROR_TYPE_DEBUG, "destroying connection %s\n", GCC_2s (c));
2536   LOG (GNUNET_ERROR_TYPE_DEBUG, " fc's f: %p, b: %p\n",
2537        &c->fwd_fc, &c->bck_fc);
2538   LOG (GNUNET_ERROR_TYPE_DEBUG, " fc tasks f: %u, b: %u\n",
2539        c->fwd_fc.poll_task, c->bck_fc.poll_task);
2540
2541   /* Cancel all traffic */
2542   if (NULL != c->path)
2543   {
2544     connection_cancel_queues (c, GNUNET_YES);
2545     connection_cancel_queues (c, GNUNET_NO);
2546     unregister_neighbors (c);
2547   }
2548
2549   LOG (GNUNET_ERROR_TYPE_DEBUG, " fc tasks f: %u, b: %u\n",
2550        c->fwd_fc.poll_task, c->bck_fc.poll_task);
2551
2552   /* Cancel maintainance task (keepalive/timeout) */
2553   if (NULL != c->fwd_fc.poll_msg)
2554   {
2555     GCC_cancel (c->fwd_fc.poll_msg);
2556     LOG (GNUNET_ERROR_TYPE_DEBUG, " *** POLL msg FWD canceled\n");
2557   }
2558   if (NULL != c->bck_fc.poll_msg)
2559   {
2560     GCC_cancel (c->bck_fc.poll_msg);
2561     LOG (GNUNET_ERROR_TYPE_DEBUG, " *** POLL msg BCK canceled\n");
2562   }
2563
2564   /* Delete from tunnel */
2565   if (NULL != c->t)
2566     GCT_remove_connection (c->t, c);
2567
2568   if (GNUNET_NO == GCC_is_origin (c, GNUNET_YES) && NULL != c->path)
2569     path_destroy (c->path);
2570   if (GNUNET_SCHEDULER_NO_TASK != c->fwd_maintenance_task)
2571     GNUNET_SCHEDULER_cancel (c->fwd_maintenance_task);
2572   if (GNUNET_SCHEDULER_NO_TASK != c->bck_maintenance_task)
2573     GNUNET_SCHEDULER_cancel (c->bck_maintenance_task);
2574   if (GNUNET_SCHEDULER_NO_TASK != c->fwd_fc.poll_task)
2575   {
2576     GNUNET_SCHEDULER_cancel (c->fwd_fc.poll_task);
2577     LOG (GNUNET_ERROR_TYPE_DEBUG, " *** POLL FWD canceled\n");
2578   }
2579   if (GNUNET_SCHEDULER_NO_TASK != c->bck_fc.poll_task)
2580   {
2581     GNUNET_SCHEDULER_cancel (c->bck_fc.poll_task);
2582     LOG (GNUNET_ERROR_TYPE_DEBUG, " *** POLL BCK canceled\n");
2583   }
2584
2585   GNUNET_break (GNUNET_YES ==
2586                 GNUNET_CONTAINER_multihashmap_remove (connections,
2587                                                       GCC_get_h (c), c));
2588
2589   GNUNET_STATISTICS_update (stats, "# connections", -1, GNUNET_NO);
2590   GNUNET_free (c);
2591 }
2592
2593 /**
2594  * Get the connection ID.
2595  *
2596  * @param c Connection to get the ID from.
2597  *
2598  * @return ID of the connection.
2599  */
2600 const struct GNUNET_CADET_Hash *
2601 GCC_get_id (const struct CadetConnection *c)
2602 {
2603   return &c->id;
2604 }
2605
2606
2607 /**
2608  * Get the connection ID.
2609  *
2610  * @param c Connection to get the ID from.
2611  *
2612  * @return ID of the connection.
2613  */
2614 const struct GNUNET_HashCode *
2615 GCC_get_h (const struct CadetConnection *c)
2616 {
2617   return GC_h2hc (&c->id);
2618 }
2619
2620
2621 /**
2622  * Get the connection path.
2623  *
2624  * @param c Connection to get the path from.
2625  *
2626  * @return path used by the connection.
2627  */
2628 const struct CadetPeerPath *
2629 GCC_get_path (const struct CadetConnection *c)
2630 {
2631   if (GNUNET_NO == c->destroy)
2632     return c->path;
2633   return NULL;
2634 }
2635
2636
2637 /**
2638  * Get the connection state.
2639  *
2640  * @param c Connection to get the state from.
2641  *
2642  * @return state of the connection.
2643  */
2644 enum CadetConnectionState
2645 GCC_get_state (const struct CadetConnection *c)
2646 {
2647   return c->state;
2648 }
2649
2650 /**
2651  * Get the connection tunnel.
2652  *
2653  * @param c Connection to get the tunnel from.
2654  *
2655  * @return tunnel of the connection.
2656  */
2657 struct CadetTunnel *
2658 GCC_get_tunnel (const struct CadetConnection *c)
2659 {
2660   return c->t;
2661 }
2662
2663
2664 /**
2665  * Get free buffer space in a connection.
2666  *
2667  * @param c Connection.
2668  * @param fwd Is query about FWD traffic?
2669  *
2670  * @return Free buffer space [0 - max_msgs_queue/max_connections]
2671  */
2672 unsigned int
2673 GCC_get_buffer (struct CadetConnection *c, int fwd)
2674 {
2675   struct CadetFlowControl *fc;
2676
2677   fc = fwd ? &c->fwd_fc : &c->bck_fc;
2678
2679   return (fc->queue_max - fc->queue_n);
2680 }
2681
2682 /**
2683  * Get how many messages have we allowed to send to us from a direction.
2684  *
2685  * @param c Connection.
2686  * @param fwd Are we asking about traffic from FWD (BCK messages)?
2687  *
2688  * @return last_ack_sent - last_pid_recv
2689  */
2690 unsigned int
2691 GCC_get_allowed (struct CadetConnection *c, int fwd)
2692 {
2693   struct CadetFlowControl *fc;
2694
2695   fc = fwd ? &c->fwd_fc : &c->bck_fc;
2696   if (GC_is_pid_bigger (fc->last_pid_recv, fc->last_ack_sent))
2697   {
2698     return 0;
2699   }
2700   return (fc->last_ack_sent - fc->last_pid_recv);
2701 }
2702
2703 /**
2704  * Get messages queued in a connection.
2705  *
2706  * @param c Connection.
2707  * @param fwd Is query about FWD traffic?
2708  *
2709  * @return Number of messages queued.
2710  */
2711 unsigned int
2712 GCC_get_qn (struct CadetConnection *c, int fwd)
2713 {
2714   struct CadetFlowControl *fc;
2715
2716   fc = fwd ? &c->fwd_fc : &c->bck_fc;
2717
2718   return fc->queue_n;
2719 }
2720
2721
2722 /**
2723  * Get next PID to use.
2724  *
2725  * @param c Connection.
2726  * @param fwd Is query about FWD traffic?
2727  *
2728  * @return Last PID used + 1.
2729  */
2730 unsigned int
2731 GCC_get_pid (struct CadetConnection *c, int fwd)
2732 {
2733   struct CadetFlowControl *fc;
2734
2735   fc = fwd ? &c->fwd_fc : &c->bck_fc;
2736
2737   return fc->last_pid_sent + 1;
2738 }
2739
2740
2741 /**
2742  * Allow the connection to advertise a buffer of the given size.
2743  *
2744  * The connection will send an @c fwd ACK message (so: in direction !fwd)
2745  * allowing up to last_pid_recv + buffer.
2746  *
2747  * @param c Connection.
2748  * @param buffer How many more messages the connection can accept.
2749  * @param fwd Is this about FWD traffic? (The ack will go dest->root).
2750  */
2751 void
2752 GCC_allow (struct CadetConnection *c, unsigned int buffer, int fwd)
2753 {
2754   LOG (GNUNET_ERROR_TYPE_DEBUG, "  allowing %s %u messages %s\n",
2755        GCC_2s (c), buffer, GC_f2s (fwd));
2756   send_ack (c, buffer, fwd, GNUNET_NO);
2757 }
2758
2759
2760 /**
2761  * Notify other peers on a connection of a broken link. Mark connections
2762  * to destroy after all traffic has been sent.
2763  *
2764  * @param c Connection on which there has been a disconnection.
2765  * @param peer Peer that disconnected.
2766  */
2767 void
2768 GCC_notify_broken (struct CadetConnection *c,
2769                    struct CadetPeer *peer)
2770 {
2771   int fwd;
2772
2773   LOG (GNUNET_ERROR_TYPE_DEBUG,
2774        " notify broken on %s due to %s disconnect\n",
2775        GCC_2s (c), GCP_2s (peer));
2776
2777   fwd = peer == get_prev_hop (c);
2778
2779   if (GNUNET_YES == GCC_is_terminal (c, fwd))
2780   {
2781     /* Local shutdown, no one to notify about this. */
2782     GCC_destroy (c);
2783     return;
2784   }
2785   if (GNUNET_NO == c->destroy)
2786     send_broken (c, &my_full_id, GCP_get_id (peer), fwd);
2787
2788   /* Connection will have at least one pending message
2789    * (the one we just scheduled), so no point in checking whether to
2790    * destroy immediately. */
2791   c->destroy = GNUNET_YES;
2792   c->state = CADET_CONNECTION_DESTROYED;
2793
2794   /**
2795    * Cancel all queues, if no message is left, connection will be destroyed.
2796    */
2797   connection_cancel_queues (c, !fwd);
2798
2799   return;
2800 }
2801
2802
2803 /**
2804  * Is this peer the first one on the connection?
2805  *
2806  * @param c Connection.
2807  * @param fwd Is this about fwd traffic?
2808  *
2809  * @return #GNUNET_YES if origin, #GNUNET_NO if relay/terminal.
2810  */
2811 int
2812 GCC_is_origin (struct CadetConnection *c, int fwd)
2813 {
2814   if (!fwd && c->path->length - 1 == c->own_pos )
2815     return GNUNET_YES;
2816   if (fwd && 0 == c->own_pos)
2817     return GNUNET_YES;
2818   return GNUNET_NO;
2819 }
2820
2821
2822 /**
2823  * Is this peer the last one on the connection?
2824  *
2825  * @param c Connection.
2826  * @param fwd Is this about fwd traffic?
2827  *            Note that the ROOT is the terminal for BCK traffic!
2828  *
2829  * @return #GNUNET_YES if terminal, #GNUNET_NO if relay/origin.
2830  */
2831 int
2832 GCC_is_terminal (struct CadetConnection *c, int fwd)
2833 {
2834   return GCC_is_origin (c, !fwd);
2835 }
2836
2837
2838 /**
2839  * See if we are allowed to send by the next hop in the given direction.
2840  *
2841  * @param c Connection.
2842  * @param fwd Is this about fwd traffic?
2843  *
2844  * @return #GNUNET_YES in case it's OK to send.
2845  */
2846 int
2847 GCC_is_sendable (struct CadetConnection *c, int fwd)
2848 {
2849   struct CadetFlowControl *fc;
2850
2851   LOG (GNUNET_ERROR_TYPE_DEBUG, " checking sendability of %s traffic on %s\n",
2852        GC_f2s (fwd), GCC_2s (c));
2853   if (NULL == c)
2854   {
2855     GNUNET_break (0);
2856     return GNUNET_YES;
2857   }
2858   fc = fwd ? &c->fwd_fc : &c->bck_fc;
2859   LOG (GNUNET_ERROR_TYPE_DEBUG,
2860        " last ack recv: %u, last pid sent: %u\n",
2861        fc->last_ack_recv, fc->last_pid_sent);
2862   if (GC_is_pid_bigger (fc->last_ack_recv, fc->last_pid_sent))
2863   {
2864     LOG (GNUNET_ERROR_TYPE_DEBUG, " sendable\n");
2865     return GNUNET_YES;
2866   }
2867   LOG (GNUNET_ERROR_TYPE_DEBUG, " not sendable\n");
2868   return GNUNET_NO;
2869 }
2870
2871
2872 /**
2873  * Check if this connection is a direct one (never trim a direct connection).
2874  *
2875  * @param c Connection.
2876  *
2877  * @return #GNUNET_YES in case it's a direct connection, #GNUNET_NO otherwise.
2878  */
2879 int
2880 GCC_is_direct (struct CadetConnection *c)
2881 {
2882   return (c->path->length == 2) ? GNUNET_YES : GNUNET_NO;
2883 }
2884
2885 /**
2886  * Sends an already built message on a connection, properly registering
2887  * all used resources.
2888  *
2889  * @param message Message to send. Function makes a copy of it.
2890  *                If message is not hop-by-hop, decrements TTL of copy.
2891  * @param payload_type Type of payload, in case the message is encrypted.
2892  * @param c Connection on which this message is transmitted.
2893  * @param fwd Is this a fwd message?
2894  * @param force Force the connection to accept the message (buffer overfill).
2895  * @param cont Continuation called once message is sent. Can be NULL.
2896  * @param cont_cls Closure for @c cont.
2897  *
2898  * @return Handle to cancel the message before it's sent.
2899  *         NULL on error or if @c cont is NULL.
2900  *         Invalid on @c cont call.
2901  */
2902 struct CadetConnectionQueue *
2903 GCC_send_prebuilt_message (const struct GNUNET_MessageHeader *message,
2904                            uint16_t payload_type, uint32_t payload_id,
2905                            struct CadetConnection *c, int fwd, int force,
2906                            GCC_sent cont, void *cont_cls)
2907 {
2908   struct CadetFlowControl *fc;
2909   struct CadetConnectionQueue *q;
2910   void *data;
2911   size_t size;
2912   uint16_t type;
2913   int droppable;
2914
2915   size = ntohs (message->size);
2916   data = GNUNET_malloc (size);
2917   memcpy (data, message, size);
2918   type = ntohs (message->type);
2919   LOG (GNUNET_ERROR_TYPE_INFO, "--> %s (%s %u) on connection %s (%u bytes)\n",
2920        GC_m2s (type), GC_m2s (payload_type), payload_id, GCC_2s (c), size);
2921
2922   fc = fwd ? &c->fwd_fc : &c->bck_fc;
2923   droppable = GNUNET_NO == force;
2924   switch (type)
2925   {
2926     struct GNUNET_CADET_Encrypted *emsg;
2927     struct GNUNET_CADET_KX        *kmsg;
2928     struct GNUNET_CADET_ACK       *amsg;
2929     struct GNUNET_CADET_Poll      *pmsg;
2930     struct GNUNET_CADET_ConnectionDestroy *dmsg;
2931     struct GNUNET_CADET_ConnectionBroken  *bmsg;
2932     uint32_t ttl;
2933
2934     case GNUNET_MESSAGE_TYPE_CADET_ENCRYPTED:
2935       emsg = (struct GNUNET_CADET_Encrypted *) data;
2936       ttl = ntohl (emsg->ttl);
2937       if (0 == ttl)
2938       {
2939         GNUNET_break_op (0);
2940         GNUNET_free (data);
2941         return NULL;
2942       }
2943       emsg->cid = c->id;
2944       emsg->ttl = htonl (ttl - 1);
2945       emsg->pid = htonl (0);
2946       LOG (GNUNET_ERROR_TYPE_DEBUG, "  Q_N+ %p %u\n", fc, fc->queue_n);
2947       LOG (GNUNET_ERROR_TYPE_DEBUG, "last pid sent %u\n", fc->last_pid_sent);
2948       LOG (GNUNET_ERROR_TYPE_DEBUG, "     ack recv %u\n", fc->last_ack_recv);
2949       if (GNUNET_YES == droppable)
2950       {
2951         fc->queue_n++;
2952       }
2953       else
2954       {
2955         LOG (GNUNET_ERROR_TYPE_DEBUG, "  not droppable, Q_N stays the same\n");
2956       }
2957       if (GC_is_pid_bigger (fc->last_pid_sent + 1, fc->last_ack_recv))
2958       {
2959         GCC_start_poll (c, fwd);
2960       }
2961       break;
2962
2963     case GNUNET_MESSAGE_TYPE_CADET_KX:
2964       kmsg = (struct GNUNET_CADET_KX *) data;
2965       kmsg->cid = c->id;
2966       break;
2967
2968     case GNUNET_MESSAGE_TYPE_CADET_ACK:
2969       amsg = (struct GNUNET_CADET_ACK *) data;
2970       amsg->cid = c->id;
2971       LOG (GNUNET_ERROR_TYPE_DEBUG, " ack %u\n", ntohl (amsg->ack));
2972       droppable = GNUNET_NO;
2973       break;
2974
2975     case GNUNET_MESSAGE_TYPE_CADET_POLL:
2976       pmsg = (struct GNUNET_CADET_Poll *) data;
2977       pmsg->cid = c->id;
2978       LOG (GNUNET_ERROR_TYPE_DEBUG, " poll %u\n", ntohl (pmsg->pid));
2979       droppable = GNUNET_NO;
2980       break;
2981
2982     case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY:
2983       dmsg = (struct GNUNET_CADET_ConnectionDestroy *) data;
2984       dmsg->cid = c->id;
2985       break;
2986
2987     case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN:
2988       bmsg = (struct GNUNET_CADET_ConnectionBroken *) data;
2989       bmsg->cid = c->id;
2990       break;
2991
2992     case GNUNET_MESSAGE_TYPE_CADET_KEEPALIVE:
2993       GNUNET_break (0);
2994       /* falltrough */
2995     case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE:
2996     case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK:
2997       break;
2998
2999     default:
3000       GNUNET_break (0);
3001       GNUNET_free (data);
3002       return NULL;
3003   }
3004
3005   if (fc->queue_n > fc->queue_max && droppable)
3006   {
3007     GNUNET_STATISTICS_update (stats, "# messages dropped (buffer full)",
3008                               1, GNUNET_NO);
3009     GNUNET_break (0);
3010     LOG (GNUNET_ERROR_TYPE_DEBUG, "queue full: %u/%u\n",
3011          fc->queue_n, fc->queue_max);
3012     if (GNUNET_MESSAGE_TYPE_CADET_ENCRYPTED == type)
3013     {
3014       fc->queue_n--;
3015     }
3016     GNUNET_free (data);
3017     return NULL; /* Drop this message */
3018   }
3019
3020   LOG (GNUNET_ERROR_TYPE_DEBUG, "  C_P+ %s %u\n",
3021        GCC_2s (c), c->pending_messages);
3022   c->pending_messages++;
3023
3024   q = GNUNET_new (struct CadetConnectionQueue);
3025   q->forced = !droppable;
3026   q->q = GCP_queue_add (get_hop (c, fwd), data, type, payload_type, payload_id,
3027                         size, c, fwd, &conn_message_sent, q);
3028   if (NULL == q->q)
3029   {
3030     LOG (GNUNET_ERROR_TYPE_DEBUG, "dropping msg on %s, NULL q\n", GCC_2s (c));
3031     GNUNET_free (data);
3032     GNUNET_free (q);
3033     return NULL;
3034   }
3035   q->cont = cont;
3036   q->cont_cls = cont_cls;
3037   return (NULL == cont) ? NULL : q;
3038 }
3039
3040
3041 /**
3042  * Cancel a previously sent message while it's in the queue.
3043  *
3044  * ONLY can be called before the continuation given to the send function
3045  * is called. Once the continuation is called, the message is no longer in the
3046  * queue.
3047  *
3048  * @param q Handle to the queue.
3049  */
3050 void
3051 GCC_cancel (struct CadetConnectionQueue *q)
3052 {
3053   LOG (GNUNET_ERROR_TYPE_DEBUG, "!  GMC cancel message\n");
3054
3055   /* queue destroy calls message_sent, which calls q->cont and frees q */
3056   GCP_queue_destroy (q->q, GNUNET_YES, GNUNET_NO, 0);
3057 }
3058
3059
3060 /**
3061  * Sends a CREATE CONNECTION message for a path to a peer.
3062  * Changes the connection and tunnel states if necessary.
3063  *
3064  * @param connection Connection to create.
3065  */
3066 void
3067 GCC_send_create (struct CadetConnection *connection)
3068 {
3069   enum CadetTunnelCState state;
3070   size_t size;
3071
3072   size = sizeof (struct GNUNET_CADET_ConnectionCreate);
3073   size += connection->path->length * sizeof (struct GNUNET_PeerIdentity);
3074
3075   LOG (GNUNET_ERROR_TYPE_INFO, "===> %s on connection %s  (%u bytes)\n",
3076        GC_m2s (GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE),
3077        GCC_2s (connection), size);
3078   LOG (GNUNET_ERROR_TYPE_DEBUG, "  C_P+ %p %u (create)\n",
3079        connection, connection->pending_messages);
3080   connection->pending_messages++;
3081
3082   connection->maintenance_q =
3083     GCP_queue_add (get_next_hop (connection), NULL,
3084                    GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE, 0, 0,
3085                    size, connection, GNUNET_YES, &conn_message_sent, NULL);
3086
3087   state = GCT_get_cstate (connection->t);
3088   if (CADET_TUNNEL_SEARCHING == state || CADET_TUNNEL_NEW == state)
3089     GCT_change_cstate (connection->t, CADET_TUNNEL_WAITING);
3090   if (CADET_CONNECTION_NEW == connection->state)
3091     connection_change_state (connection, CADET_CONNECTION_SENT);
3092 }
3093
3094
3095 /**
3096  * Send a message to all peers in this connection that the connection
3097  * is no longer valid.
3098  *
3099  * If some peer should not receive the message, it should be zero'ed out
3100  * before calling this function.
3101  *
3102  * @param c The connection whose peers to notify.
3103  */
3104 void
3105 GCC_send_destroy (struct CadetConnection *c)
3106 {
3107   struct GNUNET_CADET_ConnectionDestroy msg;
3108
3109   if (GNUNET_YES == c->destroy)
3110     return;
3111
3112   msg.header.size = htons (sizeof (msg));
3113   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY);
3114   msg.cid = c->id;
3115   LOG (GNUNET_ERROR_TYPE_DEBUG,
3116               "  sending connection destroy for connection %s\n",
3117               GCC_2s (c));
3118
3119   if (GNUNET_NO == GCC_is_terminal (c, GNUNET_YES))
3120     GNUNET_assert (NULL == GCC_send_prebuilt_message (&msg.header, 0, 0, c,
3121                                                       GNUNET_YES, GNUNET_YES,
3122                                                       NULL, NULL));
3123   if (GNUNET_NO == GCC_is_terminal (c, GNUNET_NO))
3124     GNUNET_assert (NULL == GCC_send_prebuilt_message (&msg.header, 0, 0, c,
3125                                                       GNUNET_NO, GNUNET_YES,
3126                                                       NULL, NULL));
3127   c->destroy = GNUNET_YES;
3128   c->state = CADET_CONNECTION_DESTROYED;
3129 }
3130
3131
3132 /**
3133  * @brief Start a polling timer for the connection.
3134  *
3135  * When a neighbor does not accept more traffic on the connection it could be
3136  * caused by a simple congestion or by a lost ACK. Polling enables to check
3137  * for the lastest ACK status for a connection.
3138  *
3139  * @param c Connection.
3140  * @param fwd Should we poll in the FWD direction?
3141  */
3142 void
3143 GCC_start_poll (struct CadetConnection *c, int fwd)
3144 {
3145   struct CadetFlowControl *fc;
3146
3147   fc = fwd ? &c->fwd_fc : &c->bck_fc;
3148   LOG (GNUNET_ERROR_TYPE_DEBUG, " *** POLL %s requested\n",
3149        GC_f2s (fwd));
3150   if (GNUNET_SCHEDULER_NO_TASK != fc->poll_task || NULL != fc->poll_msg)
3151   {
3152     LOG (GNUNET_ERROR_TYPE_DEBUG, " ***   not needed (%u, %p)\n",
3153          fc->poll_task, fc->poll_msg);
3154     return;
3155   }
3156   LOG (GNUNET_ERROR_TYPE_DEBUG, " *** POLL started on request\n");
3157   fc->poll_task = GNUNET_SCHEDULER_add_delayed (fc->poll_time,
3158                                                 &connection_poll,
3159                                                 fc);
3160 }
3161
3162
3163 /**
3164  * @brief Stop polling a connection for ACKs.
3165  *
3166  * Once we have enough ACKs for future traffic, polls are no longer necessary.
3167  *
3168  * @param c Connection.
3169  * @param fwd Should we stop the poll in the FWD direction?
3170  */
3171 void
3172 GCC_stop_poll (struct CadetConnection *c, int fwd)
3173 {
3174   struct CadetFlowControl *fc;
3175
3176   fc = fwd ? &c->fwd_fc : &c->bck_fc;
3177   if (GNUNET_SCHEDULER_NO_TASK != fc->poll_task)
3178   {
3179     GNUNET_SCHEDULER_cancel (fc->poll_task);
3180     fc->poll_task = GNUNET_SCHEDULER_NO_TASK;
3181   }
3182 }
3183
3184 /**
3185  * Get a (static) string for a connection.
3186  *
3187  * @param c Connection.
3188  */
3189 const char *
3190 GCC_2s (const struct CadetConnection *c)
3191 {
3192   if (NULL == c)
3193     return "NULL";
3194
3195   if (NULL != c->t)
3196   {
3197     static char buf[128];
3198
3199     sprintf (buf, "%s (->%s)",
3200              GNUNET_h2s (GC_h2hc (GCC_get_id (c))), GCT_2s (c->t));
3201     return buf;
3202   }
3203   return GNUNET_h2s (GC_h2hc (&c->id));
3204 }
3205
3206
3207 /**
3208  * Log all possible info about the connection state.
3209  *
3210  * @param c Connection to debug.
3211  * @param level Debug level to use.
3212  */
3213 void
3214 GCC_debug (const struct CadetConnection *c, enum GNUNET_ErrorType level)
3215 {
3216   int do_log;
3217   char *s;
3218
3219   do_log = GNUNET_get_log_call_status (level & (~GNUNET_ERROR_TYPE_BULK),
3220                                        "cadet-con",
3221                                        __FILE__, __FUNCTION__, __LINE__);
3222   if (0 == do_log)
3223     return;
3224
3225   if (NULL == c)
3226   {
3227     LOG2 (level, "CCC DEBUG NULL CONNECTION\n");
3228     return;
3229   }
3230
3231   LOG2 (level, "CCC DEBUG CONNECTION %s\n", GCC_2s (c));
3232   s = path_2s (c->path);
3233   LOG2 (level, "CCC  path %s, own pos: %u\n", s, c->own_pos);
3234   GNUNET_free (s);
3235   LOG2 (level, "CCC  state: %s, destroy: %u\n",
3236         GCC_state2s (c->state), c->destroy);
3237   LOG2 (level, "CCC  pending messages: %u\n", c->pending_messages);
3238   if (NULL != c->perf)
3239     LOG2 (level, "CCC  us/byte: %f\n", c->perf->avg);
3240
3241   LOG2 (level, "CCC  FWD flow control:\n");
3242   LOG2 (level, "CCC   queue: %u/%u\n", c->fwd_fc.queue_n, c->fwd_fc.queue_max);
3243   LOG2 (level, "CCC   last PID sent: %5u, recv: %5u\n",
3244         c->fwd_fc.last_pid_sent, c->fwd_fc.last_pid_recv);
3245   LOG2 (level, "CCC   last ACK sent: %5u, recv: %5u\n",
3246         c->fwd_fc.last_ack_sent, c->fwd_fc.last_ack_recv);
3247   LOG2 (level, "CCC   POLL: task %d, msg  %p, msg_ack %p)\n",
3248         c->fwd_fc.poll_task, c->fwd_fc.poll_msg, c->fwd_fc.ack_msg);
3249
3250   LOG2 (level, "CCC  BCK flow control:\n");
3251   LOG2 (level, "CCC   queue: %u/%u\n", c->bck_fc.queue_n, c->bck_fc.queue_max);
3252   LOG2 (level, "CCC   last PID sent: %5u, recv: %5u\n",
3253         c->bck_fc.last_pid_sent, c->bck_fc.last_pid_recv);
3254   LOG2 (level, "CCC   last ACK sent: %5u, recv: %5u\n",
3255         c->bck_fc.last_ack_sent, c->bck_fc.last_ack_recv);
3256   LOG2 (level, "CCC   POLL: task %d, msg  %p, msg_ack %p)\n",
3257         c->bck_fc.poll_task, c->bck_fc.poll_msg, c->bck_fc.ack_msg);
3258
3259   LOG2 (level, "CCC DEBUG CONNECTION END\n");
3260 }
3261