0c11c24df9f832b8ae2507202486b1960d9e7706
[oweals/gnunet.git] / src / cadet / gnunet-service-cadet_connection.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2001-2015 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20 /**
21  * @file cadet/gnunet-service-cadet_connection.c
22  * @brief GNUnet CADET service connection handling
23  * @author Bartlomiej Polot
24  */
25 #include "platform.h"
26 #include "gnunet_util_lib.h"
27 #include "gnunet_statistics_service.h"
28 #include "cadet_path.h"
29 #include "cadet_protocol.h"
30 #include "cadet.h"
31 #include "gnunet-service-cadet_connection.h"
32 #include "gnunet-service-cadet_peer.h"
33 #include "gnunet-service-cadet_tunnel.h"
34
35
36 /**
37  * Should we run somewhat expensive checks on our invariants?
38  */
39 #define CHECK_INVARIANTS 0
40
41
42 #define LOG(level, ...) GNUNET_log_from (level,"cadet-con",__VA_ARGS__)
43 #define LOG2(level, ...) GNUNET_log_from_nocheck(level,"cadet-con",__VA_ARGS__)
44
45
46 #define CADET_MAX_POLL_TIME      GNUNET_TIME_relative_multiply (\
47                                   GNUNET_TIME_UNIT_MINUTES,\
48                                   10)
49 #define AVG_MSGS                32
50
51
52 /******************************************************************************/
53 /********************************   STRUCTS  **********************************/
54 /******************************************************************************/
55
56 /**
57  * Struct to encapsulate all the Flow Control information to a peer to which
58  * we are directly connected (on a core level).
59  */
60 struct CadetFlowControl
61 {
62   /**
63    * Connection this controls.
64    */
65   struct CadetConnection *c;
66
67   /**
68    * How many messages are in the queue on this connection.
69    */
70   unsigned int queue_n;
71
72   /**
73    * How many messages do we accept in the queue.
74    * If 0, the connection is broken in this direction (next hop disconnected).
75    */
76   unsigned int queue_max;
77
78   /**
79    * ID of the last packet sent towards the peer.
80    */
81   uint32_t last_pid_sent;
82
83   /**
84    * ID of the last packet received from the peer.
85    */
86   uint32_t last_pid_recv;
87
88   /**
89    * Bitmap of past 32 messages received:
90    * - LSB being @c last_pid_recv.
91    * - MSB being @c last_pid_recv - 31 (mod UINTMAX).
92    */
93   uint32_t recv_bitmap;
94
95   /**
96    * Last ACK sent to the peer (peer can't send more than this PID).
97    */
98   uint32_t last_ack_sent;
99
100   /**
101    * Last ACK sent towards the origin (for traffic towards leaf node).
102    */
103   uint32_t last_ack_recv;
104
105   /**
106    * Task to poll the peer in case of a lost ACK causes stall.
107    */
108   struct GNUNET_SCHEDULER_Task *poll_task;
109
110   /**
111    * How frequently to poll for ACKs.
112    */
113   struct GNUNET_TIME_Relative poll_time;
114
115   /**
116    * Queued poll message, to cancel if not necessary anymore (got ACK).
117    */
118   struct CadetConnectionQueue *poll_msg;
119
120   /**
121    * Queued poll message, to cancel if not necessary anymore (got ACK).
122    */
123   struct CadetConnectionQueue *ack_msg;
124 };
125
126 /**
127  * Keep a record of the last messages sent on this connection.
128  */
129 struct CadetConnectionPerformance
130 {
131   /**
132    * Circular buffer for storing measurements.
133    */
134   double usecsperbyte[AVG_MSGS];
135
136   /**
137    * Running average of @c usecsperbyte.
138    */
139   double avg;
140
141   /**
142    * How many values of @c usecsperbyte are valid.
143    */
144   uint16_t size;
145
146   /**
147    * Index of the next "free" position in @c usecsperbyte.
148    */
149   uint16_t idx;
150 };
151
152
153 /**
154  * Struct containing all information regarding a connection to a peer.
155  */
156 struct CadetConnection
157 {
158   /**
159    * Tunnel this connection is part of.
160    */
161   struct CadetTunnel *t;
162
163   /**
164    * Flow control information for traffic fwd.
165    */
166   struct CadetFlowControl fwd_fc;
167
168   /**
169    * Flow control information for traffic bck.
170    */
171   struct CadetFlowControl bck_fc;
172
173   /**
174    * Measure connection performance on the endpoint.
175    */
176   struct CadetConnectionPerformance *perf;
177
178   /**
179    * ID of the connection.
180    */
181   struct GNUNET_CADET_Hash id;
182
183   /**
184    * Path being used for the tunnel. At the origin of the connection
185    * it's a pointer to the destination's path pool, otherwise just a copy.
186    */
187   struct CadetPeerPath *path;
188
189   /**
190    * Task to keep the used paths alive at the owner,
191    * time tunnel out on all the other peers.
192    */
193   struct GNUNET_SCHEDULER_Task *fwd_maintenance_task;
194
195   /**
196    * Task to keep the used paths alive at the destination,
197    * time tunnel out on all the other peers.
198    */
199   struct GNUNET_SCHEDULER_Task *bck_maintenance_task;
200
201   /**
202    * Queue handle for maintainance traffic. One handle for FWD and BCK since
203    * one peer never needs to maintain both directions (no loopback connections).
204    */
205   struct CadetPeerQueue *maintenance_q;
206
207   /**
208    * Should equal #get_next_hop(), or NULL if that peer disconnected.
209    */
210   struct CadetPeer *next_peer;
211
212   /**
213    * Should equal #get_prev_hop(), or NULL if that peer disconnected.
214    */
215   struct CadetPeer *prev_peer;
216
217   /**
218    * State of the connection.
219    */
220   enum CadetConnectionState state;
221
222   /**
223    * Position of the local peer in the path.
224    */
225   unsigned int own_pos;
226
227   /**
228    * Pending message count.
229    */
230   unsigned int pending_messages;
231
232   /**
233    * Destroy flag:
234    * - if 0, connection in use.
235    * - if 1, destroy on last message.
236    * - if 2, connection is being destroyed don't re-enter.
237    */
238   int destroy;
239
240   /**
241    * In-connection-map flag. Sometimes, when @e destroy is set but
242    * actual destruction is delayed to enable us to finish processing
243    * queues (i.e. in the direction that is still working), we remove
244    * the connection from the map to prevent it from still being
245    * found (and used) by accident. This flag is set to #GNUNET_YES
246    * for a connection that is not in the #connections map.  Should
247    * only be #GNUNET_YES if #destroy is also non-zero.
248    */
249   int was_removed;
250
251   /**
252    * Counter to do exponential backoff when creating a connection (max 64).
253    */
254   unsigned short create_retry;
255
256   /**
257    * Task to check if connection has duplicates.
258    */
259   struct GNUNET_SCHEDULER_Task *check_duplicates_task;
260 };
261
262
263 /**
264  * Handle for messages queued but not yet sent.
265  */
266 struct CadetConnectionQueue
267 {
268   /**
269    * Peer queue handle, to cancel if necessary.
270    */
271   struct CadetPeerQueue *q;
272
273   /**
274    * Continuation to call once sent.
275    */
276   GCC_sent cont;
277
278   /**
279    * Closure for @e cont.
280    */
281   void *cont_cls;
282
283   /**
284    * Was this a forced message? (Do not account for it)
285    */
286   int forced;
287 };
288
289 /******************************************************************************/
290 /*******************************   GLOBALS  ***********************************/
291 /******************************************************************************/
292
293 /**
294  * Global handle to the statistics service.
295  */
296 extern struct GNUNET_STATISTICS_Handle *stats;
297
298 /**
299  * Local peer own ID (memory efficient handle).
300  */
301 extern GNUNET_PEER_Id myid;
302
303 /**
304  * Local peer own ID (full value).
305  */
306 extern struct GNUNET_PeerIdentity my_full_id;
307
308 /**
309  * Connections known, indexed by cid (CadetConnection).
310  */
311 static struct GNUNET_CONTAINER_MultiHashMap *connections;
312
313 /**
314  * How many connections are we willing to maintain.
315  * Local connections are always allowed, even if there are more connections than max.
316  */
317 static unsigned long long max_connections;
318
319 /**
320  * How many messages *in total* are we willing to queue, divide by number of
321  * connections to get connection queue size.
322  */
323 static unsigned long long max_msgs_queue;
324
325 /**
326  * How often to send path keepalives. Paths timeout after 4 missed.
327  */
328 static struct GNUNET_TIME_Relative refresh_connection_time;
329
330 /**
331  * How often to send path create / ACKs.
332  */
333 static struct GNUNET_TIME_Relative create_connection_time;
334
335
336 /******************************************************************************/
337 /********************************   STATIC  ***********************************/
338 /******************************************************************************/
339
340
341
342 #if 0 // avoid compiler warning for unused static function
343 static void
344 fc_debug (struct CadetFlowControl *fc)
345 {
346   LOG (GNUNET_ERROR_TYPE_DEBUG, "    IN: %u/%u\n",
347               fc->last_pid_recv, fc->last_ack_sent);
348   LOG (GNUNET_ERROR_TYPE_DEBUG, "    OUT: %u/%u\n",
349               fc->last_pid_sent, fc->last_ack_recv);
350   LOG (GNUNET_ERROR_TYPE_DEBUG, "    QUEUE: %u/%u\n",
351               fc->queue_n, fc->queue_max);
352 }
353
354 static void
355 connection_debug (struct CadetConnection *c)
356 {
357   if (NULL == c)
358   {
359     LOG (GNUNET_ERROR_TYPE_INFO, "DEBUG NULL CONNECTION\n");
360     return;
361   }
362   LOG (GNUNET_ERROR_TYPE_DEBUG, "Connection %s:%X\n",
363               peer2s (c->t->peer), GCC_2s (c));
364   LOG (GNUNET_ERROR_TYPE_DEBUG, "  state: %u, pending msgs: %u\n",
365               c->state, c->pending_messages);
366   LOG (GNUNET_ERROR_TYPE_DEBUG, "  FWD FC\n");
367   fc_debug (&c->fwd_fc);
368   LOG (GNUNET_ERROR_TYPE_DEBUG, "  BCK FC\n");
369   fc_debug (&c->bck_fc);
370 }
371 #endif
372
373
374 /**
375  * Schedule next keepalive task, taking in consideration
376  * the connection state and number of retries.
377  *
378  * @param c Connection for which to schedule the next keepalive.
379  * @param fwd Direction for the next keepalive.
380  */
381 static void
382 schedule_next_keepalive (struct CadetConnection *c, int fwd);
383
384
385 /**
386  * Resets the connection timeout task, some other message has done the
387  * task's job.
388  * - For the first peer on the direction this means to send
389  *   a keepalive or a path confirmation message (either create or ACK).
390  * - For all other peers, this means to destroy the connection,
391  *   due to lack of activity.
392  * Starts the timeout if no timeout was running (connection just created).
393  *
394  * @param c Connection whose timeout to reset.
395  * @param fwd Is this forward?
396  */
397 static void
398 connection_reset_timeout (struct CadetConnection *c, int fwd);
399
400
401 /**
402  * Get string description for tunnel state. Reentrant.
403  *
404  * @param s Tunnel state.
405  *
406  * @return String representation.
407  */
408 static const char *
409 GCC_state2s (enum CadetConnectionState s)
410 {
411   switch (s)
412   {
413     case CADET_CONNECTION_NEW:
414       return "CADET_CONNECTION_NEW";
415     case CADET_CONNECTION_SENT:
416       return "CADET_CONNECTION_SENT";
417     case CADET_CONNECTION_ACK:
418       return "CADET_CONNECTION_ACK";
419     case CADET_CONNECTION_READY:
420       return "CADET_CONNECTION_READY";
421     case CADET_CONNECTION_DESTROYED:
422       return "CADET_CONNECTION_DESTROYED";
423     case CADET_CONNECTION_BROKEN:
424       return "CADET_CONNECTION_BROKEN";
425     default:
426       GNUNET_break (0);
427       LOG (GNUNET_ERROR_TYPE_ERROR, " conn state %u unknown!\n", s);
428       return "CADET_CONNECTION_STATE_ERROR";
429   }
430 }
431
432
433 /**
434  * Initialize a Flow Control structure to the initial state.
435  *
436  * @param fc Flow Control structure to initialize.
437  */
438 static void
439 fc_init (struct CadetFlowControl *fc)
440 {
441   fc->last_pid_sent = (uint32_t) -1; /* Next (expected) = 0 */
442   fc->last_pid_recv = (uint32_t) -1;
443   fc->last_ack_sent = (uint32_t) 0;
444   fc->last_ack_recv = (uint32_t) 0;
445   fc->poll_task = NULL;
446   fc->poll_time = GNUNET_TIME_UNIT_SECONDS;
447   fc->queue_n = 0;
448   fc->queue_max = (max_msgs_queue / max_connections) + 1;
449 }
450
451
452 /**
453  * Find a connection.
454  *
455  * @param cid Connection ID.
456  *
457  * @return conntection with the given ID @cid or NULL if not found.
458  */
459 static struct CadetConnection *
460 connection_get (const struct GNUNET_CADET_Hash *cid)
461 {
462   return GNUNET_CONTAINER_multihashmap_get (connections, GC_h2hc (cid));
463 }
464
465
466 /**
467  * Change the connection state. Cannot change a connection marked as destroyed.
468  *
469  * @param c Connection to change.
470  * @param state New state to set.
471  */
472 static void
473 connection_change_state (struct CadetConnection* c,
474                          enum CadetConnectionState state)
475 {
476   LOG (GNUNET_ERROR_TYPE_DEBUG,
477        "Connection %s state %s -> %s\n",
478        GCC_2s (c), GCC_state2s (c->state), GCC_state2s (state));
479   if (CADET_CONNECTION_DESTROYED <= c->state) /* Destroyed or broken. */
480   {
481     LOG (GNUNET_ERROR_TYPE_DEBUG, "state not changing anymore\n");
482     return;
483   }
484   c->state = state;
485   if (CADET_CONNECTION_READY == state)
486     c->create_retry = 1;
487 }
488
489
490 /**
491  * Mark a connection as "destroyed", to send all pending traffic and freeing
492  * all associated resources, without accepting new status changes on it.
493  *
494  * @param c Connection to mark as destroyed.
495  */
496 static void
497 mark_destroyed (struct CadetConnection *c)
498 {
499   c->destroy = GNUNET_YES;
500   connection_change_state (c, CADET_CONNECTION_DESTROYED);
501 }
502
503
504 /**
505  * Callback called when a queued ACK message is sent.
506  *
507  * @param cls Closure (FC).
508  * @param c Connection this message was on.
509  * @param q Queue handler this call invalidates.
510  * @param type Type of message sent.
511  * @param fwd Was this a FWD going message?
512  * @param size Size of the message.
513  */
514 static void
515 ack_sent (void *cls,
516           struct CadetConnection *c,
517           struct CadetConnectionQueue *q,
518           uint16_t type, int fwd, size_t size)
519 {
520   struct CadetFlowControl *fc = cls;
521
522   fc->ack_msg = NULL;
523 }
524
525
526 /**
527  * Send an ACK on the connection, informing the predecessor about
528  * the available buffer space. Should not be called in case the peer
529  * is origin (no predecessor) in the @c fwd direction.
530  *
531  * Note that for fwd ack, the FWD mean forward *traffic* (root->dest),
532  * the ACK itself goes "back" (dest->root).
533  *
534  * @param c Connection on which to send the ACK.
535  * @param buffer How much space free to advertise?
536  * @param fwd Is this FWD ACK? (Going dest -> root)
537  * @param force Don't optimize out.
538  */
539 static void
540 send_ack (struct CadetConnection *c, unsigned int buffer, int fwd, int force)
541 {
542   struct CadetFlowControl *next_fc;
543   struct CadetFlowControl *prev_fc;
544   struct GNUNET_CADET_ACK msg;
545   uint32_t ack;
546   int delta;
547
548   /* If origin, there is no connection to send ACKs. Wrong function! */
549   GCC_check_connections ();
550   if (GCC_is_origin (c, fwd))
551   {
552     LOG (GNUNET_ERROR_TYPE_DEBUG, "connection %s is origin in %s\n",
553          GCC_2s (c), GC_f2s (fwd));
554     GNUNET_break (0);
555     return;
556   }
557
558   next_fc = fwd ? &c->fwd_fc : &c->bck_fc;
559   prev_fc = fwd ? &c->bck_fc : &c->fwd_fc;
560
561   LOG (GNUNET_ERROR_TYPE_DEBUG, "connection send %s ack on %s\n",
562        GC_f2s (fwd), GCC_2s (c));
563
564   /* Check if we need to transmit the ACK. */
565   delta = prev_fc->last_ack_sent - prev_fc->last_pid_recv;
566   if (3 < delta && buffer < delta && GNUNET_NO == force)
567   {
568     LOG (GNUNET_ERROR_TYPE_DEBUG, "Not sending ACK, buffer > 3\n");
569     LOG (GNUNET_ERROR_TYPE_DEBUG,
570          "  last pid recv: %u, last ack sent: %u\n",
571          prev_fc->last_pid_recv, prev_fc->last_ack_sent);
572     GCC_check_connections ();
573     return;
574   }
575
576   /* Ok, ACK might be necessary, what PID to ACK? */
577   ack = prev_fc->last_pid_recv + buffer;
578   LOG (GNUNET_ERROR_TYPE_DEBUG, " ACK %u\n", ack);
579   LOG (GNUNET_ERROR_TYPE_DEBUG,
580        " last pid %u, last ack %u, qmax %u, q %u\n",
581        prev_fc->last_pid_recv, prev_fc->last_ack_sent,
582        next_fc->queue_max, next_fc->queue_n);
583   if (ack == prev_fc->last_ack_sent && GNUNET_NO == force)
584   {
585     LOG (GNUNET_ERROR_TYPE_DEBUG, "Not sending FWD ACK, not needed\n");
586     GCC_check_connections ();
587     return;
588   }
589
590   /* Check if message is already in queue */
591   if (NULL != prev_fc->ack_msg)
592   {
593     if (GC_is_pid_bigger (ack, prev_fc->last_ack_sent))
594     {
595       LOG (GNUNET_ERROR_TYPE_DEBUG, " canceling old ACK\n");
596       GCC_cancel (prev_fc->ack_msg);
597       /* GCC_cancel triggers ack_sent(), which clears fc->ack_msg */
598     }
599     else
600     {
601       LOG (GNUNET_ERROR_TYPE_DEBUG, " same ACK already in queue\n");
602       GCC_check_connections ();
603       return;
604     }
605   }
606
607   prev_fc->last_ack_sent = ack;
608
609   /* Build ACK message and send on conn */
610   msg.header.size = htons (sizeof (msg));
611   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_ACK);
612   msg.ack = htonl (ack);
613   msg.cid = c->id;
614
615   prev_fc->ack_msg = GCC_send_prebuilt_message (&msg.header, UINT16_MAX, ack,
616                                                 c, !fwd, GNUNET_YES,
617                                                 &ack_sent, prev_fc);
618   GNUNET_assert (NULL != prev_fc->ack_msg);
619   GCC_check_connections ();
620 }
621
622
623 /**
624  * Callback called when a connection queued message is sent.
625  *
626  * Calculates the average time and connection packet tracking.
627  *
628  * @param cls Closure (ConnectionQueue Handle).
629  * @param c Connection this message was on.
630  * @param sent Was it really sent? (Could have been canceled)
631  * @param type Type of message sent.
632  * @param pid Packet ID, or 0 if not applicable (create, destroy, etc).
633  * @param fwd Was this a FWD going message?
634  * @param size Size of the message.
635  * @param wait Time spent waiting for core (only the time for THIS message)
636  * @return #GNUNET_YES if connection was destroyed, #GNUNET_NO otherwise.
637  */
638 static int
639 conn_message_sent (void *cls,
640                    struct CadetConnection *c, int sent,
641                    uint16_t type, uint32_t pid, int fwd, size_t size,
642                    struct GNUNET_TIME_Relative wait)
643 {
644   struct CadetConnectionPerformance *p;
645   struct CadetFlowControl *fc;
646   struct CadetConnectionQueue *q = cls;
647   double usecsperbyte;
648   int forced;
649
650   GCC_check_connections ();
651   LOG (GNUNET_ERROR_TYPE_DEBUG, "connection message_sent\n");
652
653   GCC_debug (c, GNUNET_ERROR_TYPE_DEBUG);
654
655   fc = fwd ? &c->fwd_fc : &c->bck_fc;
656   LOG (GNUNET_ERROR_TYPE_DEBUG, " %ssent %s %s pid %u\n",
657        sent ? "" : "not ", GC_f2s (fwd), GC_m2s (type), pid);
658   if (NULL != q)
659   {
660     forced = q->forced;
661     if (NULL != q->cont)
662     {
663       LOG (GNUNET_ERROR_TYPE_DEBUG, " calling cont\n");
664       q->cont (q->cont_cls, c, q, type, fwd, size);
665     }
666     GNUNET_free (q);
667   }
668   else if (type == GNUNET_MESSAGE_TYPE_CADET_AX)
669   {
670     /* If NULL == q and ENCRYPTED == type, message must have been ch_mngmnt */
671     forced = GNUNET_YES;
672   }
673   else
674   {
675     forced = GNUNET_NO;
676   }
677   if (NULL == c)
678   {
679     if (type != GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN
680         && type != GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY)
681     {
682       LOG (GNUNET_ERROR_TYPE_ERROR, "Message %s sent on NULL connection!\n",
683            GC_m2s (type));
684     }
685     GCC_check_connections ();
686     return GNUNET_NO;
687   }
688   LOG (GNUNET_ERROR_TYPE_DEBUG, " C_P- %p %u\n", c, c->pending_messages);
689   c->pending_messages--;
690   if ( (GNUNET_YES == c->destroy) &&
691        (0 == c->pending_messages) )
692   {
693     LOG (GNUNET_ERROR_TYPE_DEBUG,
694          "!  destroying connection!\n");
695     GCC_destroy (c);
696     GCC_check_connections ();
697     return GNUNET_YES;
698   }
699   /* Send ACK if needed, after accounting for sent ID in fc->queue_n */
700   switch (type)
701   {
702     case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE:
703     case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK:
704       c->maintenance_q = NULL;
705       /* Don't trigger a keepalive for sent ACKs, only SYN and SYNACKs */
706       if (GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE == type || !fwd)
707         schedule_next_keepalive (c, fwd);
708       break;
709
710     case GNUNET_MESSAGE_TYPE_CADET_AX:
711       if (GNUNET_YES == sent)
712       {
713         GNUNET_assert (NULL != q);
714         fc->last_pid_sent = pid;
715         if (GC_is_pid_bigger (fc->last_pid_sent + 1, fc->last_ack_recv))
716           GCC_start_poll (c, fwd);
717         GCC_send_ack (c, fwd, GNUNET_NO);
718         connection_reset_timeout (c, fwd);
719       }
720
721       LOG (GNUNET_ERROR_TYPE_DEBUG, "!  Q_N- %p %u\n", fc, fc->queue_n);
722       if (GNUNET_NO == forced)
723       {
724         fc->queue_n--;
725         LOG (GNUNET_ERROR_TYPE_DEBUG,
726             "!   accounting pid %u\n",
727             fc->last_pid_sent);
728       }
729       else
730       {
731         LOG (GNUNET_ERROR_TYPE_DEBUG,
732              "!   forced, Q_N not accounting pid %u\n",
733              fc->last_pid_sent);
734       }
735       break;
736
737     case GNUNET_MESSAGE_TYPE_CADET_KX:
738       if (GNUNET_YES == sent)
739         connection_reset_timeout (c, fwd);
740       break;
741
742     case GNUNET_MESSAGE_TYPE_CADET_POLL:
743       fc->poll_msg = NULL;
744       break;
745
746     case GNUNET_MESSAGE_TYPE_CADET_ACK:
747       fc->ack_msg = NULL;
748       break;
749
750     case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN:
751     case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY:
752       break;
753
754     default:
755       LOG (GNUNET_ERROR_TYPE_ERROR, "%s unknown\n", GC_m2s (type));
756       GNUNET_break (0);
757       break;
758   }
759   LOG (GNUNET_ERROR_TYPE_DEBUG, "!  message sent!\n");
760
761   if (NULL == c->perf)
762     return GNUNET_NO; /* Only endpoints are interested in timing. */
763
764   p = c->perf;
765   usecsperbyte = ((double) wait.rel_value_us) / size;
766   if (p->size == AVG_MSGS)
767   {
768     /* Array is full. Substract oldest value, add new one and store. */
769     p->avg -= (p->usecsperbyte[p->idx] / AVG_MSGS);
770     p->usecsperbyte[p->idx] = usecsperbyte;
771     p->avg += (p->usecsperbyte[p->idx] / AVG_MSGS);
772   }
773   else
774   {
775     /* Array not yet full. Add current value to avg and store. */
776     p->usecsperbyte[p->idx] = usecsperbyte;
777     p->avg *= p->size;
778     p->avg += p->usecsperbyte[p->idx];
779     p->size++;
780     p->avg /= p->size;
781   }
782   p->idx = (p->idx + 1) % AVG_MSGS;
783   GCC_check_connections ();
784   return GNUNET_NO;
785 }
786
787
788 /**
789  * Get the previous hop in a connection
790  *
791  * @param c Connection.
792  *
793  * @return Previous peer in the connection.
794  */
795 static struct CadetPeer *
796 get_prev_hop (const struct CadetConnection *c)
797 {
798   GNUNET_PEER_Id id;
799
800   if (NULL == c->path)
801     return NULL;
802   LOG (GNUNET_ERROR_TYPE_DEBUG,
803        " get prev hop %s [%u/%u]\n",
804        GCC_2s (c), c->own_pos, c->path->length);
805   if (0 == c->own_pos || c->path->length < 2)
806     id = c->path->peers[0];
807   else
808     id = c->path->peers[c->own_pos - 1];
809
810   LOG (GNUNET_ERROR_TYPE_DEBUG, "  ID: %s (%u)\n",
811        GNUNET_i2s (GNUNET_PEER_resolve2 (id)), id);
812
813   return GCP_get_short (id, GNUNET_YES);
814 }
815
816
817 /**
818  * Get the next hop in a connection
819  *
820  * @param c Connection.
821  *
822  * @return Next peer in the connection.
823  */
824 static struct CadetPeer *
825 get_next_hop (const struct CadetConnection *c)
826 {
827   GNUNET_PEER_Id id;
828
829   if (NULL == c->path)
830     return NULL;
831
832   LOG (GNUNET_ERROR_TYPE_DEBUG, " get next hop %s [%u/%u]\n",
833        GCC_2s (c), c->own_pos, c->path->length);
834   if ((c->path->length - 1) == c->own_pos || c->path->length < 2)
835     id = c->path->peers[c->path->length - 1];
836   else
837     id = c->path->peers[c->own_pos + 1];
838
839   LOG (GNUNET_ERROR_TYPE_DEBUG, "  ID: %s (%u)\n",
840        GNUNET_i2s (GNUNET_PEER_resolve2 (id)), id);
841
842   return GCP_get_short (id, GNUNET_YES);
843 }
844
845
846 /**
847  * Check that the direct neighbours (previous and next hop)
848  * are properly associated with this connection.
849  *
850  * @param c connection to check
851  */
852 static void
853 check_neighbours (const struct CadetConnection *c)
854 {
855   if (NULL == c->path)
856     return; /* nothing to check */
857   GCP_check_connection (get_next_hop (c), c);
858   GCP_check_connection (get_prev_hop (c), c);
859 }
860
861
862 /**
863  * Helper for #GCC_check_connections().  Calls #check_neighbours().
864  *
865  * @param cls NULL
866  * @param key ignored
867  * @param value the `struct CadetConnection` to check
868  * @return #GNUNET_OK (continue to iterate)
869  */
870 static int
871 check_connection (void *cls,
872                   const struct GNUNET_HashCode *key,
873                   void *value)
874 {
875   struct CadetConnection *c = value;
876
877   check_neighbours (c);
878   return GNUNET_OK;
879 }
880
881
882 /**
883  * Check invariants for all connections using #check_neighbours().
884  */
885 void
886 GCC_check_connections ()
887 {
888   if (0 == CHECK_INVARIANTS)
889     return;
890   if (NULL == connections)
891     return;
892   GNUNET_CONTAINER_multihashmap_iterate (connections,
893                                          &check_connection,
894                                          NULL);
895 }
896
897
898 /**
899  * Get the hop in a connection.
900  *
901  * @param c Connection.
902  * @param fwd Next in the FWD direction?
903  *
904  * @return Next peer in the connection.
905  */
906 static struct CadetPeer *
907 get_hop (struct CadetConnection *c, int fwd)
908 {
909   return (fwd) ? get_next_hop (c) : get_prev_hop (c);
910 }
911
912
913 /**
914  * Get a bit mask for a message received out-of-order.
915  *
916  * @param last_pid_recv Last PID we received prior to the out-of-order.
917  * @param ooo_pid PID of the out-of-order message.
918  */
919 static uint32_t
920 get_recv_bitmask (uint32_t last_pid_recv, uint32_t ooo_pid)
921 {
922   return 1 << (last_pid_recv - ooo_pid);
923 }
924
925
926 /**
927  * Check is an out-of-order message is ok:
928  * - at most 31 messages behind.
929  * - not duplicate.
930  *
931  * @param last_pid_recv Last in-order PID received.
932  */
933 static int
934 is_ooo_ok (uint32_t last_pid_recv, uint32_t ooo_pid, uint32_t ooo_bitmap)
935 {
936   uint32_t mask;
937
938   if (GC_is_pid_bigger (last_pid_recv - 31, ooo_pid))
939     return GNUNET_NO;
940
941   mask = get_recv_bitmask (last_pid_recv, ooo_pid);
942   if (0 != (ooo_bitmap & mask))
943     return GNUNET_NO;
944
945   return GNUNET_YES;
946 }
947
948
949 /**
950  * Is traffic coming from this sender 'FWD' traffic?
951  *
952  * @param c Connection to check.
953  * @param sender Peer identity of neighbor.
954  *
955  * @return #GNUNET_YES in case the sender is the 'prev' hop and therefore
956  *         the traffic is 'FWD'.
957  *         #GNUNET_NO for BCK.
958  *         #GNUNET_SYSERR for errors.
959  */
960 static int
961 is_fwd (const struct CadetConnection *c,
962         const struct GNUNET_PeerIdentity *sender)
963 {
964   GNUNET_PEER_Id id;
965
966   id = GNUNET_PEER_search (sender);
967   if (GCP_get_short_id (get_prev_hop (c)) == id)
968     return GNUNET_YES;
969
970   if (GCP_get_short_id (get_next_hop (c)) == id)
971     return GNUNET_NO;
972
973   GNUNET_break (0);
974   return GNUNET_SYSERR;
975 }
976
977
978 /**
979  * Sends a CONNECTION ACK message in reponse to a received CONNECTION_CREATE
980  * or a first CONNECTION_ACK directed to us.
981  *
982  * @param connection Connection to confirm.
983  * @param fwd Should we send it FWD? (root->dest)
984  *            (First (~SYNACK) goes BCK, second (~ACK) goes FWD)
985  */
986 static void
987 send_connection_ack (struct CadetConnection *connection, int fwd)
988 {
989   struct CadetTunnel *t;
990   size_t size = sizeof (struct GNUNET_CADET_ConnectionACK);
991
992   GCC_check_connections ();
993   t = connection->t;
994   LOG (GNUNET_ERROR_TYPE_INFO,
995        "==> { C %s ACK} %19s on conn %s (%p) %s [%5u]\n",
996        GC_f2s (!fwd), "", GCC_2s (connection), connection, GC_f2s (fwd), size);
997   GCP_queue_add (get_hop (connection, fwd), NULL,
998                  GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK, UINT16_MAX, 0,
999                  size, connection, fwd, &conn_message_sent, NULL);
1000   connection->pending_messages++;
1001   if (CADET_TUNNEL_NEW == GCT_get_cstate (t))
1002     GCT_change_cstate (t, CADET_TUNNEL_WAITING);
1003   if (CADET_CONNECTION_READY != connection->state)
1004     connection_change_state (connection, CADET_CONNECTION_SENT);
1005   GCC_check_connections ();
1006 }
1007
1008
1009 /**
1010  * Send a notification that a connection is broken.
1011  *
1012  * @param c Connection that is broken.
1013  * @param id1 Peer that has disconnected.
1014  * @param id2 Peer that has disconnected.
1015  * @param fwd Direction towards which to send it.
1016  */
1017 static void
1018 send_broken (struct CadetConnection *c,
1019              const struct GNUNET_PeerIdentity *id1,
1020              const struct GNUNET_PeerIdentity *id2,
1021              int fwd)
1022 {
1023   struct GNUNET_CADET_ConnectionBroken msg;
1024
1025   GCC_check_connections ();
1026   msg.header.size = htons (sizeof (struct GNUNET_CADET_ConnectionBroken));
1027   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN);
1028   msg.cid = c->id;
1029   msg.peer1 = *id1;
1030   msg.peer2 = *id2;
1031   GNUNET_assert (NULL ==
1032                  GCC_send_prebuilt_message (&msg.header, UINT16_MAX, 0, c, fwd,
1033                                             GNUNET_YES, NULL, NULL));
1034   GCC_check_connections ();
1035 }
1036
1037
1038 /**
1039  * Send a notification that a connection is broken, when a connection
1040  * isn't even known to the local peer or soon to be destroyed.
1041  *
1042  * @param connection_id Connection ID.
1043  * @param id1 Peer that has disconnected, probably local peer.
1044  * @param id2 Peer that has disconnected can be NULL if unknown.
1045  * @param peer Peer to notify (neighbor who sent the connection).
1046  */
1047 static void
1048 send_broken_unknown (const struct GNUNET_CADET_Hash *connection_id,
1049                      const struct GNUNET_PeerIdentity *id1,
1050                      const struct GNUNET_PeerIdentity *id2,
1051                      const struct GNUNET_PeerIdentity *peer_id)
1052 {
1053   struct GNUNET_CADET_ConnectionBroken *msg;
1054   struct CadetPeerQueue *q;
1055   struct CadetPeer *neighbor;
1056
1057   GCC_check_connections ();
1058   LOG (GNUNET_ERROR_TYPE_INFO, "--> BROKEN on unknown connection %s\n",
1059        GNUNET_h2s (GC_h2hc (connection_id)));
1060
1061   msg = GNUNET_new (struct GNUNET_CADET_ConnectionBroken);
1062   msg->header.size = htons (sizeof (struct GNUNET_CADET_ConnectionBroken));
1063   msg->header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN);
1064   msg->cid = *connection_id;
1065   msg->peer1 = *id1;
1066   if (NULL != id2)
1067     msg->peer2 = *id2;
1068   else
1069     memset (&msg->peer2, 0, sizeof (msg->peer2));
1070   neighbor = GCP_get (peer_id, GNUNET_NO); /* We MUST know neighbor. */
1071   GNUNET_assert (NULL != neighbor);
1072   q = GCP_queue_add (neighbor, msg, GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN,
1073                      UINT16_MAX, 2,
1074                      sizeof (struct GNUNET_CADET_ConnectionBroken),
1075                      NULL, GNUNET_SYSERR, /* connection, fwd */
1076                      NULL, NULL); /* continuation */
1077   GNUNET_assert (NULL != q);
1078   GCC_check_connections ();
1079 }
1080
1081
1082 /**
1083  * Send keepalive packets for a connection.
1084  *
1085  * @param c Connection to keep alive..
1086  * @param fwd Is this a FWD keepalive? (owner -> dest).
1087  */
1088 static void
1089 send_connection_keepalive (struct CadetConnection *c, int fwd)
1090 {
1091   struct GNUNET_MessageHeader msg;
1092   struct CadetFlowControl *fc;
1093
1094   GCC_check_connections ();
1095   LOG (GNUNET_ERROR_TYPE_INFO,
1096        "keepalive %s for connection %s\n",
1097        GC_f2s (fwd), GCC_2s (c));
1098
1099   GNUNET_assert (NULL != c->t);
1100   fc = fwd ? &c->fwd_fc : &c->bck_fc;
1101   if (0 < fc->queue_n || GNUNET_YES == GCT_has_queued_traffic (c->t))
1102   {
1103     LOG (GNUNET_ERROR_TYPE_INFO, "not sending keepalive, traffic in queue\n");
1104     return;
1105   }
1106
1107   GNUNET_STATISTICS_update (stats, "# keepalives sent", 1, GNUNET_NO);
1108
1109   GNUNET_assert (NULL != c->t);
1110   msg.size = htons (sizeof (msg));
1111   msg.type = htons (GNUNET_MESSAGE_TYPE_CADET_KEEPALIVE);
1112
1113   GNUNET_assert (NULL ==
1114                  GCT_send_prebuilt_message (&msg, c->t, c,
1115                                             GNUNET_NO, NULL, NULL));
1116   GCC_check_connections ();
1117 }
1118
1119
1120 /**
1121  * Send CONNECTION_{CREATE/ACK} packets for a connection.
1122  *
1123  * @param c Connection for which to send the message.
1124  * @param fwd If #GNUNET_YES, send CREATE, otherwise send ACK.
1125  */
1126 static void
1127 connection_recreate (struct CadetConnection *c, int fwd)
1128 {
1129   LOG (GNUNET_ERROR_TYPE_DEBUG,
1130        "sending connection recreate\n");
1131   if (fwd)
1132     GCC_send_create (c);
1133   else
1134     send_connection_ack (c, GNUNET_NO);
1135 }
1136
1137
1138 /**
1139  * Generic connection timer management.
1140  * Depending on the role of the peer in the connection will send the
1141  * appropriate message (build or keepalive)
1142  *
1143  * @param c Conncetion to maintain.
1144  * @param fwd Is FWD?
1145  */
1146 static void
1147 connection_maintain (struct CadetConnection *c, int fwd)
1148 {
1149   if (GNUNET_NO != c->destroy)
1150   {
1151     LOG (GNUNET_ERROR_TYPE_INFO, "not sending keepalive, being destroyed\n");
1152     return;
1153   }
1154
1155   if (NULL == c->t)
1156   {
1157     GNUNET_break (0);
1158     GCC_debug (c, GNUNET_ERROR_TYPE_ERROR);
1159     return;
1160   }
1161
1162   if (CADET_TUNNEL_SEARCHING == GCT_get_cstate (c->t))
1163   {
1164     /* If status is SEARCHING, why is there a connection? Should be WAITING */
1165     GNUNET_break (0);
1166     GCT_debug (c->t, GNUNET_ERROR_TYPE_ERROR);
1167     LOG (GNUNET_ERROR_TYPE_INFO, "not sending keepalive, tunnel SEARCHING\n");
1168     schedule_next_keepalive (c, fwd);
1169     return;
1170   }
1171   switch (c->state)
1172   {
1173     case CADET_CONNECTION_NEW:
1174       GNUNET_break (0);
1175       /* fall-through */
1176     case CADET_CONNECTION_SENT:
1177       connection_recreate (c, fwd);
1178       break;
1179     case CADET_CONNECTION_READY:
1180       send_connection_keepalive (c, fwd);
1181       break;
1182     default:
1183       break;
1184   }
1185 }
1186
1187
1188 /**
1189  * Keep the connection alive.
1190  *
1191  * @param c Connection to keep alive.
1192  * @param fwd Direction.
1193  */
1194 static void
1195 connection_keepalive (struct CadetConnection *c,
1196                       int fwd)
1197 {
1198   GCC_check_connections ();
1199   LOG (GNUNET_ERROR_TYPE_DEBUG,
1200        "%s keepalive for %s\n",
1201        GC_f2s (fwd), GCC_2s (c));
1202
1203   if (fwd)
1204     c->fwd_maintenance_task = NULL;
1205   else
1206     c->bck_maintenance_task = NULL;
1207   connection_maintain (c, fwd);
1208   GCC_check_connections ();
1209   /* Next execution will be scheduled by message_sent or _maintain*/
1210 }
1211
1212
1213 /**
1214  * Keep the connection alive in the FWD direction.
1215  *
1216  * @param cls Closure (connection to keepalive).
1217  */
1218 static void
1219 connection_fwd_keepalive (void *cls)
1220 {
1221   struct CadetConnection *c = cls;
1222
1223   GCC_check_connections ();
1224   connection_keepalive (c,
1225                         GNUNET_YES);
1226   GCC_check_connections ();
1227 }
1228
1229
1230 /**
1231  * Keep the connection alive in the BCK direction.
1232  *
1233  * @param cls Closure (connection to keepalive).
1234  */
1235 static void
1236 connection_bck_keepalive (void *cls)
1237 {
1238   struct CadetConnection *c = cls;
1239
1240   GCC_check_connections ();
1241   connection_keepalive (c,
1242                         GNUNET_NO);
1243   GCC_check_connections ();
1244 }
1245
1246
1247 /**
1248  * Schedule next keepalive task, taking in consideration
1249  * the connection state and number of retries.
1250  *
1251  * If the peer is not the origin, do nothing.
1252  *
1253  * @param c Connection for which to schedule the next keepalive.
1254  * @param fwd Direction for the next keepalive.
1255  */
1256 static void
1257 schedule_next_keepalive (struct CadetConnection *c, int fwd)
1258 {
1259   struct GNUNET_TIME_Relative delay;
1260   struct GNUNET_SCHEDULER_Task * *task_id;
1261   GNUNET_SCHEDULER_TaskCallback keepalive_task;
1262
1263   GCC_check_connections ();
1264   if (GNUNET_NO == GCC_is_origin (c, fwd))
1265     return;
1266
1267   /* Calculate delay to use, depending on the state of the connection */
1268   if (CADET_CONNECTION_READY == c->state)
1269   {
1270     delay = refresh_connection_time;
1271   }
1272   else
1273   {
1274     if (1 > c->create_retry)
1275       c->create_retry = 1;
1276     delay = GNUNET_TIME_relative_multiply (create_connection_time,
1277                                            c->create_retry);
1278     if (c->create_retry < 64)
1279       c->create_retry *= 2;
1280   }
1281
1282   /* Select direction-dependent parameters */
1283   if (GNUNET_YES == fwd)
1284   {
1285     task_id = &c->fwd_maintenance_task;
1286     keepalive_task = &connection_fwd_keepalive;
1287   }
1288   else
1289   {
1290     task_id = &c->bck_maintenance_task;
1291     keepalive_task = &connection_bck_keepalive;
1292   }
1293
1294   /* Check that no one scheduled it before us */
1295   if (NULL != *task_id)
1296   {
1297     /* No need for a _break. It can happen for instance when sending a SYNACK
1298      * for a duplicate SYN: the first SYNACK scheduled the task. */
1299     GNUNET_SCHEDULER_cancel (*task_id);
1300   }
1301
1302   /* Schedule the task */
1303   *task_id = GNUNET_SCHEDULER_add_delayed (delay,
1304                                            keepalive_task,
1305                                            c);
1306   LOG (GNUNET_ERROR_TYPE_DEBUG,
1307        "next keepalive in %s\n",
1308        GNUNET_STRINGS_relative_time_to_string (delay, GNUNET_YES));
1309   GCC_check_connections ();
1310 }
1311
1312
1313 /**
1314  * @brief Re-initiate traffic on this connection if necessary.
1315  *
1316  * Check if there is traffic queued towards this peer
1317  * and the core transmit handle is NULL (traffic was stalled).
1318  * If so, call core tmt rdy.
1319  *
1320  * @param c Connection on which initiate traffic.
1321  * @param fwd Is this about fwd traffic?
1322  */
1323 static void
1324 connection_unlock_queue (struct CadetConnection *c, int fwd)
1325 {
1326   struct CadetPeer *peer;
1327
1328   GCC_check_connections ();
1329   LOG (GNUNET_ERROR_TYPE_DEBUG,
1330        "connection_unlock_queue %s on %s\n",
1331        GC_f2s (fwd), GCC_2s (c));
1332
1333   if (GCC_is_terminal (c, fwd))
1334   {
1335     LOG (GNUNET_ERROR_TYPE_DEBUG, " is terminal, can unlock!\n");
1336     return;
1337   }
1338
1339   peer = get_hop (c, fwd);
1340   GCP_queue_unlock (peer, c);
1341   GCC_check_connections ();
1342 }
1343
1344
1345 /**
1346  * Cancel all transmissions that belong to a certain connection.
1347  *
1348  * If the connection is scheduled for destruction and no more messages are left,
1349  * the connection will be destroyed by the continuation call.
1350  *
1351  * @param c Connection which to cancel. Might be destroyed during this call.
1352  * @param fwd Cancel fwd traffic?
1353  */
1354 static void
1355 connection_cancel_queues (struct CadetConnection *c,
1356                           int fwd)
1357 {
1358   struct CadetFlowControl *fc;
1359   struct CadetPeer *peer;
1360
1361   GCC_check_connections ();
1362   LOG (GNUNET_ERROR_TYPE_DEBUG,
1363        "Cancel %s queues for connection %s\n",
1364        GC_f2s (fwd), GCC_2s (c));
1365   if (NULL == c)
1366   {
1367     GNUNET_break (0);
1368     return;
1369   }
1370
1371   fc = fwd ? &c->fwd_fc : &c->bck_fc;
1372   if (NULL != fc->poll_task)
1373   {
1374     GNUNET_SCHEDULER_cancel (fc->poll_task);
1375     fc->poll_task = NULL;
1376     LOG (GNUNET_ERROR_TYPE_DEBUG, "  cancelled POLL task for fc %p\n", fc);
1377   }
1378   if (NULL != fc->poll_msg)
1379   {
1380     GCC_cancel (fc->poll_msg);
1381     LOG (GNUNET_ERROR_TYPE_DEBUG, "  cancelled POLL msg for fc %p\n", fc);
1382   }
1383   peer = get_hop (c, fwd);
1384   GCP_queue_cancel (peer, c);
1385   GCC_check_connections ();
1386 }
1387
1388
1389 /**
1390  * Function called if a connection has been stalled for a while,
1391  * possibly due to a missed ACK. Poll the neighbor about its ACK status.
1392  *
1393  * @param cls Closure (poll ctx).
1394  */
1395 static void
1396 connection_poll (void *cls);
1397
1398
1399 /**
1400  * Callback called when a queued POLL message is sent.
1401  *
1402  * @param cls Closure (flow control context).
1403  * @param c Connection this message was on.
1404  * @param q Queue handler this call invalidates.
1405  * @param type Type of message sent.
1406  * @param fwd Was this a FWD going message?
1407  * @param size Size of the message.
1408  */
1409 static void
1410 poll_sent (void *cls,
1411            struct CadetConnection *c,
1412            struct CadetConnectionQueue *q,
1413            uint16_t type, int fwd, size_t size)
1414 {
1415   struct CadetFlowControl *fc = cls;
1416
1417   GNUNET_assert (fc->poll_msg == q);
1418   fc->poll_msg = NULL;
1419   if (2 == c->destroy)
1420   {
1421     LOG (GNUNET_ERROR_TYPE_DEBUG, "POLL canceled on shutdown\n");
1422     return;
1423   }
1424   if (0 == fc->queue_max)
1425   {
1426     LOG (GNUNET_ERROR_TYPE_DEBUG, "POLL cancelled: neighbor disconnected\n");
1427     return;
1428   }
1429   LOG (GNUNET_ERROR_TYPE_DEBUG, "POLL sent for %s, scheduling new one!\n",
1430        GCC_2s (c));
1431   GNUNET_assert (NULL == fc->poll_task);
1432   fc->poll_time = GNUNET_TIME_STD_BACKOFF (fc->poll_time);
1433   fc->poll_task = GNUNET_SCHEDULER_add_delayed (fc->poll_time,
1434                                                 &connection_poll,
1435                                                 fc);
1436   LOG (GNUNET_ERROR_TYPE_DEBUG, " task %u\n", fc->poll_task);
1437 }
1438
1439
1440 /**
1441  * Function called if a connection has been stalled for a while,
1442  * possibly due to a missed ACK. Poll the neighbor about its ACK status.
1443  *
1444  * @param cls Closure (poll ctx).
1445  */
1446 static void
1447 connection_poll (void *cls)
1448 {
1449   struct CadetFlowControl *fc = cls;
1450   struct GNUNET_CADET_Poll msg;
1451   struct CadetConnection *c;
1452   int fwd;
1453
1454   fc->poll_task = NULL;
1455   GCC_check_connections ();
1456   c = fc->c;
1457   fwd = fc == &c->fwd_fc;
1458   LOG (GNUNET_ERROR_TYPE_DEBUG, "Polling connection %s %s\n",
1459        GCC_2s (c),  GC_f2s (fwd));
1460
1461   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_POLL);
1462   msg.header.size = htons (sizeof (msg));
1463   msg.pid = htonl (fc->last_pid_sent);
1464   LOG (GNUNET_ERROR_TYPE_DEBUG, " last pid sent: %u\n", fc->last_pid_sent);
1465   fc->poll_msg =
1466       GCC_send_prebuilt_message (&msg.header, UINT16_MAX, fc->last_pid_sent, c,
1467                                  fc == &c->fwd_fc, GNUNET_YES, &poll_sent, fc);
1468   GNUNET_assert (NULL != fc->poll_msg);
1469   GCC_check_connections ();
1470 }
1471
1472
1473 /**
1474  * Resend all queued messages for a connection on other connections of the
1475  * same tunnel, if possible. The connection WILL BE DESTROYED by this function.
1476  *
1477  * @param c Connection whose messages to resend.
1478  * @param fwd Resend fwd messages?
1479  */
1480 static void
1481 resend_messages_and_destroy (struct CadetConnection *c, int fwd)
1482 {
1483   struct GNUNET_MessageHeader *out_msg;
1484   struct CadetTunnel *t = c->t;
1485   struct CadetPeer *neighbor;
1486   unsigned int pending;
1487   int destroyed;
1488
1489   GCC_check_connections ();
1490   mark_destroyed (c);
1491
1492   destroyed = GNUNET_NO;
1493   neighbor = get_hop (c, fwd);
1494   pending = c->pending_messages;
1495
1496   while (NULL != (out_msg = GCP_connection_pop (neighbor, c, &destroyed)))
1497   {
1498     if (NULL != t)
1499       GCT_resend_message (out_msg, t);
1500     GNUNET_free (out_msg);
1501   }
1502
1503   /* All pending messages should have been popped,
1504    * and the connection destroyed by the continuation.
1505    */
1506   if (GNUNET_YES != destroyed)
1507   {
1508     if (0 != pending)
1509     {
1510       GNUNET_break (0);
1511       GCC_debug (c, GNUNET_ERROR_TYPE_ERROR);
1512       if (NULL != t) GCT_debug (t, GNUNET_ERROR_TYPE_ERROR);
1513     }
1514     GCC_destroy (c);
1515   }
1516   GCC_check_connections ();
1517 }
1518
1519
1520 /**
1521  * Generic connection timeout implementation.
1522  *
1523  * Timeout function due to lack of keepalive/traffic from an endpoint.
1524  * Destroys connection if called.
1525  *
1526  * @param c Connection to destroy.
1527  * @param fwd Was the timeout from the origin? (FWD timeout)
1528  */
1529 static void
1530 connection_timeout (struct CadetConnection *c, int fwd)
1531 {
1532   struct CadetFlowControl *reverse_fc;
1533
1534   GCC_check_connections ();
1535   reverse_fc = fwd ? &c->bck_fc : &c->fwd_fc;
1536
1537   LOG (GNUNET_ERROR_TYPE_INFO,
1538        "Connection %s %s timed out. Destroying.\n",
1539        GCC_2s (c),
1540        GC_f2s (fwd));
1541   GCC_debug (c, GNUNET_ERROR_TYPE_DEBUG);
1542
1543   if (GCC_is_origin (c, fwd)) /* Loopback? Something is wrong! */
1544   {
1545     GNUNET_break (0);
1546     return;
1547   }
1548
1549   /* If dest, salvage queued traffic. */
1550   if (GCC_is_terminal (c, fwd))
1551   {
1552     const struct GNUNET_PeerIdentity *next_hop;
1553
1554     next_hop = GCP_get_id (fwd ? get_prev_hop (c) : get_next_hop (c));
1555     send_broken_unknown (&c->id, &my_full_id, NULL, next_hop);
1556     if (0 < reverse_fc->queue_n)
1557       resend_messages_and_destroy (c, !fwd);
1558     GCC_check_connections ();
1559     return;
1560   }
1561
1562   GCC_destroy (c);
1563   GCC_check_connections ();
1564 }
1565
1566
1567 /**
1568  * Timeout function due to lack of keepalive/traffic from the owner.
1569  * Destroys connection if called.
1570  *
1571  * @param cls Closure (connection to destroy).
1572  */
1573 static void
1574 connection_fwd_timeout (void *cls)
1575 {
1576   struct CadetConnection *c = cls;
1577
1578   c->fwd_maintenance_task = NULL;
1579   GCC_check_connections ();
1580   connection_timeout (c, GNUNET_YES);
1581   GCC_check_connections ();
1582 }
1583
1584
1585 /**
1586  * Timeout function due to lack of keepalive/traffic from the destination.
1587  * Destroys connection if called.
1588  *
1589  * @param cls Closure (connection to destroy).
1590  */
1591 static void
1592 connection_bck_timeout (void *cls)
1593 {
1594   struct CadetConnection *c = cls;
1595
1596   c->bck_maintenance_task = NULL;
1597   GCC_check_connections ();
1598   connection_timeout (c, GNUNET_NO);
1599   GCC_check_connections ();
1600 }
1601
1602
1603 /**
1604  * Resets the connection timeout task, some other message has done the
1605  * task's job.
1606  * - For the first peer on the direction this means to send
1607  *   a keepalive or a path confirmation message (either create or ACK).
1608  * - For all other peers, this means to destroy the connection,
1609  *   due to lack of activity.
1610  * Starts the timeout if no timeout was running (connection just created).
1611  *
1612  * @param c Connection whose timeout to reset.
1613  * @param fwd Is this forward?
1614  *
1615  * TODO use heap to improve efficiency of scheduler.
1616  */
1617 static void
1618 connection_reset_timeout (struct CadetConnection *c, int fwd)
1619 {
1620   LOG (GNUNET_ERROR_TYPE_DEBUG, "Connection %s reset timeout\n", GC_f2s (fwd));
1621   if (GCC_is_origin (c, fwd)) /* Startpoint */
1622   {
1623     schedule_next_keepalive (c, fwd);
1624   }
1625   else /* Relay, endpoint. */
1626   {
1627     struct GNUNET_TIME_Relative delay;
1628     struct GNUNET_SCHEDULER_Task * *ti;
1629     GNUNET_SCHEDULER_TaskCallback f;
1630
1631     ti = fwd ? &c->fwd_maintenance_task : &c->bck_maintenance_task;
1632
1633     if (NULL != *ti)
1634       GNUNET_SCHEDULER_cancel (*ti);
1635     delay = GNUNET_TIME_relative_multiply (refresh_connection_time, 4);
1636     LOG (GNUNET_ERROR_TYPE_DEBUG,
1637          "  timing out in %s\n",
1638          GNUNET_STRINGS_relative_time_to_string (delay, GNUNET_NO));
1639     f = fwd ? &connection_fwd_timeout : &connection_bck_timeout;
1640     *ti = GNUNET_SCHEDULER_add_delayed (delay, f, c);
1641   }
1642 }
1643
1644
1645 /**
1646  * Iterator to compare each connection's path with the path of a new connection.
1647  *
1648  * If the connection coincides, the c member of path is set to the connection
1649  * and the destroy flag of the connection is set.
1650  *
1651  * @param cls Closure (new path).
1652  * @param c Connection in the tunnel to check.
1653  */
1654 static void
1655 check_path (void *cls, struct CadetConnection *c)
1656 {
1657   struct CadetConnection *new_conn = cls;
1658   struct CadetPeerPath *path = new_conn->path;
1659
1660   LOG (GNUNET_ERROR_TYPE_DEBUG, "  checking %s (%p), length %u\n",
1661        GCC_2s (c), c, c->path->length);
1662
1663   if (c != new_conn
1664       && GNUNET_NO == c->destroy
1665       && CADET_CONNECTION_BROKEN != c->state
1666       && CADET_CONNECTION_DESTROYED != c->state
1667       && path_equivalent (path, c->path))
1668   {
1669     new_conn->destroy = GNUNET_YES; /* Do not mark_destroyed, */
1670     new_conn->path->c = c;          /* this is only a flag for the Iterator. */
1671     LOG (GNUNET_ERROR_TYPE_DEBUG, "  MATCH!\n");
1672   }
1673 }
1674
1675
1676 /**
1677  * Finds out if this path is already being used by an existing connection.
1678  *
1679  * Checks the tunnel towards the destination to see if it contains
1680  * any connection with the same path.
1681  *
1682  * If the existing connection is ready, it is kept.
1683  * Otherwise if the sender has a smaller ID that ours, we accept it (and
1684  * the peer will eventually reject our attempt).
1685  *
1686  * @param path Path to check.
1687  * @return #GNUNET_YES if the tunnel has a connection with the same path,
1688  *         #GNUNET_NO otherwise.
1689  */
1690 static int
1691 does_connection_exist (struct CadetConnection *conn)
1692 {
1693   struct CadetPeer *p;
1694   struct CadetTunnel *t;
1695   struct CadetConnection *c;
1696
1697   p = GCP_get_short (conn->path->peers[0], GNUNET_NO);
1698   if (NULL == p)
1699     return GNUNET_NO;
1700   t = GCP_get_tunnel (p);
1701   if (NULL == t)
1702     return GNUNET_NO;
1703
1704   LOG (GNUNET_ERROR_TYPE_DEBUG, "Checking for duplicates\n");
1705
1706   GCT_iterate_connections (t, &check_path, conn);
1707
1708   if (GNUNET_YES == conn->destroy)
1709   {
1710     c = conn->path->c;
1711     conn->destroy = GNUNET_NO;
1712     conn->path->c = conn;
1713     LOG (GNUNET_ERROR_TYPE_DEBUG, " found duplicate of %s\n", GCC_2s (conn));
1714     LOG (GNUNET_ERROR_TYPE_DEBUG, " duplicate: %s\n", GCC_2s (c));
1715     GCC_debug (c, GNUNET_ERROR_TYPE_DEBUG);
1716     if (CADET_CONNECTION_READY == c->state)
1717     {
1718       /* The other peer confirmed a live connection with this path,
1719        * why are they trying to duplicate it? */
1720       GNUNET_STATISTICS_update (stats, "# duplicate connections", 1, GNUNET_NO);
1721       return GNUNET_YES;
1722     }
1723     LOG (GNUNET_ERROR_TYPE_DEBUG, " duplicate not valid, connection unique\n");
1724     return GNUNET_NO;
1725   }
1726   else
1727   {
1728     LOG (GNUNET_ERROR_TYPE_DEBUG, " %s has no duplicates\n", GCC_2s (conn));
1729     return GNUNET_NO;
1730   }
1731 }
1732
1733
1734 /**
1735  * @brief Check if the tunnel this connection belongs to has any other
1736  * connection with the same path, and destroy one if so.
1737  *
1738  * @param cls Closure (connection to check).
1739  */
1740 static void
1741 check_duplicates (void *cls)
1742 {
1743   struct CadetConnection *c = cls;
1744
1745   c->check_duplicates_task = NULL;
1746   if (GNUNET_YES == does_connection_exist (c))
1747   {
1748     GCT_debug (c->t, GNUNET_ERROR_TYPE_DEBUG);
1749     send_broken (c, &my_full_id, &my_full_id, GCC_is_origin (c, GNUNET_YES));
1750     GCC_destroy (c);
1751   }
1752 }
1753
1754
1755 /**
1756  * Wait for enough time to let any dead connections time out and check for
1757  * any remaining duplicates.
1758  *
1759  * @param c Connection that is a potential duplicate.
1760  */
1761 static void
1762 schedule_check_duplicates (struct CadetConnection *c)
1763 {
1764   struct GNUNET_TIME_Relative delay;
1765
1766   if (NULL != c->check_duplicates_task)
1767     return;
1768   delay = GNUNET_TIME_relative_multiply (refresh_connection_time, 5);
1769   c->check_duplicates_task = GNUNET_SCHEDULER_add_delayed (delay,
1770                                                            &check_duplicates,
1771                                                            c);
1772 }
1773
1774
1775 /**
1776  * Add the connection to the list of both neighbors.
1777  *
1778  * @param c Connection.
1779  *
1780  * @return #GNUNET_OK if everything went fine
1781  *         #GNUNET_SYSERR if the was an error and @c c is malformed.
1782  */
1783 static int
1784 register_neighbors (struct CadetConnection *c)
1785 {
1786   c->next_peer = get_next_hop (c);
1787   c->prev_peer = get_prev_hop (c);
1788   GNUNET_assert (c->next_peer != c->prev_peer);
1789   LOG (GNUNET_ERROR_TYPE_DEBUG,
1790        "register neighbors for connection %s\n",
1791        GCC_2s (c));
1792   path_debug (c->path);
1793   LOG (GNUNET_ERROR_TYPE_DEBUG,
1794        "own pos %u\n", c->own_pos);
1795   LOG (GNUNET_ERROR_TYPE_DEBUG,
1796        "putting connection %s to next peer %p\n",
1797        GCC_2s (c),
1798        c->next_peer);
1799   LOG (GNUNET_ERROR_TYPE_DEBUG, "next peer %p %s\n",
1800        c->next_peer,
1801        GCP_2s (c->next_peer));
1802   LOG (GNUNET_ERROR_TYPE_DEBUG,
1803        "putting connection %s to prev peer %p\n",
1804        GCC_2s (c),
1805        c->prev_peer);
1806   LOG (GNUNET_ERROR_TYPE_DEBUG,
1807        "prev peer %p %s\n",
1808        c->prev_peer,
1809        GCP_2s (c->prev_peer));
1810
1811   if ( (GNUNET_NO == GCP_is_neighbor (c->next_peer)) ||
1812        (GNUNET_NO == GCP_is_neighbor (c->prev_peer)) )
1813   {
1814     if (GCC_is_origin (c, GNUNET_YES))
1815       GNUNET_STATISTICS_update (stats, "# local bad paths", 1, GNUNET_NO);
1816     GNUNET_STATISTICS_update (stats, "# bad paths", 1, GNUNET_NO);
1817
1818     LOG (GNUNET_ERROR_TYPE_DEBUG,
1819          "  register neighbors failed\n");
1820     LOG (GNUNET_ERROR_TYPE_DEBUG,
1821          "  prev: %s, neighbor?: %d\n",
1822          GCP_2s (c->prev_peer),
1823          GCP_is_neighbor (c->prev_peer));
1824     LOG (GNUNET_ERROR_TYPE_DEBUG,
1825          "  next: %s, neighbor?: %d\n",
1826          GCP_2s (c->next_peer),
1827          GCP_is_neighbor (c->next_peer));
1828     return GNUNET_SYSERR;
1829   }
1830   GCP_add_connection (c->next_peer, c, GNUNET_NO);
1831   GCP_add_connection (c->prev_peer, c, GNUNET_YES);
1832
1833   return GNUNET_OK;
1834 }
1835
1836
1837 /**
1838  * Remove the connection from the list of both neighbors.
1839  *
1840  * @param c Connection.
1841  */
1842 static void
1843 unregister_neighbors (struct CadetConnection *c)
1844 {
1845 //  struct CadetPeer *peer; FIXME dont use next_peer, prev_peer
1846   /* Either already unregistered or never got registered, it's ok either way. */
1847   if (NULL == c->path)
1848     return;
1849   if (NULL != c->next_peer)
1850   {
1851     GCP_remove_connection (c->next_peer, c);
1852     c->next_peer = NULL;
1853   }
1854   if (NULL != c->prev_peer)
1855   {
1856     GCP_remove_connection (c->prev_peer, c);
1857     c->prev_peer = NULL;
1858   }
1859 }
1860
1861
1862 /**
1863  * Invalidates all paths towards all peers that comprise the connection which
1864  * rely on the disconnected peer.
1865  *
1866  * ~O(n^3) (peers in connection * paths/peer * links/path)
1867  *
1868  * @param c Connection whose peers' paths to clean.
1869  * @param disconnected Peer that disconnected.
1870  */
1871 static void
1872 invalidate_paths (struct CadetConnection *c,
1873                   struct CadetPeer *disconnected)
1874 {
1875   struct CadetPeer *peer;
1876   unsigned int i;
1877
1878   for (i = 0; i < c->path->length; i++)
1879   {
1880     peer = GCP_get_short (c->path->peers[i], GNUNET_NO);
1881     if (NULL != peer)
1882       GCP_notify_broken_link (peer, &my_full_id, GCP_get_id (disconnected));
1883   }
1884 }
1885
1886
1887 /**
1888  * Bind the connection to the peer and the tunnel to that peer.
1889  *
1890  * If the peer has no tunnel, create one. Update tunnel and connection
1891  * data structres to reflect new status.
1892  *
1893  * @param c Connection.
1894  * @param peer Peer.
1895  */
1896 static void
1897 add_to_peer (struct CadetConnection *c,
1898              struct CadetPeer *peer)
1899 {
1900   GCP_add_tunnel (peer);
1901   c->t = GCP_get_tunnel (peer);
1902   GCT_add_connection (c->t, c);
1903 }
1904
1905
1906 /**
1907  * Log receipt of message on stderr (INFO level).
1908  *
1909  * @param message Message received.
1910  * @param peer Peer who sent the message.
1911  * @param hash Connection ID.
1912  */
1913 static void
1914 log_message (const struct GNUNET_MessageHeader *message,
1915              const struct GNUNET_PeerIdentity *peer,
1916              const struct GNUNET_CADET_Hash *hash)
1917 {
1918   uint16_t size;
1919   uint16_t type;
1920   char *arrow;
1921
1922   size = ntohs (message->size);
1923   type = ntohs (message->type);
1924   switch (type)
1925   {
1926     case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE:
1927     case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK:
1928     case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN:
1929     case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY:
1930       arrow = "==";
1931       break;
1932     default:
1933       arrow = "--";
1934   }
1935   LOG (GNUNET_ERROR_TYPE_INFO, "<%s %s on conn %s from %s, %6u bytes\n",
1936        arrow, GC_m2s (type), GNUNET_h2s (GC_h2hc (hash)),
1937        GNUNET_i2s (peer), (unsigned int) size);
1938 }
1939
1940 /******************************************************************************/
1941 /********************************    API    ***********************************/
1942 /******************************************************************************/
1943
1944 /**
1945  * Core handler for connection creation.
1946  *
1947  * @param cls Closure (unused).
1948  * @param peer Sender (neighbor).
1949  * @param message Message.
1950  * @return #GNUNET_OK to keep the connection open,
1951  *         #GNUNET_SYSERR to close it (signal serious error)
1952  */
1953 int
1954 GCC_handle_create (void *cls,
1955                    const struct GNUNET_PeerIdentity *peer,
1956                    const struct GNUNET_MessageHeader *message)
1957 {
1958   struct GNUNET_CADET_ConnectionCreate *msg;
1959   struct GNUNET_PeerIdentity *id;
1960   struct GNUNET_CADET_Hash *cid;
1961   struct CadetPeerPath *path;
1962   struct CadetPeer *dest_peer;
1963   struct CadetPeer *orig_peer;
1964   struct CadetConnection *c;
1965   unsigned int own_pos;
1966   uint16_t size;
1967
1968   GCC_check_connections ();
1969   /* Check size */
1970   size = ntohs (message->size);
1971   if (size < sizeof (struct GNUNET_CADET_ConnectionCreate))
1972   {
1973     GNUNET_break_op (0);
1974     return GNUNET_OK;
1975   }
1976
1977   /* Calculate hops */
1978   size -= sizeof (struct GNUNET_CADET_ConnectionCreate);
1979   if (size % sizeof (struct GNUNET_PeerIdentity))
1980   {
1981     GNUNET_break_op (0);
1982     return GNUNET_OK;
1983   }
1984   if (0 != size % sizeof (struct GNUNET_PeerIdentity))
1985   {
1986     GNUNET_break_op (0);
1987     return GNUNET_OK;
1988   }
1989   size /= sizeof (struct GNUNET_PeerIdentity);
1990   if (1 > size)
1991   {
1992     GNUNET_break_op (0);
1993     return GNUNET_OK;
1994   }
1995   LOG (GNUNET_ERROR_TYPE_DEBUG, "    path has %u hops.\n", size);
1996
1997   /* Get parameters */
1998   msg = (struct GNUNET_CADET_ConnectionCreate *) message;
1999   cid = &msg->cid;
2000   log_message (message, peer, cid);
2001   id = (struct GNUNET_PeerIdentity *) &msg[1];
2002   LOG (GNUNET_ERROR_TYPE_DEBUG, "    origin: %s\n", GNUNET_i2s (id));
2003
2004   /* Create connection */
2005   c = connection_get (cid);
2006   if (NULL == c)
2007   {
2008     path = path_build_from_peer_ids ((struct GNUNET_PeerIdentity *) &msg[1],
2009                                      size, myid, &own_pos);
2010     if (NULL == path)
2011     {
2012       /* Path was malformed, probably our own ID was not in it. */
2013       GNUNET_STATISTICS_update (stats, "# malformed paths", 1, GNUNET_NO);
2014       GNUNET_break_op (0);
2015       return GNUNET_OK;
2016     }
2017
2018     if (0 == own_pos)
2019     {
2020       /* We received this request from a neighbor, we cannot be origin */
2021       GNUNET_STATISTICS_update (stats, "# fake paths", 1, GNUNET_NO);
2022       GNUNET_break_op (0);
2023       path_destroy (path);
2024       return GNUNET_OK;
2025     }
2026
2027     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Own position: %u\n", own_pos);
2028     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Creating connection\n");
2029     c = GCC_new (cid, NULL, path, own_pos);
2030     if (NULL == c)
2031     {
2032       if (path->length - 1 == own_pos)
2033       {
2034         /* If we are destination, why did the creation fail? */
2035         GNUNET_break (0);
2036         path_destroy (path);
2037         GCC_check_connections ();
2038         return GNUNET_OK;
2039       }
2040       send_broken_unknown (cid, &my_full_id,
2041                            GNUNET_PEER_resolve2 (path->peers[own_pos + 1]),
2042                            peer);
2043       path_destroy (path);
2044       GCC_check_connections ();
2045       return GNUNET_OK;
2046     }
2047     GCP_add_path_to_all (path, GNUNET_NO);
2048     connection_reset_timeout (c, GNUNET_YES);
2049   }
2050   else
2051   {
2052     path = path_duplicate (c->path);
2053   }
2054   if (CADET_CONNECTION_NEW == c->state)
2055     connection_change_state (c, CADET_CONNECTION_SENT);
2056
2057   /* Remember peers */
2058   dest_peer = GCP_get (&id[size - 1], GNUNET_YES);
2059   orig_peer = GCP_get (&id[0], GNUNET_YES);
2060
2061   /* Is it a connection to us? */
2062   if (c->own_pos == path->length - 1)
2063   {
2064     LOG (GNUNET_ERROR_TYPE_DEBUG, "  It's for us!\n");
2065     GCP_add_path_to_origin (orig_peer, path_duplicate (path), GNUNET_YES);
2066
2067     add_to_peer (c, orig_peer);
2068     if (GNUNET_YES == does_connection_exist (c))
2069     {
2070       /* Peer created a connection equal to one we think exists
2071        * and is fine.
2072        * Solution: Keep both and postpone disambiguation. In the meantime
2073        * the connection will time out or peer will inform us it is broken.
2074        *
2075        * Other options:
2076        * - Use explicit duplicate.
2077        * - Accept new conn and destroy the old. (interruption in higher level)
2078        * - Keep the one with higher ID / created by peer with higher ID. */
2079        schedule_check_duplicates (c);
2080     }
2081
2082     if (CADET_TUNNEL_NEW == GCT_get_cstate (c->t))
2083       GCT_change_cstate (c->t,  CADET_TUNNEL_WAITING);
2084
2085     send_connection_ack (c, GNUNET_NO);
2086     if (CADET_CONNECTION_SENT == c->state)
2087       connection_change_state (c, CADET_CONNECTION_ACK);
2088   }
2089   else
2090   {
2091     /* It's for somebody else! Retransmit. */
2092     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Retransmitting.\n");
2093     GCP_add_path (dest_peer, path_duplicate (path), GNUNET_NO);
2094     GCP_add_path_to_origin (orig_peer, path_duplicate (path), GNUNET_NO);
2095     GNUNET_assert (NULL == GCC_send_prebuilt_message (message, 0, 0, c,
2096                                                       GNUNET_YES, GNUNET_YES,
2097                                                       NULL, NULL));
2098   }
2099   path_destroy (path);
2100   GCC_check_connections ();
2101   return GNUNET_OK;
2102 }
2103
2104
2105 /**
2106  * Core handler for path confirmations.
2107  *
2108  * @param cls closure
2109  * @param message message
2110  * @param peer peer identity this notification is about
2111  * @return #GNUNET_OK to keep the connection open,
2112  *         #GNUNET_SYSERR to close it (signal serious error)
2113  */
2114 int
2115 GCC_handle_confirm (void *cls,
2116                     const struct GNUNET_PeerIdentity *peer,
2117                     const struct GNUNET_MessageHeader *message)
2118 {
2119   struct GNUNET_CADET_ConnectionACK *msg;
2120   struct CadetConnection *c;
2121   struct CadetPeerPath *p;
2122   struct CadetPeer *pi;
2123   enum CadetConnectionState oldstate;
2124   int fwd;
2125
2126   GCC_check_connections ();
2127   msg = (struct GNUNET_CADET_ConnectionACK *) message;
2128   log_message (message, peer, &msg->cid);
2129   c = connection_get (&msg->cid);
2130   if (NULL == c)
2131   {
2132     GNUNET_STATISTICS_update (stats, "# control on unknown connection",
2133                               1, GNUNET_NO);
2134     LOG (GNUNET_ERROR_TYPE_DEBUG,
2135          "  don't know the connection!\n");
2136     send_broken_unknown (&msg->cid, &my_full_id, NULL, peer);
2137     GCC_check_connections ();
2138     return GNUNET_OK;
2139   }
2140
2141   if (GNUNET_NO != c->destroy)
2142   {
2143     GNUNET_assert (CADET_CONNECTION_DESTROYED == c->state);
2144     LOG (GNUNET_ERROR_TYPE_DEBUG,
2145          "connection %s being destroyed, ignoring confirm\n",
2146          GCC_2s (c));
2147     GCC_check_connections ();
2148     return GNUNET_OK;
2149   }
2150
2151   oldstate = c->state;
2152   LOG (GNUNET_ERROR_TYPE_DEBUG, "  via peer %s\n", GNUNET_i2s (peer));
2153   pi = GCP_get (peer, GNUNET_YES);
2154   if (get_next_hop (c) == pi)
2155   {
2156     LOG (GNUNET_ERROR_TYPE_DEBUG, "  SYNACK\n");
2157     fwd = GNUNET_NO;
2158     if (CADET_CONNECTION_SENT == oldstate)
2159       connection_change_state (c, CADET_CONNECTION_ACK);
2160   }
2161   else if (get_prev_hop (c) == pi)
2162   {
2163     LOG (GNUNET_ERROR_TYPE_DEBUG, "  FINAL ACK\n");
2164     fwd = GNUNET_YES;
2165     connection_change_state (c, CADET_CONNECTION_READY);
2166   }
2167   else
2168   {
2169     GNUNET_break_op (0);
2170     return GNUNET_OK;
2171   }
2172
2173   connection_reset_timeout (c, fwd);
2174
2175   /* Add path to peers? */
2176   p = c->path;
2177   if (NULL != p)
2178   {
2179     GCP_add_path_to_all (p, GNUNET_YES);
2180   }
2181   else
2182   {
2183     GNUNET_break (0);
2184   }
2185
2186   /* Message for us as creator? */
2187   if (GCC_is_origin (c, GNUNET_YES))
2188   {
2189     if (GNUNET_NO != fwd)
2190     {
2191       GNUNET_break_op (0);
2192       return GNUNET_OK;
2193     }
2194     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Connection (SYN)ACK for us!\n");
2195
2196     /* If just created, cancel the short timeout and start a long one */
2197     if (CADET_CONNECTION_SENT == oldstate)
2198       connection_reset_timeout (c, GNUNET_YES);
2199
2200     /* Change connection state */
2201     connection_change_state (c, CADET_CONNECTION_READY);
2202     send_connection_ack (c, GNUNET_YES);
2203
2204     /* Change tunnel state, trigger KX */
2205     if (CADET_TUNNEL_WAITING == GCT_get_cstate (c->t))
2206       GCT_change_cstate (c->t, CADET_TUNNEL_READY);
2207     GCC_check_connections ();
2208     return GNUNET_OK;
2209   }
2210
2211   /* Message for us as destination? */
2212   if (GCC_is_terminal (c, GNUNET_YES))
2213   {
2214     if (GNUNET_YES != fwd)
2215     {
2216       GNUNET_break_op (0);
2217       return GNUNET_OK;
2218     }
2219     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Connection ACK for us!\n");
2220
2221     /* If just created, cancel the short timeout and start a long one */
2222     if (CADET_CONNECTION_ACK == oldstate)
2223       connection_reset_timeout (c, GNUNET_NO);
2224
2225     /* Change tunnel state */
2226     if (CADET_TUNNEL_WAITING == GCT_get_cstate (c->t))
2227       GCT_change_cstate (c->t, CADET_TUNNEL_READY);
2228     GCC_check_connections ();
2229     return GNUNET_OK;
2230   }
2231
2232   LOG (GNUNET_ERROR_TYPE_DEBUG, "  not for us, retransmitting...\n");
2233   GNUNET_assert (NULL ==
2234                  GCC_send_prebuilt_message (message, 0, 0, c, fwd,
2235                                             GNUNET_YES, NULL, NULL));
2236   GCC_check_connections ();
2237   return GNUNET_OK;
2238 }
2239
2240
2241 /**
2242  * Core handler for notifications of broken connections.
2243  *
2244  * @param cls Closure (unused).
2245  * @param id Peer identity of sending neighbor.
2246  * @param message Message.
2247  * @return #GNUNET_OK to keep the connection open,
2248  *         #GNUNET_SYSERR to close it (signal serious error)
2249  */
2250 int
2251 GCC_handle_broken (void* cls,
2252                    const struct GNUNET_PeerIdentity* id,
2253                    const struct GNUNET_MessageHeader* message)
2254 {
2255   struct GNUNET_CADET_ConnectionBroken *msg;
2256   struct CadetConnection *c;
2257   struct CadetTunnel *t;
2258   int pending;
2259   int fwd;
2260
2261   GCC_check_connections ();
2262   msg = (struct GNUNET_CADET_ConnectionBroken *) message;
2263   log_message (message, id, &msg->cid);
2264   LOG (GNUNET_ERROR_TYPE_DEBUG, "  regarding %s\n",
2265               GNUNET_i2s (&msg->peer1));
2266   LOG (GNUNET_ERROR_TYPE_DEBUG, "  regarding %s\n",
2267               GNUNET_i2s (&msg->peer2));
2268   c = connection_get (&msg->cid);
2269   if (NULL == c)
2270   {
2271     LOG (GNUNET_ERROR_TYPE_DEBUG, "  duplicate CONNECTION_BROKEN\n");
2272     GCC_check_connections ();
2273     return GNUNET_OK;
2274   }
2275
2276   t = c->t;
2277
2278   fwd = is_fwd (c, id);
2279   mark_destroyed (c);
2280   if (GCC_is_terminal (c, fwd))
2281   {
2282     struct CadetPeer *endpoint;
2283
2284     if (NULL == t)
2285     {
2286       /* A terminal connection should not have 't' set to NULL. */
2287       GNUNET_break (0);
2288       GCC_debug (c, GNUNET_ERROR_TYPE_ERROR);
2289       return GNUNET_OK;
2290     }
2291     endpoint = GCP_get_short (c->path->peers[c->path->length - 1], GNUNET_YES);
2292     if (2 < c->path->length)
2293       path_invalidate (c->path);
2294     GCP_notify_broken_link (endpoint, &msg->peer1, &msg->peer2);
2295
2296     connection_change_state (c, CADET_CONNECTION_BROKEN);
2297     GCT_remove_connection (t, c);
2298     c->t = NULL;
2299
2300     pending = c->pending_messages;
2301     if (0 < pending)
2302       resend_messages_and_destroy (c, !fwd);
2303     else
2304       GCC_destroy (c);
2305   }
2306   else
2307   {
2308     GNUNET_assert (NULL == GCC_send_prebuilt_message (message, 0, 0, c, fwd,
2309                                                       GNUNET_YES, NULL, NULL));
2310     connection_cancel_queues (c, !fwd);
2311   }
2312   GCC_check_connections ();
2313   return GNUNET_OK;
2314 }
2315
2316
2317 /**
2318  * Core handler for tunnel destruction
2319  *
2320  * @param cls Closure (unused).
2321  * @param peer Peer identity of sending neighbor.
2322  * @param message Message.
2323  * @return #GNUNET_OK to keep the connection open,
2324  *         #GNUNET_SYSERR to close it (signal serious error)
2325  */
2326 int
2327 GCC_handle_destroy (void *cls,
2328                     const struct GNUNET_PeerIdentity *peer,
2329                     const struct GNUNET_MessageHeader *message)
2330 {
2331   const struct GNUNET_CADET_ConnectionDestroy *msg;
2332   struct CadetConnection *c;
2333   int fwd;
2334
2335   GCC_check_connections ();
2336   msg = (const struct GNUNET_CADET_ConnectionDestroy *) message;
2337   log_message (message, peer, &msg->cid);
2338   c = connection_get (&msg->cid);
2339   if (NULL == c)
2340   {
2341     /* Probably already got the message from another path,
2342      * destroyed the tunnel and retransmitted to children.
2343      * Safe to ignore.
2344      */
2345     GNUNET_STATISTICS_update (stats,
2346                               "# control on unknown connection",
2347                               1, GNUNET_NO);
2348     LOG (GNUNET_ERROR_TYPE_DEBUG,
2349          "  connection unknown: already destroyed?\n");
2350     GCC_check_connections ();
2351     return GNUNET_OK;
2352   }
2353   fwd = is_fwd (c, peer);
2354   if (GNUNET_SYSERR == fwd)
2355   {
2356     GNUNET_break_op (0); /* FIXME */
2357     return GNUNET_OK;
2358   }
2359   if (GNUNET_NO == GCC_is_terminal (c, fwd))
2360   {
2361     GNUNET_assert (NULL ==
2362                    GCC_send_prebuilt_message (message, 0, 0, c, fwd,
2363                                               GNUNET_YES, NULL, NULL));
2364   }
2365   else if (0 == c->pending_messages)
2366   {
2367     LOG (GNUNET_ERROR_TYPE_DEBUG, "  directly destroying connection!\n");
2368     GCC_destroy (c);
2369     GCC_check_connections ();
2370     return GNUNET_OK;
2371   }
2372   mark_destroyed (c);
2373   if (NULL != c->t)
2374   {
2375     GCT_remove_connection (c->t, c);
2376     c->t = NULL;
2377   }
2378   GCC_check_connections ();
2379   return GNUNET_OK;
2380 }
2381
2382
2383 /**
2384  * Check the message against internal state and test if it goes FWD or BCK.
2385  *
2386  * Updates the PID, state and timeout values for the connection.
2387  *
2388  * @param message Message to check. It must belong to an existing connection.
2389  * @param minimum_size The message cannot be smaller than this value.
2390  * @param cid Connection ID (even if @a c is NULL, the ID is still needed).
2391  * @param c Connection this message should belong. If NULL, check fails.
2392  * @param neighbor Neighbor that sent the message.
2393  */
2394 static int
2395 check_message (const struct GNUNET_MessageHeader *message,
2396                size_t minimum_size,
2397                const struct GNUNET_CADET_Hash* cid,
2398                struct CadetConnection *c,
2399                const struct GNUNET_PeerIdentity *neighbor,
2400                uint32_t pid)
2401 {
2402   GNUNET_PEER_Id neighbor_id;
2403   struct CadetFlowControl *fc;
2404   struct CadetPeer *hop;
2405   int fwd;
2406   uint16_t type;
2407
2408   /* Check size */
2409   if (ntohs (message->size) < minimum_size)
2410   {
2411     GNUNET_break_op (0);
2412     LOG (GNUNET_ERROR_TYPE_WARNING, "Size %u < %u\n",
2413          ntohs (message->size), minimum_size);
2414     return GNUNET_SYSERR;
2415   }
2416
2417   /* Check connection */
2418   if (NULL == c)
2419   {
2420     GNUNET_STATISTICS_update (stats,
2421                               "# unknown connection",
2422                               1, GNUNET_NO);
2423     LOG (GNUNET_ERROR_TYPE_DEBUG,
2424          "%s on unknown connection %s\n",
2425          GC_m2s (ntohs (message->type)),
2426          GNUNET_h2s (GC_h2hc (cid)));
2427     send_broken_unknown (cid,
2428                          &my_full_id,
2429                          NULL,
2430                          neighbor);
2431     return GNUNET_SYSERR;
2432   }
2433
2434   /* Check if origin is as expected */
2435   neighbor_id = GNUNET_PEER_search (neighbor);
2436   hop = get_prev_hop (c);
2437   if (neighbor_id == GCP_get_short_id (hop))
2438   {
2439     fwd = GNUNET_YES;
2440   }
2441   else
2442   {
2443     hop = get_next_hop (c);
2444     GNUNET_break (hop == c->next_peer);
2445     if (neighbor_id == GCP_get_short_id (hop))
2446     {
2447       fwd = GNUNET_NO;
2448     }
2449     else
2450     {
2451       /* Unexpected peer sending traffic on a connection. */
2452       GNUNET_break_op (0);
2453       return GNUNET_SYSERR;
2454     }
2455   }
2456
2457   /* Check PID for payload messages */
2458   type = ntohs (message->type);
2459   if (GNUNET_MESSAGE_TYPE_CADET_AX == type)
2460   {
2461     fc = fwd ? &c->bck_fc : &c->fwd_fc;
2462     LOG (GNUNET_ERROR_TYPE_DEBUG, " PID %u (expected %u - %u)\n",
2463          pid, fc->last_pid_recv + 1, fc->last_ack_sent);
2464     if (GC_is_pid_bigger (pid, fc->last_ack_sent))
2465     {
2466       GNUNET_break_op (0);
2467       GNUNET_STATISTICS_update (stats, "# unsolicited message", 1, GNUNET_NO);
2468       LOG (GNUNET_ERROR_TYPE_WARNING, "Received PID %u, (prev %u), ACK %u\n",
2469           pid, fc->last_pid_recv, fc->last_ack_sent);
2470       return GNUNET_SYSERR;
2471     }
2472     if (GC_is_pid_bigger (pid, fc->last_pid_recv))
2473     {
2474       unsigned int delta;
2475
2476       delta = pid - fc->last_pid_recv;
2477       fc->last_pid_recv = pid;
2478       fc->recv_bitmap <<= delta;
2479       fc->recv_bitmap |= 1;
2480     }
2481     else
2482     {
2483       GNUNET_STATISTICS_update (stats, "# out of order PID", 1, GNUNET_NO);
2484       if (GNUNET_NO == is_ooo_ok (fc->last_pid_recv, pid, fc->recv_bitmap))
2485       {
2486         LOG (GNUNET_ERROR_TYPE_WARNING, "PID %u unexpected (%u+), dropping!\n",
2487              pid, fc->last_pid_recv - 31);
2488         return GNUNET_SYSERR;
2489       }
2490       fc->recv_bitmap |= get_recv_bitmask (fc->last_pid_recv, pid);
2491     }
2492   }
2493
2494   /* Count as connection confirmation. */
2495   if (CADET_CONNECTION_SENT == c->state || CADET_CONNECTION_ACK == c->state)
2496   {
2497     connection_change_state (c, CADET_CONNECTION_READY);
2498     if (NULL != c->t)
2499     {
2500       if (CADET_TUNNEL_WAITING == GCT_get_cstate (c->t))
2501         GCT_change_cstate (c->t, CADET_TUNNEL_READY);
2502     }
2503   }
2504   connection_reset_timeout (c, fwd);
2505
2506   return fwd;
2507 }
2508
2509
2510 /**
2511  * Generic handler for cadet network encrypted traffic.
2512  *
2513  * @param peer Peer identity this notification is about.
2514  * @param msg Encrypted message.
2515  * @return #GNUNET_OK to keep the connection open,
2516  *         #GNUNET_SYSERR to close it (signal serious error)
2517  */
2518 static int
2519 handle_cadet_encrypted (const struct GNUNET_PeerIdentity *peer,
2520                         const struct GNUNET_MessageHeader *message)
2521 {
2522   const struct GNUNET_CADET_AX *ax_msg;
2523   const struct GNUNET_CADET_Hash* cid;
2524   struct CadetConnection *c;
2525   size_t minimum_size;
2526   size_t overhead;
2527   uint32_t pid;
2528   int fwd;
2529
2530   GCC_check_connections ();
2531   GNUNET_assert (GNUNET_MESSAGE_TYPE_CADET_AX == ntohs (message->type));
2532   overhead = sizeof (struct GNUNET_CADET_AX);
2533   ax_msg = (const struct GNUNET_CADET_AX *) message;
2534   cid = &ax_msg->cid;
2535   pid = ntohl (ax_msg->pid);
2536   log_message (message, peer, cid);
2537
2538   minimum_size = sizeof (struct GNUNET_MessageHeader) + overhead;
2539   c = connection_get (cid);
2540   fwd = check_message (message,
2541                        minimum_size,
2542                        cid,
2543                        c,
2544                        peer,
2545                        pid);
2546
2547   /* If something went wrong, discard message. */
2548   if (GNUNET_SYSERR == fwd)
2549   {
2550     GCC_check_connections ();
2551     return GNUNET_OK;
2552   }
2553
2554   /* Is this message for us? */
2555   if (GCC_is_terminal (c, fwd))
2556   {
2557     GNUNET_STATISTICS_update (stats, "# received encrypted", 1, GNUNET_NO);
2558
2559     if (NULL == c->t)
2560     {
2561       GNUNET_break (GNUNET_NO != c->destroy);
2562       return GNUNET_OK;
2563     }
2564     GCT_handle_encrypted (c->t, message);
2565     GCC_send_ack (c, fwd, GNUNET_NO);
2566     GCC_check_connections ();
2567     return GNUNET_OK;
2568   }
2569
2570   /* Message not for us: forward to next hop */
2571   LOG (GNUNET_ERROR_TYPE_DEBUG, "  not for us, retransmitting...\n");
2572   GNUNET_STATISTICS_update (stats, "# messages forwarded", 1, GNUNET_NO);
2573   GNUNET_assert (NULL == GCC_send_prebuilt_message (message, 0, 0, c, fwd,
2574                                                     GNUNET_NO, NULL, NULL));
2575   GCC_check_connections ();
2576   return GNUNET_OK;
2577 }
2578
2579
2580 /**
2581  * Generic handler for cadet network encrypted traffic.
2582  *
2583  * @param peer Peer identity this notification is about.
2584  * @param msg Encrypted message.
2585  * @return #GNUNET_OK to keep the connection open,
2586  *         #GNUNET_SYSERR to close it (signal serious error)
2587  */
2588 static int
2589 handle_cadet_kx (const struct GNUNET_PeerIdentity *peer,
2590                  const struct GNUNET_CADET_KX *msg)
2591 {
2592   const struct GNUNET_CADET_Hash* cid;
2593   struct CadetConnection *c;
2594   size_t expected_size;
2595   int fwd;
2596
2597   GCC_check_connections ();
2598   cid = &msg->cid;
2599   log_message (&msg->header, peer, cid);
2600
2601   expected_size = sizeof (struct GNUNET_CADET_KX)
2602                   + sizeof (struct GNUNET_MessageHeader);
2603   c = connection_get (cid);
2604   fwd = check_message (&msg->header,
2605                        expected_size,
2606                        cid,
2607                        c,
2608                        peer,
2609                        0);
2610
2611   /* If something went wrong, discard message. */
2612   if (GNUNET_SYSERR == fwd)
2613     return GNUNET_OK;
2614
2615   /* Is this message for us? */
2616   if (GCC_is_terminal (c, fwd))
2617   {
2618     LOG (GNUNET_ERROR_TYPE_DEBUG, "  message for us!\n");
2619     GNUNET_STATISTICS_update (stats, "# received KX", 1, GNUNET_NO);
2620     if (NULL == c->t)
2621     {
2622       GNUNET_break (0);
2623       return GNUNET_OK;
2624     }
2625     GCT_handle_kx (c->t, &msg[1].header);
2626     GCC_check_connections ();
2627     return GNUNET_OK;
2628   }
2629
2630   /* Message not for us: forward to next hop */
2631   LOG (GNUNET_ERROR_TYPE_DEBUG, "  not for us, retransmitting...\n");
2632   GNUNET_STATISTICS_update (stats, "# messages forwarded", 1, GNUNET_NO);
2633   GNUNET_assert (NULL == GCC_send_prebuilt_message (&msg->header, 0, 0, c, fwd,
2634                                                     GNUNET_NO, NULL, NULL));
2635   GCC_check_connections ();
2636   return GNUNET_OK;
2637 }
2638
2639
2640 /**
2641  * Core handler for key exchange traffic (ephemeral key, ping, pong).
2642  *
2643  * @param cls Closure (unused).
2644  * @param message Message received.
2645  * @param peer Peer who sent the message.
2646  * @return #GNUNET_OK to keep the connection open,
2647  *         #GNUNET_SYSERR to close it (signal serious error)
2648  */
2649 int
2650 GCC_handle_kx (void *cls,
2651                const struct GNUNET_PeerIdentity *peer,
2652                const struct GNUNET_MessageHeader *message)
2653 {
2654   GCC_check_connections ();
2655   return handle_cadet_kx (peer, (struct GNUNET_CADET_KX *) message);
2656 }
2657
2658
2659 /**
2660  * Core handler for encrypted cadet network traffic (channel mgmt, data).
2661  *
2662  * @param cls Closure (unused).
2663  * @param message Message received.
2664  * @param peer Peer who sent the message.
2665  * @return #GNUNET_OK to keep the connection open,
2666  *         #GNUNET_SYSERR to close it (signal serious error)
2667  */
2668 int
2669 GCC_handle_encrypted (void *cls, const struct GNUNET_PeerIdentity *peer,
2670                       const struct GNUNET_MessageHeader *message)
2671 {
2672   GCC_check_connections ();
2673   return handle_cadet_encrypted (peer, message);
2674 }
2675
2676
2677 /**
2678  * Core handler for cadet network traffic point-to-point acks.
2679  *
2680  * @param cls closure
2681  * @param message message
2682  * @param peer peer identity this notification is about
2683  * @return #GNUNET_OK to keep the connection open,
2684  *         #GNUNET_SYSERR to close it (signal serious error)
2685  */
2686 int
2687 GCC_handle_ack (void *cls, const struct GNUNET_PeerIdentity *peer,
2688                 const struct GNUNET_MessageHeader *message)
2689 {
2690   struct GNUNET_CADET_ACK *msg;
2691   struct CadetConnection *c;
2692   struct CadetFlowControl *fc;
2693   GNUNET_PEER_Id id;
2694   uint32_t ack;
2695   int fwd;
2696
2697   GCC_check_connections ();
2698   msg = (struct GNUNET_CADET_ACK *) message;
2699   log_message (message, peer, &msg->cid);
2700   c = connection_get (&msg->cid);
2701   if (NULL == c)
2702   {
2703     GNUNET_STATISTICS_update (stats,
2704                               "# ack on unknown connection",
2705                               1,
2706                               GNUNET_NO);
2707     send_broken_unknown (&msg->cid,
2708                          &my_full_id,
2709                          NULL,
2710                          peer);
2711     GCC_check_connections ();
2712     return GNUNET_OK;
2713   }
2714
2715   /* Is this a forward or backward ACK? */
2716   id = GNUNET_PEER_search (peer);
2717   if (GCP_get_short_id (get_next_hop (c)) == id)
2718   {
2719     fc = &c->fwd_fc;
2720     fwd = GNUNET_YES;
2721   }
2722   else if (GCP_get_short_id (get_prev_hop (c)) == id)
2723   {
2724     fc = &c->bck_fc;
2725     fwd = GNUNET_NO;
2726   }
2727   else
2728   {
2729     GNUNET_break_op (0);
2730     return GNUNET_OK;
2731   }
2732
2733   ack = ntohl (msg->ack);
2734   LOG (GNUNET_ERROR_TYPE_DEBUG, " %s ACK %u (was %u)\n",
2735        GC_f2s (fwd), ack, fc->last_ack_recv);
2736   if (GC_is_pid_bigger (ack, fc->last_ack_recv))
2737     fc->last_ack_recv = ack;
2738
2739   /* Cancel polling if the ACK is big enough. */
2740   if (NULL != fc->poll_task &&
2741       GC_is_pid_bigger (fc->last_ack_recv, fc->last_pid_sent))
2742   {
2743     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Cancel poll\n");
2744     GNUNET_SCHEDULER_cancel (fc->poll_task);
2745     fc->poll_task = NULL;
2746     fc->poll_time = GNUNET_TIME_UNIT_SECONDS;
2747   }
2748
2749   connection_unlock_queue (c, fwd);
2750   GCC_check_connections ();
2751   return GNUNET_OK;
2752 }
2753
2754
2755 /**
2756  * Core handler for cadet network traffic point-to-point ack polls.
2757  *
2758  * @param cls closure
2759  * @param message message
2760  * @param peer peer identity this notification is about
2761  * @return #GNUNET_OK to keep the connection open,
2762  *         #GNUNET_SYSERR to close it (signal serious error)
2763  */
2764 int
2765 GCC_handle_poll (void *cls,
2766                  const struct GNUNET_PeerIdentity *peer,
2767                  const struct GNUNET_MessageHeader *message)
2768 {
2769   struct GNUNET_CADET_Poll *msg;
2770   struct CadetConnection *c;
2771   struct CadetFlowControl *fc;
2772   GNUNET_PEER_Id id;
2773   uint32_t pid;
2774   int fwd;
2775
2776   GCC_check_connections ();
2777   msg = (struct GNUNET_CADET_Poll *) message;
2778   log_message (message, peer, &msg->cid);
2779   c = connection_get (&msg->cid);
2780   if (NULL == c)
2781   {
2782     GNUNET_STATISTICS_update (stats, "# poll on unknown connection", 1,
2783                               GNUNET_NO);
2784     LOG (GNUNET_ERROR_TYPE_DEBUG,
2785          "POLL message on unknown connection %s!\n",
2786          GNUNET_h2s (GC_h2hc (&msg->cid)));
2787     send_broken_unknown (&msg->cid,
2788                          &my_full_id,
2789                          NULL,
2790                          peer);
2791     GCC_check_connections ();
2792     return GNUNET_OK;
2793   }
2794
2795   /* Is this a forward or backward ACK?
2796    * Note: a poll should never be needed in a loopback case,
2797    * since there is no possiblility of packet loss there, so
2798    * this way of discerining FWD/BCK should not be a problem.
2799    */
2800   id = GNUNET_PEER_search (peer);
2801   if (GCP_get_short_id (get_next_hop (c)) == id)
2802   {
2803     LOG (GNUNET_ERROR_TYPE_DEBUG, "  FWD FC\n");
2804     fc = &c->fwd_fc;
2805   }
2806   else if (GCP_get_short_id (get_prev_hop (c)) == id)
2807   {
2808     LOG (GNUNET_ERROR_TYPE_DEBUG, "  BCK FC\n");
2809     fc = &c->bck_fc;
2810   }
2811   else
2812   {
2813     GNUNET_break_op (0);
2814     return GNUNET_OK;
2815   }
2816
2817   pid = ntohl (msg->pid);
2818   LOG (GNUNET_ERROR_TYPE_DEBUG, "  PID %u, OLD %u\n", pid, fc->last_pid_recv);
2819   fc->last_pid_recv = pid;
2820   fwd = fc == &c->bck_fc;
2821   GCC_send_ack (c, fwd, GNUNET_YES);
2822   GCC_check_connections ();
2823
2824   return GNUNET_OK;
2825 }
2826
2827
2828 /**
2829  * Send an ACK on the appropriate connection/channel, depending on
2830  * the direction and the position of the peer.
2831  *
2832  * @param c Which connection to send the hop-by-hop ACK.
2833  * @param fwd Is this a fwd ACK? (will go dest->root).
2834  * @param force Send the ACK even if suboptimal (e.g. requested by POLL).
2835  */
2836 void
2837 GCC_send_ack (struct CadetConnection *c, int fwd, int force)
2838 {
2839   unsigned int buffer;
2840
2841   GCC_check_connections ();
2842   LOG (GNUNET_ERROR_TYPE_DEBUG, "GCC send %s ACK on %s\n",
2843        GC_f2s (fwd), GCC_2s (c));
2844
2845   if (NULL == c)
2846   {
2847     GNUNET_break (0);
2848     return;
2849   }
2850
2851   if (GNUNET_NO != c->destroy)
2852   {
2853     LOG (GNUNET_ERROR_TYPE_DEBUG, "  being destroyed, why bother...\n");
2854     GCC_check_connections ();
2855     return;
2856   }
2857
2858   /* Get available buffer space */
2859   if (GCC_is_terminal (c, fwd))
2860   {
2861     LOG (GNUNET_ERROR_TYPE_DEBUG, "  getting from all channels\n");
2862     buffer = GCT_get_channels_buffer (c->t);
2863   }
2864   else
2865   {
2866     LOG (GNUNET_ERROR_TYPE_DEBUG, "  getting from one connection\n");
2867     buffer = GCC_get_buffer (c, fwd);
2868   }
2869   LOG (GNUNET_ERROR_TYPE_DEBUG, "  buffer available: %u\n", buffer);
2870   if (0 == buffer && GNUNET_NO == force)
2871   {
2872     GCC_check_connections ();
2873     return;
2874   }
2875
2876   /* Send available buffer space */
2877   if (GCC_is_origin (c, fwd))
2878   {
2879     GNUNET_assert (NULL != c->t);
2880     LOG (GNUNET_ERROR_TYPE_DEBUG, "  sending on channels...\n");
2881     GCT_unchoke_channels (c->t);
2882   }
2883   else
2884   {
2885     LOG (GNUNET_ERROR_TYPE_DEBUG, "  sending on connection\n");
2886     send_ack (c, buffer, fwd, force);
2887   }
2888   GCC_check_connections ();
2889 }
2890
2891
2892 /**
2893  * Initialize the connections subsystem
2894  *
2895  * @param c Configuration handle.
2896  */
2897 void
2898 GCC_init (const struct GNUNET_CONFIGURATION_Handle *c)
2899 {
2900   LOG (GNUNET_ERROR_TYPE_DEBUG, "init\n");
2901   if (GNUNET_OK !=
2902       GNUNET_CONFIGURATION_get_value_number (c, "CADET", "MAX_MSGS_QUEUE",
2903                                              &max_msgs_queue))
2904   {
2905     GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
2906                                "CADET", "MAX_MSGS_QUEUE", "MISSING");
2907     GNUNET_SCHEDULER_shutdown ();
2908     return;
2909   }
2910
2911   if (GNUNET_OK !=
2912       GNUNET_CONFIGURATION_get_value_number (c, "CADET", "MAX_CONNECTIONS",
2913                                              &max_connections))
2914   {
2915     GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
2916                                "CADET", "MAX_CONNECTIONS", "MISSING");
2917     GNUNET_SCHEDULER_shutdown ();
2918     return;
2919   }
2920
2921   if (GNUNET_OK !=
2922       GNUNET_CONFIGURATION_get_value_time (c, "CADET", "REFRESH_CONNECTION_TIME",
2923                                            &refresh_connection_time))
2924   {
2925     GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
2926                                "CADET", "REFRESH_CONNECTION_TIME", "MISSING");
2927     GNUNET_SCHEDULER_shutdown ();
2928     return;
2929   }
2930   create_connection_time = GNUNET_TIME_UNIT_SECONDS;
2931   connections = GNUNET_CONTAINER_multihashmap_create (1024, GNUNET_NO);
2932 }
2933
2934
2935 /**
2936  * Destroy each connection on shutdown.
2937  *
2938  * @param cls Closure (unused).
2939  * @param key Current key code (CID, unused).
2940  * @param value Value in the hash map (`struct CadetConnection`)
2941  *
2942  * @return #GNUNET_YES, because we should continue to iterate
2943  */
2944 static int
2945 shutdown_iterator (void *cls,
2946                    const struct GNUNET_HashCode *key,
2947                    void *value)
2948 {
2949   struct CadetConnection *c = value;
2950
2951   c->state = CADET_CONNECTION_DESTROYED;
2952   GCC_destroy (c);
2953   return GNUNET_YES;
2954 }
2955
2956
2957 /**
2958  * Shut down the connections subsystem.
2959  */
2960 void
2961 GCC_shutdown (void)
2962 {
2963   LOG (GNUNET_ERROR_TYPE_DEBUG, "Shutting down connections\n");
2964   GCC_check_connections ();
2965   GNUNET_CONTAINER_multihashmap_iterate (connections,
2966                                          &shutdown_iterator,
2967                                          NULL);
2968   GNUNET_CONTAINER_multihashmap_destroy (connections);
2969   connections = NULL;
2970 }
2971
2972
2973 /**
2974  * Create a connection.
2975  *
2976  * @param cid Connection ID (either created locally or imposed remotely).
2977  * @param t Tunnel this connection belongs to (or NULL);
2978  * @param path Path this connection has to use (copy is made).
2979  * @param own_pos Own position in the @c path path.
2980  *
2981  * @return Newly created connection, NULL in case of error (own id not in path).
2982  */
2983 struct CadetConnection *
2984 GCC_new (const struct GNUNET_CADET_Hash *cid,
2985          struct CadetTunnel *t,
2986          struct CadetPeerPath *path,
2987          unsigned int own_pos)
2988 {
2989   struct CadetConnection *c;
2990   struct CadetPeerPath *cpath;
2991
2992   GCC_check_connections ();
2993   cpath = path_duplicate (path);
2994   GNUNET_assert (NULL != cpath);
2995   c = GNUNET_new (struct CadetConnection);
2996   c->id = *cid;
2997   GNUNET_assert (GNUNET_OK ==
2998                  GNUNET_CONTAINER_multihashmap_put (connections,
2999                                                     GCC_get_h (c), c,
3000                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
3001   fc_init (&c->fwd_fc);
3002   fc_init (&c->bck_fc);
3003   c->fwd_fc.c = c;
3004   c->bck_fc.c = c;
3005
3006   c->t = t;
3007   GNUNET_assert (own_pos <= cpath->length - 1);
3008   c->own_pos = own_pos;
3009   c->path = cpath;
3010   cpath->c = c;
3011   if (GNUNET_OK != register_neighbors (c))
3012   {
3013     if (0 == own_pos)
3014     {
3015       /* We were the origin of this request, this means we have invalid
3016        * info about the paths to reach the destination. We must invalidate
3017        * the *original* path to avoid trying it again in the next minute.
3018        */
3019       if (2 < path->length)
3020         path_invalidate (path);
3021       else
3022       {
3023         GNUNET_break (0);
3024         GCT_debug(t, GNUNET_ERROR_TYPE_WARNING);
3025       }
3026       c->t = NULL;
3027     }
3028     path_destroy (c->path);
3029     c->path = NULL;
3030     GCC_destroy (c);
3031     return NULL;
3032   }
3033   LOG (GNUNET_ERROR_TYPE_INFO, "New connection %s\n", GCC_2s (c));
3034   GCC_check_connections ();
3035   return c;
3036 }
3037
3038
3039 void
3040 GCC_destroy (struct CadetConnection *c)
3041 {
3042   GCC_check_connections ();
3043   if (NULL == c)
3044   {
3045     GNUNET_break (0);
3046     return;
3047   }
3048
3049   if (2 == c->destroy) /* cancel queues -> GCP_queue_cancel -> q_destroy -> */
3050     return;            /* -> message_sent -> GCC_destroy. Don't loop. */
3051   c->destroy = 2;
3052
3053   LOG (GNUNET_ERROR_TYPE_DEBUG,
3054        "destroying connection %s\n",
3055        GCC_2s (c));
3056   LOG (GNUNET_ERROR_TYPE_DEBUG,
3057        " fc's f: %p, b: %p\n",
3058        &c->fwd_fc, &c->bck_fc);
3059   LOG (GNUNET_ERROR_TYPE_DEBUG,
3060        " fc tasks f: %u, b: %u\n",
3061        c->fwd_fc.poll_task,
3062        c->bck_fc.poll_task);
3063
3064   /* Cancel all traffic */
3065   if (NULL != c->path)
3066   {
3067     connection_cancel_queues (c, GNUNET_YES);
3068     connection_cancel_queues (c, GNUNET_NO);
3069   }
3070   unregister_neighbors (c);
3071   path_destroy (c->path);
3072   c->path = NULL;
3073
3074   /* Delete from tunnel */
3075   if (NULL != c->t)
3076     GCT_remove_connection (c->t, c);
3077
3078   if (NULL != c->check_duplicates_task)
3079     GNUNET_SCHEDULER_cancel (c->check_duplicates_task);
3080   if (NULL != c->fwd_maintenance_task)
3081     GNUNET_SCHEDULER_cancel (c->fwd_maintenance_task);
3082   if (NULL != c->bck_maintenance_task)
3083     GNUNET_SCHEDULER_cancel (c->bck_maintenance_task);
3084
3085   if (GNUNET_NO == c->was_removed)
3086   {
3087     GNUNET_break (GNUNET_YES ==
3088                   GNUNET_CONTAINER_multihashmap_remove (connections,
3089                                                         GCC_get_h (c),
3090                                                         c));
3091   }
3092   GNUNET_STATISTICS_update (stats,
3093                             "# connections",
3094                             -1,
3095                             GNUNET_NO);
3096   GNUNET_free (c);
3097   GCC_check_connections ();
3098 }
3099
3100
3101 /**
3102  * Get the connection ID.
3103  *
3104  * @param c Connection to get the ID from.
3105  *
3106  * @return ID of the connection.
3107  */
3108 const struct GNUNET_CADET_Hash *
3109 GCC_get_id (const struct CadetConnection *c)
3110 {
3111   return &c->id;
3112 }
3113
3114
3115 /**
3116  * Get the connection ID.
3117  *
3118  * @param c Connection to get the ID from.
3119  *
3120  * @return ID of the connection.
3121  */
3122 const struct GNUNET_HashCode *
3123 GCC_get_h (const struct CadetConnection *c)
3124 {
3125   return GC_h2hc (&c->id);
3126 }
3127
3128
3129 /**
3130  * Get the connection path.
3131  *
3132  * @param c Connection to get the path from.
3133  *
3134  * @return path used by the connection.
3135  */
3136 const struct CadetPeerPath *
3137 GCC_get_path (const struct CadetConnection *c)
3138 {
3139   if (GNUNET_NO == c->destroy)
3140     return c->path;
3141   return NULL;
3142 }
3143
3144
3145 /**
3146  * Get the connection state.
3147  *
3148  * @param c Connection to get the state from.
3149  *
3150  * @return state of the connection.
3151  */
3152 enum CadetConnectionState
3153 GCC_get_state (const struct CadetConnection *c)
3154 {
3155   return c->state;
3156 }
3157
3158 /**
3159  * Get the connection tunnel.
3160  *
3161  * @param c Connection to get the tunnel from.
3162  *
3163  * @return tunnel of the connection.
3164  */
3165 struct CadetTunnel *
3166 GCC_get_tunnel (const struct CadetConnection *c)
3167 {
3168   return c->t;
3169 }
3170
3171
3172 /**
3173  * Get free buffer space in a connection.
3174  *
3175  * @param c Connection.
3176  * @param fwd Is query about FWD traffic?
3177  *
3178  * @return Free buffer space [0 - max_msgs_queue/max_connections]
3179  */
3180 unsigned int
3181 GCC_get_buffer (struct CadetConnection *c, int fwd)
3182 {
3183   struct CadetFlowControl *fc;
3184
3185   fc = fwd ? &c->fwd_fc : &c->bck_fc;
3186
3187   LOG (GNUNET_ERROR_TYPE_DEBUG, "  Get %s buffer on %s: %u - %u\n",
3188        GC_f2s (fwd), GCC_2s (c), fc->queue_max, fc->queue_n);
3189   GCC_debug (c, GNUNET_ERROR_TYPE_DEBUG);
3190
3191   return (fc->queue_max - fc->queue_n);
3192 }
3193
3194
3195 /**
3196  * Get how many messages have we allowed to send to us from a direction.
3197  *
3198  * @param c Connection.
3199  * @param fwd Are we asking about traffic from FWD (BCK messages)?
3200  *
3201  * @return last_ack_sent - last_pid_recv
3202  */
3203 unsigned int
3204 GCC_get_allowed (struct CadetConnection *c, int fwd)
3205 {
3206   struct CadetFlowControl *fc;
3207
3208   fc = fwd ? &c->fwd_fc : &c->bck_fc;
3209   if (CADET_CONNECTION_READY != c->state
3210       || GC_is_pid_bigger (fc->last_pid_recv, fc->last_ack_sent))
3211   {
3212     return 0;
3213   }
3214   return (fc->last_ack_sent - fc->last_pid_recv);
3215 }
3216
3217
3218 /**
3219  * Get messages queued in a connection.
3220  *
3221  * @param c Connection.
3222  * @param fwd Is query about FWD traffic?
3223  *
3224  * @return Number of messages queued.
3225  */
3226 unsigned int
3227 GCC_get_qn (struct CadetConnection *c, int fwd)
3228 {
3229   struct CadetFlowControl *fc;
3230
3231   fc = fwd ? &c->fwd_fc : &c->bck_fc;
3232
3233   return fc->queue_n;
3234 }
3235
3236
3237 /**
3238  * Get next PID to use.
3239  *
3240  * @param c Connection.
3241  * @param fwd Is query about FWD traffic?
3242  *
3243  * @return Last PID used + 1.
3244  */
3245 unsigned int
3246 GCC_get_pid (struct CadetConnection *c, int fwd)
3247 {
3248   struct CadetFlowControl *fc;
3249
3250   fc = fwd ? &c->fwd_fc : &c->bck_fc;
3251
3252   return fc->last_pid_sent + 1;
3253 }
3254
3255
3256 /**
3257  * Allow the connection to advertise a buffer of the given size.
3258  *
3259  * The connection will send an @c fwd ACK message (so: in direction !fwd)
3260  * allowing up to last_pid_recv + buffer.
3261  *
3262  * @param c Connection.
3263  * @param buffer How many more messages the connection can accept.
3264  * @param fwd Is this about FWD traffic? (The ack will go dest->root).
3265  */
3266 void
3267 GCC_allow (struct CadetConnection *c, unsigned int buffer, int fwd)
3268 {
3269   LOG (GNUNET_ERROR_TYPE_DEBUG, "  allowing %s %u messages %s\n",
3270        GCC_2s (c), buffer, GC_f2s (fwd));
3271   send_ack (c, buffer, fwd, GNUNET_NO);
3272 }
3273
3274
3275 /**
3276  * Notify other peers on a connection of a broken link. Mark connections
3277  * to destroy after all traffic has been sent.
3278  *
3279  * @param c Connection on which there has been a disconnection.
3280  * @param peer Peer that disconnected.
3281  */
3282 void
3283 GCC_neighbor_disconnected (struct CadetConnection *c, struct CadetPeer *peer)
3284 {
3285   struct CadetFlowControl *fc;
3286   struct CadetPeer *hop;
3287   char peer_name[16];
3288   int fwd;
3289
3290   GCC_check_connections ();
3291   strncpy (peer_name, GCP_2s (peer), 16);
3292   peer_name[15] = '\0';
3293   LOG (GNUNET_ERROR_TYPE_DEBUG,
3294        "shutting down %s, %s disconnected\n",
3295        GCC_2s (c), peer_name);
3296
3297   invalidate_paths (c, peer);
3298
3299   hop = get_prev_hop (c);
3300   if (NULL == hop)
3301   {
3302     /* Path was NULL, we should have deleted the connection. */
3303     GNUNET_break (0);
3304     return;
3305   }
3306   fwd = (peer == hop);
3307   if ( (GNUNET_YES == GCC_is_terminal (c, fwd)) ||
3308        (GNUNET_NO != c->destroy) )
3309   {
3310     /* Local shutdown, or other peer already down (hence 'c->destroy');
3311        so there is no one to notify about this, just clean up. */
3312     GCC_destroy (c);
3313     GCC_check_connections ();
3314     return;
3315   }
3316   /* Mark FlowControl towards the peer as unavaliable. */
3317   fc = fwd ? &c->bck_fc : &c->fwd_fc;
3318   fc->queue_max = 0;
3319
3320   send_broken (c, &my_full_id, GCP_get_id (peer), fwd);
3321
3322   /* Connection will have at least one pending message
3323    * (the one we just scheduled), so delay destruction
3324    * and remove from map so we don't use accidentally. */
3325   mark_destroyed (c);
3326   GNUNET_assert (GNUNET_NO == c->was_removed);
3327   c->was_removed = GNUNET_YES;
3328   GNUNET_break (GNUNET_YES ==
3329                 GNUNET_CONTAINER_multihashmap_remove (connections,
3330                                                       GCC_get_h (c),
3331                                                       c));
3332   /* Cancel queue in the direction that just died. */
3333   connection_cancel_queues (c, ! fwd);
3334   GCC_stop_poll (c, ! fwd);
3335   unregister_neighbors (c);
3336   GCC_check_connections ();
3337 }
3338
3339
3340 /**
3341  * Is this peer the first one on the connection?
3342  *
3343  * @param c Connection.
3344  * @param fwd Is this about fwd traffic?
3345  *
3346  * @return #GNUNET_YES if origin, #GNUNET_NO if relay/terminal.
3347  */
3348 int
3349 GCC_is_origin (struct CadetConnection *c, int fwd)
3350 {
3351   if (!fwd && c->path->length - 1 == c->own_pos )
3352     return GNUNET_YES;
3353   if (fwd && 0 == c->own_pos)
3354     return GNUNET_YES;
3355   return GNUNET_NO;
3356 }
3357
3358
3359 /**
3360  * Is this peer the last one on the connection?
3361  *
3362  * @param c Connection.
3363  * @param fwd Is this about fwd traffic?
3364  *            Note that the ROOT is the terminal for BCK traffic!
3365  *
3366  * @return #GNUNET_YES if terminal, #GNUNET_NO if relay/origin.
3367  */
3368 int
3369 GCC_is_terminal (struct CadetConnection *c, int fwd)
3370 {
3371   return GCC_is_origin (c, ! fwd);
3372 }
3373
3374
3375 /**
3376  * See if we are allowed to send by the next hop in the given direction.
3377  *
3378  * @param c Connection.
3379  * @param fwd Is this about fwd traffic?
3380  *
3381  * @return #GNUNET_YES in case it's OK to send.
3382  */
3383 int
3384 GCC_is_sendable (struct CadetConnection *c, int fwd)
3385 {
3386   struct CadetFlowControl *fc;
3387
3388   LOG (GNUNET_ERROR_TYPE_DEBUG,
3389        " checking sendability of %s traffic on %s\n",
3390        GC_f2s (fwd), GCC_2s (c));
3391   if (NULL == c)
3392   {
3393     GNUNET_break (0);
3394     return GNUNET_YES;
3395   }
3396   fc = fwd ? &c->fwd_fc : &c->bck_fc;
3397   LOG (GNUNET_ERROR_TYPE_DEBUG,
3398        " last ack recv: %u, last pid sent: %u\n",
3399        fc->last_ack_recv, fc->last_pid_sent);
3400   if (GC_is_pid_bigger (fc->last_ack_recv, fc->last_pid_sent))
3401   {
3402     LOG (GNUNET_ERROR_TYPE_DEBUG, " sendable\n");
3403     return GNUNET_YES;
3404   }
3405   LOG (GNUNET_ERROR_TYPE_DEBUG, " not sendable\n");
3406   return GNUNET_NO;
3407 }
3408
3409
3410 /**
3411  * Check if this connection is a direct one (never trim a direct connection).
3412  *
3413  * @param c Connection.
3414  *
3415  * @return #GNUNET_YES in case it's a direct connection, #GNUNET_NO otherwise.
3416  */
3417 int
3418 GCC_is_direct (struct CadetConnection *c)
3419 {
3420   return (c->path->length == 2) ? GNUNET_YES : GNUNET_NO;
3421 }
3422
3423
3424 /**
3425  * Sends an already built message on a connection, properly registering
3426  * all used resources.
3427  *
3428  * @param message Message to send. Function makes a copy of it.
3429  *                If message is not hop-by-hop, decrements TTL of copy.
3430  * @param payload_type Type of payload, in case the message is encrypted.
3431  * @param c Connection on which this message is transmitted.
3432  * @param fwd Is this a fwd message?
3433  * @param force Force the connection to accept the message (buffer overfill).
3434  * @param cont Continuation called once message is sent. Can be NULL.
3435  * @param cont_cls Closure for @c cont.
3436  *
3437  * @return Handle to cancel the message before it's sent.
3438  *         NULL on error or if @c cont is NULL.
3439  *         Invalid on @c cont call.
3440  */
3441 struct CadetConnectionQueue *
3442 GCC_send_prebuilt_message (const struct GNUNET_MessageHeader *message,
3443                            uint16_t payload_type, uint32_t payload_id,
3444                            struct CadetConnection *c, int fwd, int force,
3445                            GCC_sent cont, void *cont_cls)
3446 {
3447   struct CadetFlowControl *fc;
3448   struct CadetConnectionQueue *q;
3449   void *data;
3450   size_t size;
3451   uint16_t type;
3452   int droppable;
3453
3454   GCC_check_connections ();
3455   fc = fwd ? &c->fwd_fc : &c->bck_fc;
3456   if (0 == fc->queue_max)
3457   {
3458     GNUNET_break (0);
3459     return NULL;
3460   }
3461
3462   size = ntohs (message->size);
3463   data = GNUNET_malloc (size);
3464   GNUNET_memcpy (data, message, size);
3465   type = ntohs (message->type);
3466   LOG (GNUNET_ERROR_TYPE_INFO,
3467        "--> %s (%s %4u) on conn %s (%p) %s [%5u]\n",
3468        GC_m2s (type), GC_m2s (payload_type), payload_id, GCC_2s (c), c,
3469        GC_f2s(fwd), size);
3470   droppable = GNUNET_NO == force;
3471   switch (type)
3472   {
3473     struct GNUNET_CADET_AX        *axmsg;
3474     struct GNUNET_CADET_KX        *kmsg;
3475     struct GNUNET_CADET_ACK       *amsg;
3476     struct GNUNET_CADET_Poll      *pmsg;
3477     struct GNUNET_CADET_ConnectionDestroy *dmsg;
3478     struct GNUNET_CADET_ConnectionBroken  *bmsg;
3479
3480     case GNUNET_MESSAGE_TYPE_CADET_AX:
3481       axmsg = (struct GNUNET_CADET_AX *) data;
3482       axmsg->cid = c->id;
3483       LOG (GNUNET_ERROR_TYPE_DEBUG, "  Q_N+ %p %u\n", fc, fc->queue_n);
3484       LOG (GNUNET_ERROR_TYPE_DEBUG, "last pid sent %u\n", fc->last_pid_sent);
3485       LOG (GNUNET_ERROR_TYPE_DEBUG, "     ack recv %u\n", fc->last_ack_recv);
3486       if (GNUNET_YES == droppable)
3487       {
3488         fc->queue_n++;
3489       }
3490       else
3491       {
3492         LOG (GNUNET_ERROR_TYPE_DEBUG, "  not droppable, Q_N stays the same\n");
3493       }
3494       break;
3495
3496     case GNUNET_MESSAGE_TYPE_CADET_KX:
3497       kmsg = (struct GNUNET_CADET_KX *) data;
3498       kmsg->cid = c->id;
3499       break;
3500
3501     case GNUNET_MESSAGE_TYPE_CADET_ACK:
3502       amsg = (struct GNUNET_CADET_ACK *) data;
3503       amsg->cid = c->id;
3504       LOG (GNUNET_ERROR_TYPE_DEBUG, " ack %u\n", ntohl (amsg->ack));
3505       droppable = GNUNET_NO;
3506       break;
3507
3508     case GNUNET_MESSAGE_TYPE_CADET_POLL:
3509       pmsg = (struct GNUNET_CADET_Poll *) data;
3510       pmsg->cid = c->id;
3511       LOG (GNUNET_ERROR_TYPE_DEBUG, " POLL %u\n", ntohl (pmsg->pid));
3512       droppable = GNUNET_NO;
3513       break;
3514
3515     case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY:
3516       dmsg = (struct GNUNET_CADET_ConnectionDestroy *) data;
3517       dmsg->cid = c->id;
3518       break;
3519
3520     case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN:
3521       bmsg = (struct GNUNET_CADET_ConnectionBroken *) data;
3522       bmsg->cid = c->id;
3523       break;
3524
3525     case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE:
3526     case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK:
3527       break;
3528
3529     default:
3530       GNUNET_break (0);
3531       GNUNET_free (data);
3532       return NULL;
3533   }
3534
3535   if (fc->queue_n > fc->queue_max && droppable)
3536   {
3537     GNUNET_STATISTICS_update (stats, "# messages dropped (buffer full)",
3538                               1, GNUNET_NO);
3539     GNUNET_break (0);
3540     LOG (GNUNET_ERROR_TYPE_DEBUG, "queue full: %u/%u\n",
3541          fc->queue_n, fc->queue_max);
3542     if (GNUNET_MESSAGE_TYPE_CADET_AX == type)
3543     {
3544       fc->queue_n--;
3545     }
3546     GNUNET_free (data);
3547     return NULL; /* Drop this message */
3548   }
3549
3550   LOG (GNUNET_ERROR_TYPE_DEBUG, "  C_P+ %s %u\n",
3551        GCC_2s (c), c->pending_messages);
3552   c->pending_messages++;
3553
3554   q = GNUNET_new (struct CadetConnectionQueue);
3555   q->forced = !droppable;
3556   q->q = GCP_queue_add (get_hop (c, fwd), data, type, payload_type, payload_id,
3557                         size, c, fwd, &conn_message_sent, q);
3558   if (NULL == q->q)
3559   {
3560     LOG (GNUNET_ERROR_TYPE_DEBUG, "dropping msg on %s, NULL q\n", GCC_2s (c));
3561     GNUNET_free (data);
3562     GNUNET_free (q);
3563     GCC_check_connections ();
3564     return NULL;
3565   }
3566   q->cont = cont;
3567   q->cont_cls = cont_cls;
3568   GCC_check_connections ();
3569   return (NULL == cont) ? NULL : q;
3570 }
3571
3572
3573 /**
3574  * Cancel a previously sent message while it's in the queue.
3575  *
3576  * ONLY can be called before the continuation given to the send function
3577  * is called. Once the continuation is called, the message is no longer in the
3578  * queue.
3579  *
3580  * @param q Handle to the queue.
3581  */
3582 void
3583 GCC_cancel (struct CadetConnectionQueue *q)
3584 {
3585   LOG (GNUNET_ERROR_TYPE_DEBUG, "!  GCC cancel message\n");
3586
3587   /* queue destroy calls message_sent, which calls q->cont and frees q */
3588   GCP_queue_destroy (q->q, GNUNET_YES, GNUNET_NO, 0);
3589   GCC_check_connections ();
3590 }
3591
3592
3593 /**
3594  * Sends a CREATE CONNECTION message for a path to a peer.
3595  * Changes the connection and tunnel states if necessary.
3596  *
3597  * @param connection Connection to create.
3598  */
3599 void
3600 GCC_send_create (struct CadetConnection *connection)
3601 {
3602   enum CadetTunnelCState state;
3603   size_t size;
3604
3605   GCC_check_connections ();
3606   size = sizeof (struct GNUNET_CADET_ConnectionCreate);
3607   size += connection->path->length * sizeof (struct GNUNET_PeerIdentity);
3608
3609   LOG (GNUNET_ERROR_TYPE_INFO, "==> %s %19s on conn %s (%p) FWD [%5u]\n",
3610        GC_m2s (GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE), "",
3611        GCC_2s (connection), connection, size);
3612   LOG (GNUNET_ERROR_TYPE_DEBUG, "  C_P+ %p %u (create)\n",
3613        connection, connection->pending_messages);
3614   connection->pending_messages++;
3615
3616   connection->maintenance_q =
3617     GCP_queue_add (get_next_hop (connection), NULL,
3618                    GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE, UINT16_MAX, 0,
3619                    size, connection, GNUNET_YES, &conn_message_sent, NULL);
3620
3621   state = GCT_get_cstate (connection->t);
3622   if (CADET_TUNNEL_SEARCHING == state || CADET_TUNNEL_NEW == state)
3623     GCT_change_cstate (connection->t, CADET_TUNNEL_WAITING);
3624   if (CADET_CONNECTION_NEW == connection->state)
3625     connection_change_state (connection, CADET_CONNECTION_SENT);
3626   GCC_check_connections ();
3627 }
3628
3629
3630 /**
3631  * Send a message to all peers in this connection that the connection
3632  * is no longer valid.
3633  *
3634  * If some peer should not receive the message, it should be zero'ed out
3635  * before calling this function.
3636  *
3637  * @param c The connection whose peers to notify.
3638  */
3639 void
3640 GCC_send_destroy (struct CadetConnection *c)
3641 {
3642   struct GNUNET_CADET_ConnectionDestroy msg;
3643
3644   if (GNUNET_YES == c->destroy)
3645     return;
3646   GCC_check_connections ();
3647   msg.header.size = htons (sizeof (msg));
3648   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY);
3649   msg.cid = c->id;
3650   LOG (GNUNET_ERROR_TYPE_DEBUG,
3651               "  sending connection destroy for connection %s\n",
3652               GCC_2s (c));
3653
3654   if (GNUNET_NO == GCC_is_terminal (c, GNUNET_YES))
3655     GNUNET_assert (NULL == GCC_send_prebuilt_message (&msg.header, UINT16_MAX,
3656                                                       0, c, GNUNET_YES,
3657                                                       GNUNET_YES, NULL, NULL));
3658   if (GNUNET_NO == GCC_is_terminal (c, GNUNET_NO))
3659     GNUNET_assert (NULL == GCC_send_prebuilt_message (&msg.header, UINT16_MAX,
3660                                                       0, c, GNUNET_NO,
3661                                                       GNUNET_YES, NULL, NULL));
3662   mark_destroyed (c);
3663   GCC_check_connections ();
3664 }
3665
3666
3667 /**
3668  * @brief Start a polling timer for the connection.
3669  *
3670  * When a neighbor does not accept more traffic on the connection it could be
3671  * caused by a simple congestion or by a lost ACK. Polling enables to check
3672  * for the lastest ACK status for a connection.
3673  *
3674  * @param c Connection.
3675  * @param fwd Should we poll in the FWD direction?
3676  */
3677 void
3678 GCC_start_poll (struct CadetConnection *c, int fwd)
3679 {
3680   struct CadetFlowControl *fc;
3681
3682   fc = fwd ? &c->fwd_fc : &c->bck_fc;
3683   LOG (GNUNET_ERROR_TYPE_DEBUG, "POLL %s requested\n",
3684        GC_f2s (fwd));
3685   if (NULL != fc->poll_task || NULL != fc->poll_msg)
3686   {
3687     LOG (GNUNET_ERROR_TYPE_DEBUG, "  POLL already in progress (t: %p, m: %p)\n",
3688          fc->poll_task, fc->poll_msg);
3689     return;
3690   }
3691   if (0 == fc->queue_max)
3692   {
3693     /* Should not be needed, traffic should've been cancelled. */
3694     GNUNET_break (0);
3695     LOG (GNUNET_ERROR_TYPE_DEBUG, "  POLL not possible, peer disconnected\n");
3696     return;
3697   }
3698   LOG (GNUNET_ERROR_TYPE_DEBUG, "POLL started on request\n");
3699   fc->poll_task = GNUNET_SCHEDULER_add_delayed (fc->poll_time,
3700                                                 &connection_poll,
3701                                                 fc);
3702 }
3703
3704
3705 /**
3706  * @brief Stop polling a connection for ACKs.
3707  *
3708  * Once we have enough ACKs for future traffic, polls are no longer necessary.
3709  *
3710  * @param c Connection.
3711  * @param fwd Should we stop the poll in the FWD direction?
3712  */
3713 void
3714 GCC_stop_poll (struct CadetConnection *c, int fwd)
3715 {
3716   struct CadetFlowControl *fc;
3717
3718   fc = fwd ? &c->fwd_fc : &c->bck_fc;
3719   if (NULL != fc->poll_task)
3720   {
3721     GNUNET_SCHEDULER_cancel (fc->poll_task);
3722     fc->poll_task = NULL;
3723   }
3724   if (NULL != fc->poll_msg)
3725   {
3726     GCC_cancel (fc->poll_msg);
3727     fc->poll_msg = NULL;
3728   }
3729 }
3730
3731
3732 /**
3733  * Get a (static) string for a connection.
3734  *
3735  * @param c Connection.
3736  */
3737 const char *
3738 GCC_2s (const struct CadetConnection *c)
3739 {
3740   if (NULL == c)
3741     return "NULL";
3742
3743   if (NULL != c->t)
3744   {
3745     static char buf[128];
3746
3747     SPRINTF (buf, "%s (->%s)",
3748              GNUNET_h2s (GC_h2hc (GCC_get_id (c))), GCT_2s (c->t));
3749     return buf;
3750   }
3751   return GNUNET_h2s (GC_h2hc (&c->id));
3752 }
3753
3754
3755 /**
3756  * Log all possible info about the connection state.
3757  *
3758  * @param c Connection to debug.
3759  * @param level Debug level to use.
3760  */
3761 void
3762 GCC_debug (const struct CadetConnection *c, enum GNUNET_ErrorType level)
3763 {
3764   int do_log;
3765   char *s;
3766
3767   do_log = GNUNET_get_log_call_status (level & (~GNUNET_ERROR_TYPE_BULK),
3768                                        "cadet-con",
3769                                        __FILE__, __FUNCTION__, __LINE__);
3770   if (0 == do_log)
3771     return;
3772
3773   if (NULL == c)
3774   {
3775     LOG2 (level, "CCC DEBUG NULL CONNECTION\n");
3776     return;
3777   }
3778
3779   LOG2 (level, "CCC DEBUG CONNECTION %s\n", GCC_2s (c));
3780   s = path_2s (c->path);
3781   LOG2 (level, "CCC  path %s, own pos: %u\n", s, c->own_pos);
3782   GNUNET_free (s);
3783   LOG2 (level, "CCC  state: %s, destroy: %u\n",
3784         GCC_state2s (c->state), c->destroy);
3785   LOG2 (level, "CCC  pending messages: %u\n", c->pending_messages);
3786   if (NULL != c->perf)
3787     LOG2 (level, "CCC  us/byte: %f\n", c->perf->avg);
3788
3789   LOG2 (level, "CCC  FWD flow control:\n");
3790   LOG2 (level, "CCC   queue: %u/%u\n", c->fwd_fc.queue_n, c->fwd_fc.queue_max);
3791   LOG2 (level, "CCC   last PID sent: %5u, recv: %5u\n",
3792         c->fwd_fc.last_pid_sent, c->fwd_fc.last_pid_recv);
3793   LOG2 (level, "CCC   last ACK sent: %5u, recv: %5u\n",
3794         c->fwd_fc.last_ack_sent, c->fwd_fc.last_ack_recv);
3795   LOG2 (level, "CCC   recv PID bitmap: %X\n", c->fwd_fc.recv_bitmap);
3796   LOG2 (level, "CCC   poll: task %d, msg  %p, msg_ack %p)\n",
3797         c->fwd_fc.poll_task, c->fwd_fc.poll_msg, c->fwd_fc.ack_msg);
3798
3799   LOG2 (level, "CCC  BCK flow control:\n");
3800   LOG2 (level, "CCC   queue: %u/%u\n", c->bck_fc.queue_n, c->bck_fc.queue_max);
3801   LOG2 (level, "CCC   last PID sent: %5u, recv: %5u\n",
3802         c->bck_fc.last_pid_sent, c->bck_fc.last_pid_recv);
3803   LOG2 (level, "CCC   last ACK sent: %5u, recv: %5u\n",
3804         c->bck_fc.last_ack_sent, c->bck_fc.last_ack_recv);
3805   LOG2 (level, "CCC   recv PID bitmap: %X\n", c->bck_fc.recv_bitmap);
3806   LOG2 (level, "CCC   poll: task %d, msg  %p, msg_ack %p)\n",
3807         c->bck_fc.poll_task, c->bck_fc.poll_msg, c->bck_fc.ack_msg);
3808
3809   LOG2 (level, "CCC DEBUG CONNECTION END\n");
3810 }