26ccabbcd32bd92321c492f00353517e7699523a
[oweals/gnunet.git] / src / cadet / gnunet-service-cadet_connection.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2001-2015 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20 /**
21  * @file cadet/gnunet-service-cadet_connection.c
22  * @brief GNUnet CADET service connection handling
23  * @author Bartlomiej Polot
24  */
25 #include "platform.h"
26 #include "gnunet_util_lib.h"
27 #include "gnunet_statistics_service.h"
28 #include "cadet_path.h"
29 #include "cadet_protocol.h"
30 #include "cadet.h"
31 #include "gnunet-service-cadet_connection.h"
32 #include "gnunet-service-cadet_peer.h"
33 #include "gnunet-service-cadet_tunnel.h"
34
35
36 /**
37  * Should we run somewhat expensive checks on our invariants?
38  */
39 #define CHECK_INVARIANTS 0
40
41
42 #define LOG(level, ...) GNUNET_log_from (level,"cadet-con",__VA_ARGS__)
43 #define LOG2(level, ...) GNUNET_log_from_nocheck(level,"cadet-con",__VA_ARGS__)
44
45
46 #define CADET_MAX_POLL_TIME      GNUNET_TIME_relative_multiply (\
47                                   GNUNET_TIME_UNIT_MINUTES,\
48                                   10)
49 #define AVG_MSGS                32
50
51
52 /******************************************************************************/
53 /********************************   STRUCTS  **********************************/
54 /******************************************************************************/
55
56 /**
57  * Struct to encapsulate all the Flow Control information to a peer to which
58  * we are directly connected (on a core level).
59  */
60 struct CadetFlowControl
61 {
62   /**
63    * Connection this controls.
64    */
65   struct CadetConnection *c;
66
67   /**
68    * How many messages are in the queue on this connection.
69    */
70   unsigned int queue_n;
71
72   /**
73    * How many messages do we accept in the queue.
74    */
75   unsigned int queue_max;
76
77   /**
78    * ID of the last packet sent towards the peer.
79    */
80   uint32_t last_pid_sent;
81
82   /**
83    * ID of the last packet received from the peer.
84    */
85   uint32_t last_pid_recv;
86
87   /**
88    * Bitmap of past 32 messages received:
89    * - LSB being @c last_pid_recv.
90    * - MSB being @c last_pid_recv - 31 (mod UINTMAX).
91    */
92   uint32_t recv_bitmap;
93
94   /**
95    * Last ACK sent to the peer (peer can't send more than this PID).
96    */
97   uint32_t last_ack_sent;
98
99   /**
100    * Last ACK sent towards the origin (for traffic towards leaf node).
101    */
102   uint32_t last_ack_recv;
103
104   /**
105    * Task to poll the peer in case of a lost ACK causes stall.
106    */
107   struct GNUNET_SCHEDULER_Task *poll_task;
108
109   /**
110    * How frequently to poll for ACKs.
111    */
112   struct GNUNET_TIME_Relative poll_time;
113
114   /**
115    * Queued poll message, to cancel if not necessary anymore (got ACK).
116    */
117   struct CadetConnectionQueue *poll_msg;
118
119   /**
120    * Queued poll message, to cancel if not necessary anymore (got ACK).
121    */
122   struct CadetConnectionQueue *ack_msg;
123 };
124
125 /**
126  * Keep a record of the last messages sent on this connection.
127  */
128 struct CadetConnectionPerformance
129 {
130   /**
131    * Circular buffer for storing measurements.
132    */
133   double usecsperbyte[AVG_MSGS];
134
135   /**
136    * Running average of @c usecsperbyte.
137    */
138   double avg;
139
140   /**
141    * How many values of @c usecsperbyte are valid.
142    */
143   uint16_t size;
144
145   /**
146    * Index of the next "free" position in @c usecsperbyte.
147    */
148   uint16_t idx;
149 };
150
151
152 /**
153  * Struct containing all information regarding a connection to a peer.
154  */
155 struct CadetConnection
156 {
157   /**
158    * Tunnel this connection is part of.
159    */
160   struct CadetTunnel *t;
161
162   /**
163    * Flow control information for traffic fwd.
164    */
165   struct CadetFlowControl fwd_fc;
166
167   /**
168    * Flow control information for traffic bck.
169    */
170   struct CadetFlowControl bck_fc;
171
172   /**
173    * Measure connection performance on the endpoint.
174    */
175   struct CadetConnectionPerformance *perf;
176
177   /**
178    * ID of the connection.
179    */
180   struct GNUNET_CADET_Hash id;
181
182   /**
183    * Path being used for the tunnel. At the origin of the connection
184    * it's a pointer to the destination's path pool, otherwise just a copy.
185    */
186   struct CadetPeerPath *path;
187
188   /**
189    * Task to keep the used paths alive at the owner,
190    * time tunnel out on all the other peers.
191    */
192   struct GNUNET_SCHEDULER_Task *fwd_maintenance_task;
193
194   /**
195    * Task to keep the used paths alive at the destination,
196    * time tunnel out on all the other peers.
197    */
198   struct GNUNET_SCHEDULER_Task *bck_maintenance_task;
199
200   /**
201    * Queue handle for maintainance traffic. One handle for FWD and BCK since
202    * one peer never needs to maintain both directions (no loopback connections).
203    */
204   struct CadetPeerQueue *maintenance_q;
205
206   /**
207    * Should equal #get_next_hop(), or NULL if that peer disconnected.
208    */
209   struct CadetPeer *next_peer;
210
211   /**
212    * Should equal #get_prev_hop(), or NULL if that peer disconnected.
213    */
214   struct CadetPeer *prev_peer;
215
216   /**
217    * State of the connection.
218    */
219   enum CadetConnectionState state;
220
221   /**
222    * Position of the local peer in the path.
223    */
224   unsigned int own_pos;
225
226   /**
227    * Pending message count.
228    */
229   unsigned int pending_messages;
230
231   /**
232    * Destroy flag:
233    * - if 0, connection in use.
234    * - if 1, destroy on last message.
235    * - if 2, connection is being destroyed don't re-enter.
236    */
237   int destroy;
238
239   /**
240    * In-connection-map flag. Sometimes, when @e destroy is set but
241    * actual destruction is delayed to enable us to finish processing
242    * queues (i.e. in the direction that is still working), we remove
243    * the connection from the map to prevent it from still being
244    * found (and used) by accident. This flag is set to #GNUNET_YES
245    * for a connection that is not in the #connections map.  Should
246    * only be #GNUNET_YES if #destroy is also non-zero.
247    */
248   int was_removed;
249
250   /**
251    * Counter to do exponential backoff when creating a connection (max 64).
252    */
253   unsigned short create_retry;
254
255   /**
256    * Task to check if connection has duplicates.
257    */
258   struct GNUNET_SCHEDULER_Task *check_duplicates_task;
259 };
260
261
262 /**
263  * Handle for messages queued but not yet sent.
264  */
265 struct CadetConnectionQueue
266 {
267   /**
268    * Peer queue handle, to cancel if necessary.
269    */
270   struct CadetPeerQueue *q;
271
272   /**
273    * Continuation to call once sent.
274    */
275   GCC_sent cont;
276
277   /**
278    * Closure for @e cont.
279    */
280   void *cont_cls;
281
282   /**
283    * Was this a forced message? (Do not account for it)
284    */
285   int forced;
286 };
287
288 /******************************************************************************/
289 /*******************************   GLOBALS  ***********************************/
290 /******************************************************************************/
291
292 /**
293  * Global handle to the statistics service.
294  */
295 extern struct GNUNET_STATISTICS_Handle *stats;
296
297 /**
298  * Local peer own ID (memory efficient handle).
299  */
300 extern GNUNET_PEER_Id myid;
301
302 /**
303  * Local peer own ID (full value).
304  */
305 extern struct GNUNET_PeerIdentity my_full_id;
306
307 /**
308  * Connections known, indexed by cid (CadetConnection).
309  */
310 static struct GNUNET_CONTAINER_MultiHashMap *connections;
311
312 /**
313  * How many connections are we willing to maintain.
314  * Local connections are always allowed, even if there are more connections than max.
315  */
316 static unsigned long long max_connections;
317
318 /**
319  * How many messages *in total* are we willing to queue, divide by number of
320  * connections to get connection queue size.
321  */
322 static unsigned long long max_msgs_queue;
323
324 /**
325  * How often to send path keepalives. Paths timeout after 4 missed.
326  */
327 static struct GNUNET_TIME_Relative refresh_connection_time;
328
329 /**
330  * How often to send path create / ACKs.
331  */
332 static struct GNUNET_TIME_Relative create_connection_time;
333
334
335 /******************************************************************************/
336 /********************************   STATIC  ***********************************/
337 /******************************************************************************/
338
339
340
341 #if 0 // avoid compiler warning for unused static function
342 static void
343 fc_debug (struct CadetFlowControl *fc)
344 {
345   LOG (GNUNET_ERROR_TYPE_DEBUG, "    IN: %u/%u\n",
346               fc->last_pid_recv, fc->last_ack_sent);
347   LOG (GNUNET_ERROR_TYPE_DEBUG, "    OUT: %u/%u\n",
348               fc->last_pid_sent, fc->last_ack_recv);
349   LOG (GNUNET_ERROR_TYPE_DEBUG, "    QUEUE: %u/%u\n",
350               fc->queue_n, fc->queue_max);
351 }
352
353 static void
354 connection_debug (struct CadetConnection *c)
355 {
356   if (NULL == c)
357   {
358     LOG (GNUNET_ERROR_TYPE_INFO, "DEBUG NULL CONNECTION\n");
359     return;
360   }
361   LOG (GNUNET_ERROR_TYPE_DEBUG, "Connection %s:%X\n",
362               peer2s (c->t->peer), GCC_2s (c));
363   LOG (GNUNET_ERROR_TYPE_DEBUG, "  state: %u, pending msgs: %u\n",
364               c->state, c->pending_messages);
365   LOG (GNUNET_ERROR_TYPE_DEBUG, "  FWD FC\n");
366   fc_debug (&c->fwd_fc);
367   LOG (GNUNET_ERROR_TYPE_DEBUG, "  BCK FC\n");
368   fc_debug (&c->bck_fc);
369 }
370 #endif
371
372
373 /**
374  * Schedule next keepalive task, taking in consideration
375  * the connection state and number of retries.
376  *
377  * @param c Connection for which to schedule the next keepalive.
378  * @param fwd Direction for the next keepalive.
379  */
380 static void
381 schedule_next_keepalive (struct CadetConnection *c, int fwd);
382
383
384 /**
385  * Resets the connection timeout task, some other message has done the
386  * task's job.
387  * - For the first peer on the direction this means to send
388  *   a keepalive or a path confirmation message (either create or ACK).
389  * - For all other peers, this means to destroy the connection,
390  *   due to lack of activity.
391  * Starts the timeout if no timeout was running (connection just created).
392  *
393  * @param c Connection whose timeout to reset.
394  * @param fwd Is this forward?
395  */
396 static void
397 connection_reset_timeout (struct CadetConnection *c, int fwd);
398
399
400 /**
401  * Get string description for tunnel state. Reentrant.
402  *
403  * @param s Tunnel state.
404  *
405  * @return String representation.
406  */
407 static const char *
408 GCC_state2s (enum CadetConnectionState s)
409 {
410   switch (s)
411   {
412     case CADET_CONNECTION_NEW:
413       return "CADET_CONNECTION_NEW";
414     case CADET_CONNECTION_SENT:
415       return "CADET_CONNECTION_SENT";
416     case CADET_CONNECTION_ACK:
417       return "CADET_CONNECTION_ACK";
418     case CADET_CONNECTION_READY:
419       return "CADET_CONNECTION_READY";
420     case CADET_CONNECTION_DESTROYED:
421       return "CADET_CONNECTION_DESTROYED";
422     case CADET_CONNECTION_BROKEN:
423       return "CADET_CONNECTION_BROKEN";
424     default:
425       GNUNET_break (0);
426       LOG (GNUNET_ERROR_TYPE_ERROR, " conn state %u unknown!\n", s);
427       return "CADET_CONNECTION_STATE_ERROR";
428   }
429 }
430
431
432 /**
433  * Initialize a Flow Control structure to the initial state.
434  *
435  * @param fc Flow Control structure to initialize.
436  */
437 static void
438 fc_init (struct CadetFlowControl *fc)
439 {
440   fc->last_pid_sent = (uint32_t) -1; /* Next (expected) = 0 */
441   fc->last_pid_recv = (uint32_t) -1;
442   fc->last_ack_sent = (uint32_t) 0;
443   fc->last_ack_recv = (uint32_t) 0;
444   fc->poll_task = NULL;
445   fc->poll_time = GNUNET_TIME_UNIT_SECONDS;
446   fc->queue_n = 0;
447   fc->queue_max = (max_msgs_queue / max_connections) + 1;
448 }
449
450
451 /**
452  * Find a connection.
453  *
454  * @param cid Connection ID.
455  */
456 static struct CadetConnection *
457 connection_get (const struct GNUNET_CADET_Hash *cid)
458 {
459   return GNUNET_CONTAINER_multihashmap_get (connections, GC_h2hc (cid));
460 }
461
462
463 /**
464  * Change the connection state. Cannot change a connection marked as destroyed.
465  *
466  * @param c Connection to change.
467  * @param state New state to set.
468  */
469 static void
470 connection_change_state (struct CadetConnection* c,
471                          enum CadetConnectionState state)
472 {
473   LOG (GNUNET_ERROR_TYPE_DEBUG,
474        "Connection %s state %s -> %s\n",
475        GCC_2s (c), GCC_state2s (c->state), GCC_state2s (state));
476   if (CADET_CONNECTION_DESTROYED <= c->state) /* Destroyed or broken. */
477   {
478     LOG (GNUNET_ERROR_TYPE_DEBUG, "state not changing anymore\n");
479     return;
480   }
481   c->state = state;
482   if (CADET_CONNECTION_READY == state)
483     c->create_retry = 1;
484 }
485
486
487 /**
488  * Callback called when a queued ACK message is sent.
489  *
490  * @param cls Closure (FC).
491  * @param c Connection this message was on.
492  * @param q Queue handler this call invalidates.
493  * @param type Type of message sent.
494  * @param fwd Was this a FWD going message?
495  * @param size Size of the message.
496  */
497 static void
498 ack_sent (void *cls,
499           struct CadetConnection *c,
500           struct CadetConnectionQueue *q,
501           uint16_t type, int fwd, size_t size)
502 {
503   struct CadetFlowControl *fc = cls;
504
505   fc->ack_msg = NULL;
506 }
507
508
509 /**
510  * Send an ACK on the connection, informing the predecessor about
511  * the available buffer space. Should not be called in case the peer
512  * is origin (no predecessor) in the @c fwd direction.
513  *
514  * Note that for fwd ack, the FWD mean forward *traffic* (root->dest),
515  * the ACK itself goes "back" (dest->root).
516  *
517  * @param c Connection on which to send the ACK.
518  * @param buffer How much space free to advertise?
519  * @param fwd Is this FWD ACK? (Going dest -> root)
520  * @param force Don't optimize out.
521  */
522 static void
523 send_ack (struct CadetConnection *c, unsigned int buffer, int fwd, int force)
524 {
525   struct CadetFlowControl *next_fc;
526   struct CadetFlowControl *prev_fc;
527   struct GNUNET_CADET_ACK msg;
528   uint32_t ack;
529   int delta;
530
531   /* If origin, there is no connection to send ACKs. Wrong function! */
532   GCC_check_connections ();
533   if (GCC_is_origin (c, fwd))
534   {
535     LOG (GNUNET_ERROR_TYPE_DEBUG, "connection %s is origin in %s\n",
536          GCC_2s (c), GC_f2s (fwd));
537     GNUNET_break (0);
538     return;
539   }
540
541   next_fc = fwd ? &c->fwd_fc : &c->bck_fc;
542   prev_fc = fwd ? &c->bck_fc : &c->fwd_fc;
543
544   LOG (GNUNET_ERROR_TYPE_DEBUG, "connection send %s ack on %s\n",
545        GC_f2s (fwd), GCC_2s (c));
546
547   /* Check if we need to transmit the ACK. */
548   delta = prev_fc->last_ack_sent - prev_fc->last_pid_recv;
549   if (3 < delta && buffer < delta && GNUNET_NO == force)
550   {
551     LOG (GNUNET_ERROR_TYPE_DEBUG, "Not sending ACK, buffer > 3\n");
552     LOG (GNUNET_ERROR_TYPE_DEBUG,
553          "  last pid recv: %u, last ack sent: %u\n",
554          prev_fc->last_pid_recv, prev_fc->last_ack_sent);
555     GCC_check_connections ();
556     return;
557   }
558
559   /* Ok, ACK might be necessary, what PID to ACK? */
560   ack = prev_fc->last_pid_recv + buffer;
561   LOG (GNUNET_ERROR_TYPE_DEBUG, " ACK %u\n", ack);
562   LOG (GNUNET_ERROR_TYPE_DEBUG,
563        " last pid %u, last ack %u, qmax %u, q %u\n",
564        prev_fc->last_pid_recv, prev_fc->last_ack_sent,
565        next_fc->queue_max, next_fc->queue_n);
566   if (ack == prev_fc->last_ack_sent && GNUNET_NO == force)
567   {
568     LOG (GNUNET_ERROR_TYPE_DEBUG, "Not sending FWD ACK, not needed\n");
569     GCC_check_connections ();
570     return;
571   }
572
573   /* Check if message is already in queue */
574   if (NULL != prev_fc->ack_msg)
575   {
576     if (GC_is_pid_bigger (ack, prev_fc->last_ack_sent))
577     {
578       LOG (GNUNET_ERROR_TYPE_DEBUG, " canceling old ACK\n");
579       GCC_cancel (prev_fc->ack_msg);
580       /* GCC_cancel triggers ack_sent(), which clears fc->ack_msg */
581     }
582     else
583     {
584       LOG (GNUNET_ERROR_TYPE_DEBUG, " same ACK already in queue\n");
585       GCC_check_connections ();
586       return;
587     }
588   }
589
590   prev_fc->last_ack_sent = ack;
591
592   /* Build ACK message and send on conn */
593   msg.header.size = htons (sizeof (msg));
594   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_ACK);
595   msg.ack = htonl (ack);
596   msg.cid = c->id;
597
598   prev_fc->ack_msg = GCC_send_prebuilt_message (&msg.header, UINT16_MAX, ack,
599                                                 c, !fwd, GNUNET_YES,
600                                                 &ack_sent, prev_fc);
601   GNUNET_assert (NULL != prev_fc->ack_msg);
602   GCC_check_connections ();
603 }
604
605
606 /**
607  * Callback called when a connection queued message is sent.
608  *
609  * Calculates the average time and connection packet tracking.
610  *
611  * @param cls Closure (ConnectionQueue Handle).
612  * @param c Connection this message was on.
613  * @param sent Was it really sent? (Could have been canceled)
614  * @param type Type of message sent.
615  * @param pid Packet ID, or 0 if not applicable (create, destroy, etc).
616  * @param fwd Was this a FWD going message?
617  * @param size Size of the message.
618  * @param wait Time spent waiting for core (only the time for THIS message)
619  * @return #GNUNET_YES if connection was destroyed, #GNUNET_NO otherwise.
620  */
621 static int
622 conn_message_sent (void *cls,
623                    struct CadetConnection *c, int sent,
624                    uint16_t type, uint32_t pid, int fwd, size_t size,
625                    struct GNUNET_TIME_Relative wait)
626 {
627   struct CadetConnectionPerformance *p;
628   struct CadetFlowControl *fc;
629   struct CadetConnectionQueue *q = cls;
630   double usecsperbyte;
631   int forced;
632
633   GCC_check_connections ();
634   LOG (GNUNET_ERROR_TYPE_DEBUG, "connection message_sent\n");
635
636   GCC_debug (c, GNUNET_ERROR_TYPE_DEBUG);
637
638   fc = fwd ? &c->fwd_fc : &c->bck_fc;
639   LOG (GNUNET_ERROR_TYPE_DEBUG, " %ssent %s %s pid %u\n",
640        sent ? "" : "not ", GC_f2s (fwd), GC_m2s (type), pid);
641   if (NULL != q)
642   {
643     forced = q->forced;
644     if (NULL != q->cont)
645     {
646       LOG (GNUNET_ERROR_TYPE_DEBUG, " calling cont\n");
647       q->cont (q->cont_cls, c, q, type, fwd, size);
648     }
649     GNUNET_free (q);
650   }
651   else if (type == GNUNET_MESSAGE_TYPE_CADET_ENCRYPTED
652            || type == GNUNET_MESSAGE_TYPE_CADET_AX)
653   {
654     /* If NULL == q and ENCRYPTED == type, message must have been ch_mngmnt */
655     forced = GNUNET_YES;
656   }
657   else
658   {
659     forced = GNUNET_NO;
660   }
661   if (NULL == c)
662   {
663     if (type != GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN
664         && type != GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY)
665     {
666       LOG (GNUNET_ERROR_TYPE_ERROR, "Message %s sent on NULL connection!\n",
667            GC_m2s (type));
668     }
669     GCC_check_connections ();
670     return GNUNET_NO;
671   }
672   LOG (GNUNET_ERROR_TYPE_DEBUG, " C_P- %p %u\n", c, c->pending_messages);
673   c->pending_messages--;
674   if ( (GNUNET_YES == c->destroy) &&
675        (0 == c->pending_messages) )
676   {
677     LOG (GNUNET_ERROR_TYPE_DEBUG,
678          "!  destroying connection!\n");
679     GCC_destroy (c);
680     GCC_check_connections ();
681     return GNUNET_YES;
682   }
683   /* Send ACK if needed, after accounting for sent ID in fc->queue_n */
684   switch (type)
685   {
686     case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE:
687     case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK:
688       c->maintenance_q = NULL;
689       /* Don't trigger a keepalive for sent ACKs, only SYN and SYNACKs */
690       if (GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE == type || !fwd)
691         schedule_next_keepalive (c, fwd);
692       break;
693
694     case GNUNET_MESSAGE_TYPE_CADET_ENCRYPTED:
695     case GNUNET_MESSAGE_TYPE_CADET_AX:
696       if (GNUNET_YES == sent)
697       {
698         GNUNET_assert (NULL != q);
699         fc->last_pid_sent = pid;
700         if (GC_is_pid_bigger (fc->last_pid_sent + 1, fc->last_ack_recv))
701           GCC_start_poll (c, fwd);
702         GCC_send_ack (c, fwd, GNUNET_NO);
703         connection_reset_timeout (c, fwd);
704       }
705
706       LOG (GNUNET_ERROR_TYPE_DEBUG, "!  Q_N- %p %u\n", fc, fc->queue_n);
707       if (GNUNET_NO == forced)
708       {
709         fc->queue_n--;
710         LOG (GNUNET_ERROR_TYPE_DEBUG,
711             "!   accounting pid %u\n",
712             fc->last_pid_sent);
713       }
714       else
715       {
716         LOG (GNUNET_ERROR_TYPE_DEBUG,
717              "!   forced, Q_N not accounting pid %u\n",
718              fc->last_pid_sent);
719       }
720       break;
721
722     case GNUNET_MESSAGE_TYPE_CADET_KX:
723       if (GNUNET_YES == sent)
724         connection_reset_timeout (c, fwd);
725       break;
726
727     case GNUNET_MESSAGE_TYPE_CADET_POLL:
728       fc->poll_msg = NULL;
729       break;
730
731     case GNUNET_MESSAGE_TYPE_CADET_ACK:
732       fc->ack_msg = NULL;
733       break;
734
735     case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN:
736     case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY:
737       break;
738
739     default:
740       LOG (GNUNET_ERROR_TYPE_ERROR, "%s unknown\n", GC_m2s (type));
741       GNUNET_break (0);
742       break;
743   }
744   LOG (GNUNET_ERROR_TYPE_DEBUG, "!  message sent!\n");
745
746   if (NULL == c->perf)
747     return GNUNET_NO; /* Only endpoints are interested in timing. */
748
749   p = c->perf;
750   usecsperbyte = ((double) wait.rel_value_us) / size;
751   if (p->size == AVG_MSGS)
752   {
753     /* Array is full. Substract oldest value, add new one and store. */
754     p->avg -= (p->usecsperbyte[p->idx] / AVG_MSGS);
755     p->usecsperbyte[p->idx] = usecsperbyte;
756     p->avg += (p->usecsperbyte[p->idx] / AVG_MSGS);
757   }
758   else
759   {
760     /* Array not yet full. Add current value to avg and store. */
761     p->usecsperbyte[p->idx] = usecsperbyte;
762     p->avg *= p->size;
763     p->avg += p->usecsperbyte[p->idx];
764     p->size++;
765     p->avg /= p->size;
766   }
767   p->idx = (p->idx + 1) % AVG_MSGS;
768   GCC_check_connections ();
769   return GNUNET_NO;
770 }
771
772
773 /**
774  * Get the previous hop in a connection
775  *
776  * @param c Connection.
777  *
778  * @return Previous peer in the connection.
779  */
780 static struct CadetPeer *
781 get_prev_hop (const struct CadetConnection *c)
782 {
783   GNUNET_PEER_Id id;
784
785   if (NULL == c->path)
786     return NULL;
787   LOG (GNUNET_ERROR_TYPE_DEBUG,
788        " get prev hop %s [%u/%u]\n",
789        GCC_2s (c), c->own_pos, c->path->length);
790   if (0 == c->own_pos || c->path->length < 2)
791     id = c->path->peers[0];
792   else
793     id = c->path->peers[c->own_pos - 1];
794
795   LOG (GNUNET_ERROR_TYPE_DEBUG, "  ID: %s (%u)\n",
796        GNUNET_i2s (GNUNET_PEER_resolve2 (id)), id);
797
798   return GCP_get_short (id, GNUNET_YES);
799 }
800
801
802 /**
803  * Get the next hop in a connection
804  *
805  * @param c Connection.
806  *
807  * @return Next peer in the connection.
808  */
809 static struct CadetPeer *
810 get_next_hop (const struct CadetConnection *c)
811 {
812   GNUNET_PEER_Id id;
813
814   if (NULL == c->path)
815     return NULL;
816
817   LOG (GNUNET_ERROR_TYPE_DEBUG, " get next hop %s [%u/%u]\n",
818        GCC_2s (c), c->own_pos, c->path->length);
819   if ((c->path->length - 1) == c->own_pos || c->path->length < 2)
820     id = c->path->peers[c->path->length - 1];
821   else
822     id = c->path->peers[c->own_pos + 1];
823
824   LOG (GNUNET_ERROR_TYPE_DEBUG, "  ID: %s (%u)\n",
825        GNUNET_i2s (GNUNET_PEER_resolve2 (id)), id);
826
827   return GCP_get_short (id, GNUNET_YES);
828 }
829
830
831 /**
832  * Check that the direct neighbours (previous and next hop)
833  * are properly associated with this connection.
834  *
835  * @param c connection to check
836  */
837 static void
838 check_neighbours (const struct CadetConnection *c)
839 {
840   if (NULL == c->path)
841     return; /* nothing to check */
842   GCP_check_connection (get_next_hop (c), c);
843   GCP_check_connection (get_prev_hop (c), c);
844 }
845
846
847 /**
848  * Helper for #GCC_check_connections().  Calls #check_neighbours().
849  *
850  * @param cls NULL
851  * @param key ignored
852  * @param value the `struct CadetConnection` to check
853  * @return #GNUNET_OK (continue to iterate)
854  */
855 static int
856 check_connection (void *cls,
857                   const struct GNUNET_HashCode *key,
858                   void *value)
859 {
860   struct CadetConnection *c = value;
861
862   check_neighbours (c);
863   return GNUNET_OK;
864 }
865
866
867 /**
868  * Check invariants for all connections using #check_neighbours().
869  */
870 void
871 GCC_check_connections ()
872 {
873   if (0 == CHECK_INVARIANTS)
874     return;
875   if (NULL == connections)
876     return;
877   GNUNET_CONTAINER_multihashmap_iterate (connections,
878                                          &check_connection,
879                                          NULL);
880 }
881
882
883 /**
884  * Get the hop in a connection.
885  *
886  * @param c Connection.
887  * @param fwd Next in the FWD direction?
888  *
889  * @return Next peer in the connection.
890  */
891 static struct CadetPeer *
892 get_hop (struct CadetConnection *c, int fwd)
893 {
894   return (fwd) ? get_next_hop (c) : get_prev_hop (c);
895 }
896
897
898 /**
899  * Get a bit mask for a message received out-of-order.
900  *
901  * @param last_pid_recv Last PID we received prior to the out-of-order.
902  * @param ooo_pid PID of the out-of-order message.
903  */
904 static uint32_t
905 get_recv_bitmask (uint32_t last_pid_recv, uint32_t ooo_pid)
906 {
907   return 1 << (last_pid_recv - ooo_pid);
908 }
909
910
911 /**
912  * Check is an out-of-order message is ok:
913  * - at most 31 messages behind.
914  * - not duplicate.
915  *
916  * @param last_pid_recv Last in-order PID received.
917  */
918 static int
919 is_ooo_ok (uint32_t last_pid_recv, uint32_t ooo_pid, uint32_t ooo_bitmap)
920 {
921   uint32_t mask;
922
923   if (GC_is_pid_bigger (last_pid_recv - 31, ooo_pid))
924     return GNUNET_NO;
925
926   mask = get_recv_bitmask (last_pid_recv, ooo_pid);
927   if (0 != (ooo_bitmap & mask))
928     return GNUNET_NO;
929
930   return GNUNET_YES;
931 }
932
933
934 /**
935  * Is traffic coming from this sender 'FWD' traffic?
936  *
937  * @param c Connection to check.
938  * @param sender Peer identity of neighbor.
939  *
940  * @return #GNUNET_YES in case the sender is the 'prev' hop and therefore
941  *         the traffic is 'FWD'.
942  *         #GNUNET_NO for BCK.
943  *         #GNUNET_SYSERR for errors.
944  */
945 static int
946 is_fwd (const struct CadetConnection *c,
947         const struct GNUNET_PeerIdentity *sender)
948 {
949   GNUNET_PEER_Id id;
950
951   id = GNUNET_PEER_search (sender);
952   if (GCP_get_short_id (get_prev_hop (c)) == id)
953     return GNUNET_YES;
954
955   if (GCP_get_short_id (get_next_hop (c)) == id)
956     return GNUNET_NO;
957
958   GNUNET_break (0);
959   return GNUNET_SYSERR;
960 }
961
962
963 /**
964  * Sends a CONNECTION ACK message in reponse to a received CONNECTION_CREATE
965  * or a first CONNECTION_ACK directed to us.
966  *
967  * @param connection Connection to confirm.
968  * @param fwd Should we send it FWD? (root->dest)
969  *            (First (~SYNACK) goes BCK, second (~ACK) goes FWD)
970  */
971 static void
972 send_connection_ack (struct CadetConnection *connection, int fwd)
973 {
974   struct CadetTunnel *t;
975   size_t size = sizeof (struct GNUNET_CADET_ConnectionACK);
976
977   GCC_check_connections ();
978   t = connection->t;
979   LOG (GNUNET_ERROR_TYPE_INFO,
980        "==> { C %s ACK} %19s on conn %s (%p) %s [%5u]\n",
981        GC_f2s (!fwd), "", GCC_2s (connection), connection, GC_f2s (fwd), size);
982   GCP_queue_add (get_hop (connection, fwd), NULL,
983                  GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK, UINT16_MAX, 0,
984                  size, connection, fwd, &conn_message_sent, NULL);
985   connection->pending_messages++;
986   if (CADET_TUNNEL_NEW == GCT_get_cstate (t))
987     GCT_change_cstate (t, CADET_TUNNEL_WAITING);
988   if (CADET_CONNECTION_READY != connection->state)
989     connection_change_state (connection, CADET_CONNECTION_SENT);
990   GCC_check_connections ();
991 }
992
993
994 /**
995  * Send a notification that a connection is broken.
996  *
997  * @param c Connection that is broken.
998  * @param id1 Peer that has disconnected.
999  * @param id2 Peer that has disconnected.
1000  * @param fwd Direction towards which to send it.
1001  */
1002 static void
1003 send_broken (struct CadetConnection *c,
1004              const struct GNUNET_PeerIdentity *id1,
1005              const struct GNUNET_PeerIdentity *id2,
1006              int fwd)
1007 {
1008   struct GNUNET_CADET_ConnectionBroken msg;
1009
1010   GCC_check_connections ();
1011   msg.header.size = htons (sizeof (struct GNUNET_CADET_ConnectionBroken));
1012   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN);
1013   msg.cid = c->id;
1014   msg.peer1 = *id1;
1015   msg.peer2 = *id2;
1016   GNUNET_assert (NULL ==
1017                  GCC_send_prebuilt_message (&msg.header, UINT16_MAX, 0, c, fwd,
1018                                             GNUNET_YES, NULL, NULL));
1019   GCC_check_connections ();
1020 }
1021
1022
1023 /**
1024  * Send a notification that a connection is broken, when a connection
1025  * isn't even known to the local peer or soon to be destroyed.
1026  *
1027  * @param connection_id Connection ID.
1028  * @param id1 Peer that has disconnected, probably local peer.
1029  * @param id2 Peer that has disconnected can be NULL if unknown.
1030  * @param peer Peer to notify (neighbor who sent the connection).
1031  */
1032 static void
1033 send_broken_unknown (const struct GNUNET_CADET_Hash *connection_id,
1034                      const struct GNUNET_PeerIdentity *id1,
1035                      const struct GNUNET_PeerIdentity *id2,
1036                      const struct GNUNET_PeerIdentity *peer_id)
1037 {
1038   struct GNUNET_CADET_ConnectionBroken *msg;
1039   struct CadetPeer *neighbor;
1040
1041   GCC_check_connections ();
1042   LOG (GNUNET_ERROR_TYPE_INFO, "--> BROKEN on unknown connection %s\n",
1043        GNUNET_h2s (GC_h2hc (connection_id)));
1044
1045   msg = GNUNET_new (struct GNUNET_CADET_ConnectionBroken);
1046   msg->header.size = htons (sizeof (struct GNUNET_CADET_ConnectionBroken));
1047   msg->header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN);
1048   msg->cid = *connection_id;
1049   msg->peer1 = *id1;
1050   if (NULL != id2)
1051     msg->peer2 = *id2;
1052   else
1053     memset (&msg->peer2, 0, sizeof (msg->peer2));
1054   neighbor = GCP_get (peer_id, GNUNET_NO); /* We MUST know neighbor. */
1055   GNUNET_assert (NULL != neighbor);
1056   GCP_queue_add (neighbor, msg, GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN,
1057                  UINT16_MAX, 2, sizeof (struct GNUNET_CADET_ConnectionBroken),
1058                  NULL, GNUNET_SYSERR, /* connection, fwd */
1059                  NULL, NULL); /* continuation */
1060   GCC_check_connections ();
1061 }
1062
1063
1064 /**
1065  * Send keepalive packets for a connection.
1066  *
1067  * @param c Connection to keep alive..
1068  * @param fwd Is this a FWD keepalive? (owner -> dest).
1069  */
1070 static void
1071 send_connection_keepalive (struct CadetConnection *c, int fwd)
1072 {
1073   struct GNUNET_MessageHeader msg;
1074   struct CadetFlowControl *fc;
1075
1076   GCC_check_connections ();
1077   LOG (GNUNET_ERROR_TYPE_INFO,
1078        "keepalive %s for connection %s\n",
1079        GC_f2s (fwd), GCC_2s (c));
1080
1081   GNUNET_assert (NULL != c->t);
1082   fc = fwd ? &c->fwd_fc : &c->bck_fc;
1083   if (0 < fc->queue_n || GNUNET_YES == GCT_has_queued_traffic (c->t))
1084   {
1085     LOG (GNUNET_ERROR_TYPE_INFO, "not sending keepalive, traffic in queue\n");
1086     return;
1087   }
1088
1089   GNUNET_STATISTICS_update (stats, "# keepalives sent", 1, GNUNET_NO);
1090
1091   GNUNET_assert (NULL != c->t);
1092   msg.size = htons (sizeof (msg));
1093   msg.type = htons (GNUNET_MESSAGE_TYPE_CADET_KEEPALIVE);
1094
1095   GNUNET_assert (NULL ==
1096                  GCT_send_prebuilt_message (&msg, c->t, c,
1097                                             GNUNET_NO, NULL, NULL));
1098   GCC_check_connections ();
1099 }
1100
1101
1102 /**
1103  * Send CONNECTION_{CREATE/ACK} packets for a connection.
1104  *
1105  * @param c Connection for which to send the message.
1106  * @param fwd If #GNUNET_YES, send CREATE, otherwise send ACK.
1107  */
1108 static void
1109 connection_recreate (struct CadetConnection *c, int fwd)
1110 {
1111   LOG (GNUNET_ERROR_TYPE_DEBUG,
1112        "sending connection recreate\n");
1113   if (fwd)
1114     GCC_send_create (c);
1115   else
1116     send_connection_ack (c, GNUNET_NO);
1117 }
1118
1119
1120 /**
1121  * Generic connection timer management.
1122  * Depending on the role of the peer in the connection will send the
1123  * appropriate message (build or keepalive)
1124  *
1125  * @param c Conncetion to maintain.
1126  * @param fwd Is FWD?
1127  */
1128 static void
1129 connection_maintain (struct CadetConnection *c, int fwd)
1130 {
1131   if (GNUNET_NO != c->destroy)
1132   {
1133     LOG (GNUNET_ERROR_TYPE_INFO, "not sending keepalive, being destroyed\n");
1134     return;
1135   }
1136
1137   if (NULL == c->t)
1138   {
1139     GNUNET_break (0);
1140     GCC_debug (c, GNUNET_ERROR_TYPE_ERROR);
1141     return;
1142   }
1143
1144   if (CADET_TUNNEL_SEARCHING == GCT_get_cstate (c->t))
1145   {
1146     /* If status is SEARCHING, why is there a connection? Should be WAITING */
1147     GNUNET_break (0);
1148     GCT_debug (c->t, GNUNET_ERROR_TYPE_ERROR);
1149     LOG (GNUNET_ERROR_TYPE_INFO, "not sending keepalive, tunnel SEARCHING\n");
1150     schedule_next_keepalive (c, fwd);
1151     return;
1152   }
1153   switch (c->state)
1154   {
1155     case CADET_CONNECTION_NEW:
1156       GNUNET_break (0);
1157       /* fall-through */
1158     case CADET_CONNECTION_SENT:
1159       connection_recreate (c, fwd);
1160       break;
1161     case CADET_CONNECTION_READY:
1162       send_connection_keepalive (c, fwd);
1163       break;
1164     default:
1165       break;
1166   }
1167 }
1168
1169
1170 /**
1171  * Keep the connection alive.
1172  *
1173  * @param c Connection to keep alive.
1174  * @param fwd Direction.
1175  * @param shutdown Are we shutting down? (Don't send traffic)
1176  *                 Non-zero value for true, not necessarily GNUNET_YES.
1177  */
1178 static void
1179 connection_keepalive (struct CadetConnection *c, int fwd, int shutdown)
1180 {
1181   GCC_check_connections ();
1182   LOG (GNUNET_ERROR_TYPE_DEBUG,
1183        "%s keepalive for %s\n",
1184        GC_f2s (fwd), GCC_2s (c));
1185
1186   if (fwd)
1187     c->fwd_maintenance_task = NULL;
1188   else
1189     c->bck_maintenance_task = NULL;
1190
1191   if (GNUNET_NO != shutdown)
1192     return;
1193
1194   connection_maintain (c, fwd);
1195   GCC_check_connections ();
1196   /* Next execution will be scheduled by message_sent or _maintain*/
1197 }
1198
1199
1200 /**
1201  * Keep the connection alive in the FWD direction.
1202  *
1203  * @param cls Closure (connection to keepalive).
1204  * @param tc TaskContext.
1205  */
1206 static void
1207 connection_fwd_keepalive (void *cls,
1208                           const struct GNUNET_SCHEDULER_TaskContext *tc)
1209 {
1210   GCC_check_connections ();
1211   connection_keepalive ((struct CadetConnection *) cls,
1212                         GNUNET_YES,
1213                         tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN);
1214   GCC_check_connections ();
1215 }
1216
1217
1218 /**
1219  * Keep the connection alive in the BCK direction.
1220  *
1221  * @param cls Closure (connection to keepalive).
1222  * @param tc TaskContext.
1223  */
1224 static void
1225 connection_bck_keepalive (void *cls,
1226                           const struct GNUNET_SCHEDULER_TaskContext *tc)
1227 {
1228   GCC_check_connections ();
1229   connection_keepalive ((struct CadetConnection *) cls,
1230                         GNUNET_NO,
1231                         tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN);
1232   GCC_check_connections ();
1233 }
1234
1235
1236 /**
1237  * Schedule next keepalive task, taking in consideration
1238  * the connection state and number of retries.
1239  *
1240  * If the peer is not the origin, do nothing.
1241  *
1242  * @param c Connection for which to schedule the next keepalive.
1243  * @param fwd Direction for the next keepalive.
1244  */
1245 static void
1246 schedule_next_keepalive (struct CadetConnection *c, int fwd)
1247 {
1248   struct GNUNET_TIME_Relative delay;
1249   struct GNUNET_SCHEDULER_Task * *task_id;
1250   GNUNET_SCHEDULER_TaskCallback keepalive_task;
1251
1252   GCC_check_connections ();
1253   if (GNUNET_NO == GCC_is_origin (c, fwd))
1254     return;
1255
1256   /* Calculate delay to use, depending on the state of the connection */
1257   if (CADET_CONNECTION_READY == c->state)
1258   {
1259     delay = refresh_connection_time;
1260   }
1261   else
1262   {
1263     if (1 > c->create_retry)
1264       c->create_retry = 1;
1265     delay = GNUNET_TIME_relative_multiply (create_connection_time,
1266                                            c->create_retry);
1267     if (c->create_retry < 64)
1268       c->create_retry *= 2;
1269   }
1270
1271   /* Select direction-dependent parameters */
1272   if (GNUNET_YES == fwd)
1273   {
1274     task_id = &c->fwd_maintenance_task;
1275     keepalive_task = &connection_fwd_keepalive;
1276   }
1277   else
1278   {
1279     task_id = &c->bck_maintenance_task;
1280     keepalive_task = &connection_bck_keepalive;
1281   }
1282
1283   /* Check that no one scheduled it before us */
1284   if (NULL != *task_id)
1285   {
1286     /* No need for a _break. It can happen for instance when sending a SYNACK
1287      * for a duplicate SYN: the first SYNACK scheduled the task. */
1288     GNUNET_SCHEDULER_cancel (*task_id);
1289   }
1290
1291   /* Schedule the task */
1292   *task_id = GNUNET_SCHEDULER_add_delayed (delay,
1293                                            keepalive_task,
1294                                            c);
1295   LOG (GNUNET_ERROR_TYPE_DEBUG,
1296        "next keepalive in %s\n",
1297        GNUNET_STRINGS_relative_time_to_string (delay, GNUNET_YES));
1298   GCC_check_connections ();
1299 }
1300
1301
1302 /**
1303  * @brief Re-initiate traffic on this connection if necessary.
1304  *
1305  * Check if there is traffic queued towards this peer
1306  * and the core transmit handle is NULL (traffic was stalled).
1307  * If so, call core tmt rdy.
1308  *
1309  * @param c Connection on which initiate traffic.
1310  * @param fwd Is this about fwd traffic?
1311  */
1312 static void
1313 connection_unlock_queue (struct CadetConnection *c, int fwd)
1314 {
1315   struct CadetPeer *peer;
1316
1317   GCC_check_connections ();
1318   LOG (GNUNET_ERROR_TYPE_DEBUG,
1319        "connection_unlock_queue %s on %s\n",
1320        GC_f2s (fwd), GCC_2s (c));
1321
1322   if (GCC_is_terminal (c, fwd))
1323   {
1324     LOG (GNUNET_ERROR_TYPE_DEBUG, " is terminal, can unlock!\n");
1325     return;
1326   }
1327
1328   peer = get_hop (c, fwd);
1329   GCP_queue_unlock (peer, c);
1330   GCC_check_connections ();
1331 }
1332
1333
1334 /**
1335  * Cancel all transmissions that belong to a certain connection.
1336  *
1337  * If the connection is scheduled for destruction and no more messages are left,
1338  * the connection will be destroyed by the continuation call.
1339  *
1340  * @param c Connection which to cancel. Might be destroyed during this call.
1341  * @param fwd Cancel fwd traffic?
1342  */
1343 static void
1344 connection_cancel_queues (struct CadetConnection *c,
1345                           int fwd)
1346 {
1347   struct CadetFlowControl *fc;
1348   struct CadetPeer *peer;
1349
1350   GCC_check_connections ();
1351   LOG (GNUNET_ERROR_TYPE_DEBUG,
1352        "Cancel %s queues for connection %s\n",
1353        GC_f2s (fwd), GCC_2s (c));
1354   if (NULL == c)
1355   {
1356     GNUNET_break (0);
1357     return;
1358   }
1359
1360   fc = fwd ? &c->fwd_fc : &c->bck_fc;
1361   if (NULL != fc->poll_task)
1362   {
1363     GNUNET_SCHEDULER_cancel (fc->poll_task);
1364     fc->poll_task = NULL;
1365     LOG (GNUNET_ERROR_TYPE_DEBUG, "  cancelled POLL task for fc %p\n", fc);
1366   }
1367   if (NULL != fc->poll_msg)
1368   {
1369     GCC_cancel (fc->poll_msg);
1370     LOG (GNUNET_ERROR_TYPE_DEBUG, "  cancelled POLL msg for fc %p\n", fc);
1371   }
1372   peer = get_hop (c, fwd);
1373   GCP_queue_cancel (peer, c);
1374   GCC_check_connections ();
1375 }
1376
1377
1378 /**
1379  * Function called if a connection has been stalled for a while,
1380  * possibly due to a missed ACK. Poll the neighbor about its ACK status.
1381  *
1382  * @param cls Closure (poll ctx).
1383  * @param tc TaskContext.
1384  */
1385 static void
1386 connection_poll (void *cls,
1387                  const struct GNUNET_SCHEDULER_TaskContext *tc);
1388
1389
1390 /**
1391  * Callback called when a queued POLL message is sent.
1392  *
1393  * @param cls Closure (FC).
1394  * @param c Connection this message was on.
1395  * @param q Queue handler this call invalidates.
1396  * @param type Type of message sent.
1397  * @param fwd Was this a FWD going message?
1398  * @param size Size of the message.
1399  */
1400 static void
1401 poll_sent (void *cls,
1402            struct CadetConnection *c,
1403            struct CadetConnectionQueue *q,
1404            uint16_t type, int fwd, size_t size)
1405 {
1406   struct CadetFlowControl *fc = cls;
1407
1408   if (2 == c->destroy)
1409   {
1410     LOG (GNUNET_ERROR_TYPE_DEBUG, "POLL canceled on shutdown\n");
1411     return;
1412   }
1413   LOG (GNUNET_ERROR_TYPE_DEBUG, "POLL sent for %s, scheduling new one!\n",
1414        GCC_2s (c));
1415   fc->poll_msg = NULL;
1416   fc->poll_time = GNUNET_TIME_STD_BACKOFF (fc->poll_time);
1417   fc->poll_task = GNUNET_SCHEDULER_add_delayed (fc->poll_time,
1418                                                 &connection_poll,
1419                                                 fc);
1420   LOG (GNUNET_ERROR_TYPE_DEBUG, " task %u\n", fc->poll_task);
1421 }
1422
1423
1424 /**
1425  * Function called if a connection has been stalled for a while,
1426  * possibly due to a missed ACK. Poll the neighbor about its ACK status.
1427  *
1428  * @param cls Closure (poll ctx).
1429  * @param tc TaskContext.
1430  */
1431 static void
1432 connection_poll (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1433 {
1434   struct CadetFlowControl *fc = cls;
1435   struct GNUNET_CADET_Poll msg;
1436   struct CadetConnection *c;
1437   int fwd;
1438
1439   fc->poll_task = NULL;
1440   GCC_check_connections ();
1441   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1442   {
1443     return;
1444   }
1445
1446   c = fc->c;
1447   fwd = fc == &c->fwd_fc;
1448   LOG (GNUNET_ERROR_TYPE_DEBUG, "Polling connection %s %s\n",
1449        GCC_2s (c),  GC_f2s (fwd));
1450
1451   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_POLL);
1452   msg.header.size = htons (sizeof (msg));
1453   msg.pid = htonl (fc->last_pid_sent);
1454   LOG (GNUNET_ERROR_TYPE_DEBUG, " last pid sent: %u\n", fc->last_pid_sent);
1455   fc->poll_msg =
1456       GCC_send_prebuilt_message (&msg.header, UINT16_MAX, fc->last_pid_sent, c,
1457                                  fc == &c->fwd_fc, GNUNET_YES, &poll_sent, fc);
1458   GNUNET_assert (NULL != fc->poll_msg);
1459   GCC_check_connections ();
1460 }
1461
1462
1463 /**
1464  * Resend all queued messages for a connection on other connections of the
1465  * same tunnel, if possible. The connection WILL BE DESTROYED by this function.
1466  *
1467  * @param c Connection whose messages to resend.
1468  * @param fwd Resend fwd messages?
1469  */
1470 static void
1471 resend_messages_and_destroy (struct CadetConnection *c, int fwd)
1472 {
1473   struct GNUNET_MessageHeader *out_msg;
1474   struct CadetTunnel *t = c->t;
1475   struct CadetPeer *neighbor;
1476   unsigned int pending;
1477   int destroyed;
1478
1479   GCC_check_connections ();
1480   c->state = CADET_CONNECTION_DESTROYED;
1481   c->destroy = GNUNET_YES;
1482
1483   destroyed = GNUNET_NO;
1484   neighbor = get_hop (c, fwd);
1485   pending = c->pending_messages;
1486
1487   while (NULL != (out_msg = GCP_connection_pop (neighbor, c, &destroyed)))
1488   {
1489     if (NULL != t)
1490       GCT_resend_message (out_msg, t);
1491     GNUNET_free (out_msg);
1492   }
1493
1494   /* All pending messages should have been popped,
1495    * and the connection destroyed by the continuation.
1496    */
1497   if (GNUNET_YES != destroyed)
1498   {
1499     if (0 != pending)
1500     {
1501       GNUNET_break (0);
1502       GCC_debug (c, GNUNET_ERROR_TYPE_ERROR);
1503       if (NULL != t) GCT_debug (t, GNUNET_ERROR_TYPE_ERROR);
1504     }
1505     GCC_destroy (c);
1506   }
1507   GCC_check_connections ();
1508 }
1509
1510
1511 /**
1512  * Generic connection timeout implementation.
1513  *
1514  * Timeout function due to lack of keepalive/traffic from an endpoint.
1515  * Destroys connection if called.
1516  *
1517  * @param c Connection to destroy.
1518  * @param fwd Was the timeout from the origin? (FWD timeout)
1519  */
1520 static void
1521 connection_timeout (struct CadetConnection *c, int fwd)
1522 {
1523   struct CadetFlowControl *reverse_fc;
1524
1525   GCC_check_connections ();
1526   reverse_fc = fwd ? &c->bck_fc : &c->fwd_fc;
1527
1528   LOG (GNUNET_ERROR_TYPE_INFO,
1529        "Connection %s %s timed out. Destroying.\n",
1530        GCC_2s (c),
1531        GC_f2s (fwd));
1532   GCC_debug (c, GNUNET_ERROR_TYPE_DEBUG);
1533
1534   if (GCC_is_origin (c, fwd)) /* Loopback? Something is wrong! */
1535   {
1536     GNUNET_break (0);
1537     return;
1538   }
1539
1540   /* If dest, salvage queued traffic. */
1541   if (GCC_is_origin (c, !fwd))
1542   {
1543     const struct GNUNET_PeerIdentity *next_hop;
1544
1545     next_hop = GCP_get_id (fwd ? get_prev_hop (c) : get_next_hop (c));
1546     send_broken_unknown (&c->id, &my_full_id, NULL, next_hop);
1547     if (0 < reverse_fc->queue_n)
1548       resend_messages_and_destroy (c, !fwd);
1549     GCC_check_connections ();
1550     return;
1551   }
1552
1553   GCC_destroy (c);
1554   GCC_check_connections ();
1555 }
1556
1557
1558 /**
1559  * Timeout function due to lack of keepalive/traffic from the owner.
1560  * Destroys connection if called.
1561  *
1562  * @param cls Closure (connection to destroy).
1563  * @param tc TaskContext.
1564  */
1565 static void
1566 connection_fwd_timeout (void *cls,
1567                         const struct GNUNET_SCHEDULER_TaskContext *tc)
1568 {
1569   struct CadetConnection *c = cls;
1570
1571   c->fwd_maintenance_task = NULL;
1572   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1573     return;
1574   GCC_check_connections ();
1575   connection_timeout (c, GNUNET_YES);
1576   GCC_check_connections ();
1577 }
1578
1579
1580 /**
1581  * Timeout function due to lack of keepalive/traffic from the destination.
1582  * Destroys connection if called.
1583  *
1584  * @param cls Closure (connection to destroy).
1585  * @param tc TaskContext
1586  */
1587 static void
1588 connection_bck_timeout (void *cls,
1589                         const struct GNUNET_SCHEDULER_TaskContext *tc)
1590 {
1591   struct CadetConnection *c = cls;
1592
1593   c->bck_maintenance_task = NULL;
1594   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1595     return;
1596   GCC_check_connections ();
1597   connection_timeout (c, GNUNET_NO);
1598   GCC_check_connections ();
1599 }
1600
1601
1602 /**
1603  * Resets the connection timeout task, some other message has done the
1604  * task's job.
1605  * - For the first peer on the direction this means to send
1606  *   a keepalive or a path confirmation message (either create or ACK).
1607  * - For all other peers, this means to destroy the connection,
1608  *   due to lack of activity.
1609  * Starts the timeout if no timeout was running (connection just created).
1610  *
1611  * @param c Connection whose timeout to reset.
1612  * @param fwd Is this forward?
1613  *
1614  * TODO use heap to improve efficiency of scheduler.
1615  */
1616 static void
1617 connection_reset_timeout (struct CadetConnection *c, int fwd)
1618 {
1619   LOG (GNUNET_ERROR_TYPE_DEBUG, "Connection %s reset timeout\n", GC_f2s (fwd));
1620   if (GCC_is_origin (c, fwd)) /* Startpoint */
1621   {
1622     schedule_next_keepalive (c, fwd);
1623   }
1624   else /* Relay, endpoint. */
1625   {
1626     struct GNUNET_TIME_Relative delay;
1627     struct GNUNET_SCHEDULER_Task * *ti;
1628     GNUNET_SCHEDULER_TaskCallback f;
1629
1630     ti = fwd ? &c->fwd_maintenance_task : &c->bck_maintenance_task;
1631
1632     if (NULL != *ti)
1633       GNUNET_SCHEDULER_cancel (*ti);
1634     delay = GNUNET_TIME_relative_multiply (refresh_connection_time, 4);
1635     LOG (GNUNET_ERROR_TYPE_DEBUG,
1636          "  timing out in %s\n",
1637          GNUNET_STRINGS_relative_time_to_string (delay, GNUNET_NO));
1638     f = fwd ? &connection_fwd_timeout : &connection_bck_timeout;
1639     *ti = GNUNET_SCHEDULER_add_delayed (delay, f, c);
1640   }
1641 }
1642
1643
1644 /**
1645  * Iterator to compare each connection's path with the path of a new connection.
1646  *
1647  * If the connection conincides, the c member of path is set to the connection
1648  * and the destroy flag of the connection is set.
1649  *
1650  * @param cls Closure (new path).
1651  * @param c Connection in the tunnel to check.
1652  */
1653 static void
1654 check_path (void *cls, struct CadetConnection *c)
1655 {
1656   struct CadetConnection *new_conn = cls;
1657   struct CadetPeerPath *path = new_conn->path;
1658
1659   LOG (GNUNET_ERROR_TYPE_DEBUG, "  checking %s (%p), length %u\n",
1660        GCC_2s (c), c, c->path->length);
1661
1662   if (c != new_conn
1663       && c->destroy == GNUNET_NO
1664       && c->state != CADET_CONNECTION_BROKEN
1665       && c->state != CADET_CONNECTION_DESTROYED
1666       && path_equivalent (path, c->path))
1667   {
1668     new_conn->destroy = GNUNET_YES;
1669     new_conn->path->c = c;
1670     LOG (GNUNET_ERROR_TYPE_DEBUG, "  MATCH!\n");
1671   }
1672 }
1673
1674
1675 /**
1676  * Finds out if this path is already being used by an existing connection.
1677  *
1678  * Checks the tunnel towards the destination to see if it contains
1679  * any connection with the same path.
1680  *
1681  * If the existing connection is ready, it is kept.
1682  * Otherwise if the sender has a smaller ID that ours, we accept it (and
1683  * the peer will eventually reject our attempt).
1684  *
1685  * @param path Path to check.
1686  * @return #GNUNET_YES if the tunnel has a connection with the same path,
1687  *         #GNUNET_NO otherwise.
1688  */
1689 static int
1690 does_connection_exist (struct CadetConnection *conn)
1691 {
1692   struct CadetPeer *p;
1693   struct CadetTunnel *t;
1694   struct CadetConnection *c;
1695
1696   p = GCP_get_short (conn->path->peers[0], GNUNET_NO);
1697   if (NULL == p)
1698     return GNUNET_NO;
1699   t = GCP_get_tunnel (p);
1700   if (NULL == t)
1701     return GNUNET_NO;
1702
1703   LOG (GNUNET_ERROR_TYPE_DEBUG, "Checking for duplicates\n");
1704
1705   GCT_iterate_connections (t, &check_path, conn);
1706
1707   if (GNUNET_YES == conn->destroy)
1708   {
1709     c = conn->path->c;
1710     conn->destroy = GNUNET_NO;
1711     conn->path->c = conn;
1712     LOG (GNUNET_ERROR_TYPE_DEBUG, " found duplicate of %s\n", GCC_2s (conn));
1713     LOG (GNUNET_ERROR_TYPE_DEBUG, " duplicate: %s\n", GCC_2s (c));
1714     GCC_debug (c, GNUNET_ERROR_TYPE_DEBUG);
1715     if (CADET_CONNECTION_READY == c->state)
1716     {
1717       /* The other peer confirmed a live connection with this path,
1718        * why is it trying to duplicate it. */
1719       GNUNET_STATISTICS_update (stats, "# duplicate connections", 1, GNUNET_NO);
1720       return GNUNET_YES;
1721     }
1722     LOG (GNUNET_ERROR_TYPE_DEBUG, " duplicate not valid, connection unique\n");
1723     return GNUNET_NO;
1724   }
1725   else
1726   {
1727     LOG (GNUNET_ERROR_TYPE_DEBUG, " %s has no duplicates\n", GCC_2s (conn));
1728     return GNUNET_NO;
1729   }
1730 }
1731
1732
1733 /**
1734  * @brief Check if the tunnel this connection belongs to has any other
1735  * connection with the same path, and destroy one if so.
1736  *
1737  * @param cls Closure (connection to check).
1738  * @param tc Task context.
1739  */
1740 static void
1741 check_duplicates (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1742 {
1743   struct CadetConnection *c = cls;
1744
1745   c->check_duplicates_task = NULL;
1746   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1747     return;
1748
1749   if (GNUNET_YES == does_connection_exist (c))
1750   {
1751     GCT_debug (c->t, GNUNET_ERROR_TYPE_DEBUG);
1752     send_broken (c, &my_full_id, &my_full_id, GCC_is_origin (c, GNUNET_YES));
1753     GCC_destroy (c);
1754   }
1755 }
1756
1757
1758
1759 /**
1760  * Wait for enough time to let any dead connections time out and check for
1761  * any remaining duplicates.
1762  *
1763  * @param c Connection that is a potential duplicate.
1764  */
1765 static void
1766 schedule_check_duplicates (struct CadetConnection *c)
1767 {
1768   struct GNUNET_TIME_Relative delay;
1769
1770   if (NULL != c->check_duplicates_task)
1771     return;
1772
1773   delay = GNUNET_TIME_relative_multiply (refresh_connection_time, 5);
1774   c->check_duplicates_task = GNUNET_SCHEDULER_add_delayed (delay,
1775                                                            &check_duplicates,
1776                                                            c);
1777 }
1778
1779
1780
1781 /**
1782  * Add the connection to the list of both neighbors.
1783  *
1784  * @param c Connection.
1785  *
1786  * @return #GNUNET_OK if everything went fine
1787  *         #GNUNET_SYSERR if the was an error and @c c is malformed.
1788  */
1789 static int
1790 register_neighbors (struct CadetConnection *c)
1791 {
1792   c->next_peer = get_next_hop (c);
1793   c->prev_peer = get_prev_hop (c);
1794   GNUNET_assert (c->next_peer != c->prev_peer);
1795   LOG (GNUNET_ERROR_TYPE_DEBUG,
1796        "register neighbors for connection %s\n",
1797        GCC_2s (c));
1798   path_debug (c->path);
1799   LOG (GNUNET_ERROR_TYPE_DEBUG,
1800        "own pos %u\n", c->own_pos);
1801   LOG (GNUNET_ERROR_TYPE_DEBUG,
1802        "putting connection %s to next peer %p\n",
1803        GCC_2s (c),
1804        c->next_peer);
1805   LOG (GNUNET_ERROR_TYPE_DEBUG, "next peer %p %s\n",
1806        c->next_peer,
1807        GCP_2s (c->next_peer));
1808   LOG (GNUNET_ERROR_TYPE_DEBUG,
1809        "putting connection %s to prev peer %p\n",
1810        GCC_2s (c),
1811        c->prev_peer);
1812   LOG (GNUNET_ERROR_TYPE_DEBUG,
1813        "prev peer %p %s\n",
1814        c->prev_peer,
1815        GCP_2s (c->prev_peer));
1816
1817   if ( (GNUNET_NO == GCP_is_neighbor (c->next_peer)) ||
1818        (GNUNET_NO == GCP_is_neighbor (c->prev_peer)) )
1819   {
1820     if (GCC_is_origin (c, GNUNET_YES))
1821       GNUNET_STATISTICS_update (stats, "# local bad paths", 1, GNUNET_NO);
1822     GNUNET_STATISTICS_update (stats, "# bad paths", 1, GNUNET_NO);
1823
1824     LOG (GNUNET_ERROR_TYPE_DEBUG,
1825          "  register neighbors failed\n");
1826     LOG (GNUNET_ERROR_TYPE_DEBUG,
1827          "  prev: %s, neighbor?: %d\n",
1828          GCP_2s (c->prev_peer),
1829          GCP_is_neighbor (c->prev_peer));
1830     LOG (GNUNET_ERROR_TYPE_DEBUG,
1831          "  next: %s, neighbor?: %d\n",
1832          GCP_2s (c->next_peer),
1833          GCP_is_neighbor (c->next_peer));
1834     return GNUNET_SYSERR;
1835   }
1836   GCP_add_connection (c->next_peer, c, GNUNET_NO);
1837   GCP_add_connection (c->prev_peer, c, GNUNET_YES);
1838
1839   return GNUNET_OK;
1840 }
1841
1842
1843 /**
1844  * Remove the connection from the list of both neighbors.
1845  *
1846  * @param c Connection.
1847  */
1848 static void
1849 unregister_neighbors (struct CadetConnection *c)
1850 {
1851 //  struct CadetPeer *peer; FIXME dont use next_peer, prev_peer
1852   /* Either already unregistered or never got registered, it's ok either way. */
1853   if (NULL == c->path)
1854     return;
1855   if (NULL != c->next_peer)
1856   {
1857     GCP_remove_connection (c->next_peer, c);
1858     c->next_peer = NULL;
1859   }
1860   if (NULL != c->prev_peer)
1861   {
1862     GCP_remove_connection (c->prev_peer, c);
1863     c->prev_peer = NULL;
1864   }
1865 }
1866
1867
1868 /**
1869  * Invalidates all paths towards all peers that comprise the connection which
1870  * rely on the disconnected peer.
1871  *
1872  * ~O(n^3) (peers in connection * paths/peer * links/path)
1873  *
1874  * @param c Connection whose peers' paths to clean.
1875  * @param disconnected Peer that disconnected.
1876  */
1877 static void
1878 invalidate_paths (struct CadetConnection *c, struct CadetPeer *disconnected)
1879 {
1880   struct CadetPeer *peer;
1881   unsigned int i;
1882
1883   for (i = 0; i < c->path->length; i++)
1884   {
1885     peer = GCP_get_short (c->path->peers[i], GNUNET_NO);
1886     if (NULL != peer)
1887       GCP_notify_broken_link (peer, &my_full_id, GCP_get_id (disconnected));
1888   }
1889 }
1890
1891
1892 /**
1893  * Bind the connection to the peer and the tunnel to that peer.
1894  *
1895  * If the peer has no tunnel, create one. Update tunnel and connection
1896  * data structres to reflect new status.
1897  *
1898  * @param c Connection.
1899  * @param peer Peer.
1900  */
1901 static void
1902 add_to_peer (struct CadetConnection *c,
1903              struct CadetPeer *peer)
1904 {
1905   GCP_add_tunnel (peer);
1906   c->t = GCP_get_tunnel (peer);
1907   GCT_add_connection (c->t, c);
1908 }
1909
1910
1911 /**
1912  * Log receipt of message on stderr (INFO level).
1913  *
1914  * @param message Message received.
1915  * @param peer Peer who sent the message.
1916  * @param hash Connection ID.
1917  */
1918 static void
1919 log_message (const struct GNUNET_MessageHeader *message,
1920              const struct GNUNET_PeerIdentity *peer,
1921              const struct GNUNET_CADET_Hash *hash)
1922 {
1923   uint16_t size;
1924   uint16_t type;
1925   char *arrow;
1926
1927   size = ntohs (message->size);
1928   type = ntohs (message->type);
1929   switch (type)
1930   {
1931     case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE:
1932     case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK:
1933     case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN:
1934     case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY:
1935       arrow = "==";
1936       break;
1937     default:
1938       arrow = "--";
1939   }
1940   LOG (GNUNET_ERROR_TYPE_INFO, "<%s %s on conn %s from %s, %6u bytes\n",
1941        arrow, GC_m2s (type), GNUNET_h2s (GC_h2hc (hash)),
1942        GNUNET_i2s (peer), (unsigned int) size);
1943 }
1944
1945 /******************************************************************************/
1946 /********************************    API    ***********************************/
1947 /******************************************************************************/
1948
1949 /**
1950  * Core handler for connection creation.
1951  *
1952  * @param cls Closure (unused).
1953  * @param peer Sender (neighbor).
1954  * @param message Message.
1955  * @return #GNUNET_OK to keep the connection open,
1956  *         #GNUNET_SYSERR to close it (signal serious error)
1957  */
1958 int
1959 GCC_handle_create (void *cls,
1960                    const struct GNUNET_PeerIdentity *peer,
1961                    const struct GNUNET_MessageHeader *message)
1962 {
1963   struct GNUNET_CADET_ConnectionCreate *msg;
1964   struct GNUNET_PeerIdentity *id;
1965   struct GNUNET_CADET_Hash *cid;
1966   struct CadetPeerPath *path;
1967   struct CadetPeer *dest_peer;
1968   struct CadetPeer *orig_peer;
1969   struct CadetConnection *c;
1970   unsigned int own_pos;
1971   uint16_t size;
1972
1973   GCC_check_connections ();
1974   /* Check size */
1975   size = ntohs (message->size);
1976   if (size < sizeof (struct GNUNET_CADET_ConnectionCreate))
1977   {
1978     GNUNET_break_op (0);
1979     return GNUNET_OK;
1980   }
1981
1982   /* Calculate hops */
1983   size -= sizeof (struct GNUNET_CADET_ConnectionCreate);
1984   if (size % sizeof (struct GNUNET_PeerIdentity))
1985   {
1986     GNUNET_break_op (0);
1987     return GNUNET_OK;
1988   }
1989   if (0 != size % sizeof (struct GNUNET_PeerIdentity))
1990   {
1991     GNUNET_break_op (0);
1992     return GNUNET_OK;
1993   }
1994   size /= sizeof (struct GNUNET_PeerIdentity);
1995   if (1 > size)
1996   {
1997     GNUNET_break_op (0);
1998     return GNUNET_OK;
1999   }
2000   LOG (GNUNET_ERROR_TYPE_DEBUG, "    path has %u hops.\n", size);
2001
2002   /* Get parameters */
2003   msg = (struct GNUNET_CADET_ConnectionCreate *) message;
2004   cid = &msg->cid;
2005   log_message (message, peer, cid);
2006   id = (struct GNUNET_PeerIdentity *) &msg[1];
2007   LOG (GNUNET_ERROR_TYPE_DEBUG, "    origin: %s\n", GNUNET_i2s (id));
2008
2009   /* Create connection */
2010   c = connection_get (cid);
2011   if (NULL == c)
2012   {
2013     path = path_build_from_peer_ids ((struct GNUNET_PeerIdentity *) &msg[1],
2014                                      size, myid, &own_pos);
2015     if (NULL == path)
2016     {
2017       /* Path was malformed, probably our own ID was not in it. */
2018       GNUNET_STATISTICS_update (stats, "# malformed paths", 1, GNUNET_NO);
2019       GNUNET_break_op (0);
2020       return GNUNET_OK;
2021     }
2022
2023     if (0 == own_pos)
2024     {
2025       /* We received this request from a neighbor, we cannot be origin */
2026       GNUNET_STATISTICS_update (stats, "# fake paths", 1, GNUNET_NO);
2027       GNUNET_break_op (0);
2028       path_destroy (path);
2029       return GNUNET_OK;
2030     }
2031
2032     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Own position: %u\n", own_pos);
2033     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Creating connection\n");
2034     c = GCC_new (cid, NULL, path, own_pos);
2035     if (NULL == c)
2036     {
2037       if (path->length - 1 == own_pos)
2038       {
2039         /* If we are destination, why did the creation fail? */
2040         GNUNET_break (0);
2041         path_destroy (path);
2042         GCC_check_connections ();
2043         return GNUNET_OK;
2044       }
2045       send_broken_unknown (cid, &my_full_id,
2046                            GNUNET_PEER_resolve2 (path->peers[own_pos + 1]),
2047                            peer);
2048       path_destroy (path);
2049       GCC_check_connections ();
2050       return GNUNET_OK;
2051     }
2052     GCP_add_path_to_all (path, GNUNET_NO);
2053     connection_reset_timeout (c, GNUNET_YES);
2054   }
2055   else
2056   {
2057     path = path_duplicate (c->path);
2058   }
2059   if (CADET_CONNECTION_NEW == c->state)
2060     connection_change_state (c, CADET_CONNECTION_SENT);
2061
2062   /* Remember peers */
2063   dest_peer = GCP_get (&id[size - 1], GNUNET_YES);
2064   orig_peer = GCP_get (&id[0], GNUNET_YES);
2065
2066   /* Is it a connection to us? */
2067   if (c->own_pos == path->length - 1)
2068   {
2069     LOG (GNUNET_ERROR_TYPE_DEBUG, "  It's for us!\n");
2070     GCP_add_path_to_origin (orig_peer, path_duplicate (path), GNUNET_YES);
2071
2072     add_to_peer (c, orig_peer);
2073     if (GNUNET_YES == does_connection_exist (c))
2074     {
2075       /* Peer created a connection equal to one we think exists
2076        * and is fine.
2077        * Solution: Keep both and postpone disambiguation. In the meantime
2078        * the connection will time out or peer will inform us it is broken.
2079        *
2080        * Other options:
2081        * - Use explicit duplicate.
2082        * - Accept new conn and destroy the old. (interruption in higher level)
2083        * - Keep the one with higher ID / created by peer with higher ID. */
2084        schedule_check_duplicates (c);
2085     }
2086
2087     if (CADET_TUNNEL_NEW == GCT_get_cstate (c->t))
2088       GCT_change_cstate (c->t,  CADET_TUNNEL_WAITING);
2089
2090     send_connection_ack (c, GNUNET_NO);
2091     if (CADET_CONNECTION_SENT == c->state)
2092       connection_change_state (c, CADET_CONNECTION_ACK);
2093   }
2094   else
2095   {
2096     /* It's for somebody else! Retransmit. */
2097     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Retransmitting.\n");
2098     GCP_add_path (dest_peer, path_duplicate (path), GNUNET_NO);
2099     GCP_add_path_to_origin (orig_peer, path_duplicate (path), GNUNET_NO);
2100     GNUNET_assert (NULL == GCC_send_prebuilt_message (message, 0, 0, c,
2101                                                       GNUNET_YES, GNUNET_YES,
2102                                                       NULL, NULL));
2103   }
2104   path_destroy (path);
2105   GCC_check_connections ();
2106   return GNUNET_OK;
2107 }
2108
2109
2110 /**
2111  * Core handler for path confirmations.
2112  *
2113  * @param cls closure
2114  * @param message message
2115  * @param peer peer identity this notification is about
2116  * @return #GNUNET_OK to keep the connection open,
2117  *         #GNUNET_SYSERR to close it (signal serious error)
2118  */
2119 int
2120 GCC_handle_confirm (void *cls,
2121                     const struct GNUNET_PeerIdentity *peer,
2122                     const struct GNUNET_MessageHeader *message)
2123 {
2124   struct GNUNET_CADET_ConnectionACK *msg;
2125   struct CadetConnection *c;
2126   struct CadetPeerPath *p;
2127   struct CadetPeer *pi;
2128   enum CadetConnectionState oldstate;
2129   int fwd;
2130
2131   GCC_check_connections ();
2132   msg = (struct GNUNET_CADET_ConnectionACK *) message;
2133   log_message (message, peer, &msg->cid);
2134   c = connection_get (&msg->cid);
2135   if (NULL == c)
2136   {
2137     GNUNET_STATISTICS_update (stats, "# control on unknown connection",
2138                               1, GNUNET_NO);
2139     LOG (GNUNET_ERROR_TYPE_DEBUG,
2140          "  don't know the connection!\n");
2141     send_broken_unknown (&msg->cid, &my_full_id, NULL, peer);
2142     GCC_check_connections ();
2143     return GNUNET_OK;
2144   }
2145
2146   if (GNUNET_NO != c->destroy)
2147   {
2148     GNUNET_assert (CADET_CONNECTION_DESTROYED != c->state);
2149     LOG (GNUNET_ERROR_TYPE_DEBUG,
2150          "connection %s being destroyed, ignoring confirm\n",
2151          GCC_2s (c));
2152     GCC_check_connections ();
2153     return GNUNET_OK;
2154   }
2155
2156   oldstate = c->state;
2157   LOG (GNUNET_ERROR_TYPE_DEBUG, "  via peer %s\n", GNUNET_i2s (peer));
2158   pi = GCP_get (peer, GNUNET_YES);
2159   if (get_next_hop (c) == pi)
2160   {
2161     LOG (GNUNET_ERROR_TYPE_DEBUG, "  SYNACK\n");
2162     fwd = GNUNET_NO;
2163     if (CADET_CONNECTION_SENT == oldstate)
2164       connection_change_state (c, CADET_CONNECTION_ACK);
2165   }
2166   else if (get_prev_hop (c) == pi)
2167   {
2168     LOG (GNUNET_ERROR_TYPE_DEBUG, "  FINAL ACK\n");
2169     fwd = GNUNET_YES;
2170     connection_change_state (c, CADET_CONNECTION_READY);
2171   }
2172   else
2173   {
2174     GNUNET_break_op (0);
2175     return GNUNET_OK;
2176   }
2177
2178   connection_reset_timeout (c, fwd);
2179
2180   /* Add path to peers? */
2181   p = c->path;
2182   if (NULL != p)
2183   {
2184     GCP_add_path_to_all (p, GNUNET_YES);
2185   }
2186   else
2187   {
2188     GNUNET_break (0);
2189   }
2190
2191   /* Message for us as creator? */
2192   if (GCC_is_origin (c, GNUNET_YES))
2193   {
2194     if (GNUNET_NO != fwd)
2195     {
2196       GNUNET_break_op (0);
2197       return GNUNET_OK;
2198     }
2199     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Connection (SYN)ACK for us!\n");
2200
2201     /* If just created, cancel the short timeout and start a long one */
2202     if (CADET_CONNECTION_SENT == oldstate)
2203       connection_reset_timeout (c, GNUNET_YES);
2204
2205     /* Change connection state */
2206     connection_change_state (c, CADET_CONNECTION_READY);
2207     send_connection_ack (c, GNUNET_YES);
2208
2209     /* Change tunnel state, trigger KX */
2210     if (CADET_TUNNEL_WAITING == GCT_get_cstate (c->t))
2211       GCT_change_cstate (c->t, CADET_TUNNEL_READY);
2212     GCC_check_connections ();
2213     return GNUNET_OK;
2214   }
2215
2216   /* Message for us as destination? */
2217   if (GCC_is_terminal (c, GNUNET_YES))
2218   {
2219     if (GNUNET_YES != fwd)
2220     {
2221       GNUNET_break_op (0);
2222       return GNUNET_OK;
2223     }
2224     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Connection ACK for us!\n");
2225
2226     /* If just created, cancel the short timeout and start a long one */
2227     if (CADET_CONNECTION_ACK == oldstate)
2228       connection_reset_timeout (c, GNUNET_NO);
2229
2230     /* Change tunnel state */
2231     if (CADET_TUNNEL_WAITING == GCT_get_cstate (c->t))
2232       GCT_change_cstate (c->t, CADET_TUNNEL_READY);
2233     GCC_check_connections ();
2234     return GNUNET_OK;
2235   }
2236
2237   LOG (GNUNET_ERROR_TYPE_DEBUG, "  not for us, retransmitting...\n");
2238   GNUNET_assert (NULL ==
2239                  GCC_send_prebuilt_message (message, 0, 0, c, fwd,
2240                                             GNUNET_YES, NULL, NULL));
2241   GCC_check_connections ();
2242   return GNUNET_OK;
2243 }
2244
2245
2246 /**
2247  * Core handler for notifications of broken connections.
2248  *
2249  * @param cls Closure (unused).
2250  * @param id Peer identity of sending neighbor.
2251  * @param message Message.
2252  * @return #GNUNET_OK to keep the connection open,
2253  *         #GNUNET_SYSERR to close it (signal serious error)
2254  */
2255 int
2256 GCC_handle_broken (void* cls,
2257                    const struct GNUNET_PeerIdentity* id,
2258                    const struct GNUNET_MessageHeader* message)
2259 {
2260   struct GNUNET_CADET_ConnectionBroken *msg;
2261   struct CadetConnection *c;
2262   struct CadetTunnel *t;
2263   int pending;
2264   int fwd;
2265
2266   GCC_check_connections ();
2267   msg = (struct GNUNET_CADET_ConnectionBroken *) message;
2268   log_message (message, id, &msg->cid);
2269   LOG (GNUNET_ERROR_TYPE_DEBUG, "  regarding %s\n",
2270               GNUNET_i2s (&msg->peer1));
2271   LOG (GNUNET_ERROR_TYPE_DEBUG, "  regarding %s\n",
2272               GNUNET_i2s (&msg->peer2));
2273   c = connection_get (&msg->cid);
2274   if (NULL == c)
2275   {
2276     LOG (GNUNET_ERROR_TYPE_DEBUG, "  duplicate CONNECTION_BROKEN\n");
2277     GCC_check_connections ();
2278     return GNUNET_OK;
2279   }
2280
2281   t = c->t;
2282
2283   fwd = is_fwd (c, id);
2284   c->destroy = GNUNET_YES;
2285   if (GCC_is_terminal (c, fwd))
2286   {
2287     struct CadetPeer *endpoint;
2288
2289     if (NULL == t)
2290     {
2291       /* A terminal connection should not have 't' set to NULL. */
2292       GNUNET_break (0);
2293       GCC_debug (c, GNUNET_ERROR_TYPE_ERROR);
2294       return GNUNET_OK;
2295     }
2296     endpoint = GCP_get_short (c->path->peers[c->path->length - 1], GNUNET_YES);
2297     if (2 < c->path->length)
2298       path_invalidate (c->path);
2299     GCP_notify_broken_link (endpoint, &msg->peer1, &msg->peer2);
2300
2301     c->state = CADET_CONNECTION_BROKEN;
2302     GCT_remove_connection (t, c);
2303     c->t = NULL;
2304
2305     pending = c->pending_messages;
2306     if (0 < pending)
2307       resend_messages_and_destroy (c, !fwd);
2308     else
2309       GCC_destroy (c);
2310   }
2311   else
2312   {
2313     GNUNET_assert (NULL == GCC_send_prebuilt_message (message, 0, 0, c, fwd,
2314                                                       GNUNET_YES, NULL, NULL));
2315     connection_cancel_queues (c, !fwd);
2316   }
2317   GCC_check_connections ();
2318   return GNUNET_OK;
2319 }
2320
2321
2322 /**
2323  * Core handler for tunnel destruction
2324  *
2325  * @param cls Closure (unused).
2326  * @param peer Peer identity of sending neighbor.
2327  * @param message Message.
2328  * @return #GNUNET_OK to keep the connection open,
2329  *         #GNUNET_SYSERR to close it (signal serious error)
2330  */
2331 int
2332 GCC_handle_destroy (void *cls,
2333                     const struct GNUNET_PeerIdentity *peer,
2334                     const struct GNUNET_MessageHeader *message)
2335 {
2336   const struct GNUNET_CADET_ConnectionDestroy *msg;
2337   struct CadetConnection *c;
2338   int fwd;
2339
2340   GCC_check_connections ();
2341   msg = (const struct GNUNET_CADET_ConnectionDestroy *) message;
2342   log_message (message, peer, &msg->cid);
2343   c = connection_get (&msg->cid);
2344   if (NULL == c)
2345   {
2346     /* Probably already got the message from another path,
2347      * destroyed the tunnel and retransmitted to children.
2348      * Safe to ignore.
2349      */
2350     GNUNET_STATISTICS_update (stats,
2351                               "# control on unknown connection",
2352                               1, GNUNET_NO);
2353     LOG (GNUNET_ERROR_TYPE_DEBUG,
2354          "  connection unknown: already destroyed?\n");
2355     GCC_check_connections ();
2356     return GNUNET_OK;
2357   }
2358   fwd = is_fwd (c, peer);
2359   if (GNUNET_SYSERR == fwd)
2360   {
2361     GNUNET_break_op (0); /* FIXME */
2362     return GNUNET_OK;
2363   }
2364   if (GNUNET_NO == GCC_is_terminal (c, fwd))
2365   {
2366     GNUNET_assert (NULL ==
2367                    GCC_send_prebuilt_message (message, 0, 0, c, fwd,
2368                                               GNUNET_YES, NULL, NULL));
2369   }
2370   else if (0 == c->pending_messages)
2371   {
2372     LOG (GNUNET_ERROR_TYPE_DEBUG, "  directly destroying connection!\n");
2373     GCC_destroy (c);
2374     GCC_check_connections ();
2375     return GNUNET_OK;
2376   }
2377   c->destroy = GNUNET_YES;
2378   c->state = CADET_CONNECTION_DESTROYED;
2379   if (NULL != c->t)
2380   {
2381     GCT_remove_connection (c->t, c);
2382     c->t = NULL;
2383   }
2384   GCC_check_connections ();
2385   return GNUNET_OK;
2386 }
2387
2388
2389 /**
2390  * Check the message against internal state and test if it goes FWD or BCK.
2391  *
2392  * Updates the PID, state and timeout values for the connection.
2393  *
2394  * @param message Message to check. It must belong to an existing connection.
2395  * @param minimum_size The message cannot be smaller than this value.
2396  * @param cid Connection ID (even if @a c is NULL, the ID is still needed).
2397  * @param c Connection this message should belong. If NULL, check fails.
2398  * @param neighbor Neighbor that sent the message.
2399  */
2400 static int
2401 check_message (const struct GNUNET_MessageHeader *message,
2402                size_t minimum_size,
2403                const struct GNUNET_CADET_Hash* cid,
2404                struct CadetConnection *c,
2405                const struct GNUNET_PeerIdentity *neighbor,
2406                uint32_t pid)
2407 {
2408   GNUNET_PEER_Id neighbor_id;
2409   struct CadetFlowControl *fc;
2410   struct CadetPeer *hop;
2411   int fwd;
2412   uint16_t type;
2413
2414   /* Check size */
2415   if (ntohs (message->size) < minimum_size)
2416   {
2417     GNUNET_break_op (0);
2418     LOG (GNUNET_ERROR_TYPE_WARNING, "Size %u < %u\n",
2419          ntohs (message->size), minimum_size);
2420     return GNUNET_SYSERR;
2421   }
2422
2423   /* Check connection */
2424   if (NULL == c)
2425   {
2426     GNUNET_STATISTICS_update (stats,
2427                               "# unknown connection",
2428                               1, GNUNET_NO);
2429     LOG (GNUNET_ERROR_TYPE_DEBUG,
2430          "%s on unknown connection %s\n",
2431          GC_m2s (ntohs (message->type)),
2432          GNUNET_h2s (GC_h2hc (cid)));
2433     send_broken_unknown (cid,
2434                          &my_full_id,
2435                          NULL,
2436                          neighbor);
2437     return GNUNET_SYSERR;
2438   }
2439
2440   /* Check if origin is as expected */
2441   neighbor_id = GNUNET_PEER_search (neighbor);
2442   hop = get_prev_hop (c);
2443   if (neighbor_id == GCP_get_short_id (hop))
2444   {
2445     fwd = GNUNET_YES;
2446   }
2447   else
2448   {
2449     hop = get_next_hop (c);
2450     GNUNET_break (hop == c->next_peer);
2451     if (neighbor_id == GCP_get_short_id (hop))
2452     {
2453       fwd = GNUNET_NO;
2454     }
2455     else
2456     {
2457       /* Unexpected peer sending traffic on a connection. */
2458       GNUNET_break_op (0);
2459       return GNUNET_SYSERR;
2460     }
2461   }
2462
2463   /* Check PID for payload messages */
2464   type = ntohs (message->type);
2465   if (GNUNET_MESSAGE_TYPE_CADET_ENCRYPTED == type
2466       || GNUNET_MESSAGE_TYPE_CADET_AX == type)
2467   {
2468     fc = fwd ? &c->bck_fc : &c->fwd_fc;
2469     LOG (GNUNET_ERROR_TYPE_DEBUG, " PID %u (expected %u - %u)\n",
2470          pid, fc->last_pid_recv + 1, fc->last_ack_sent);
2471     if (GC_is_pid_bigger (pid, fc->last_ack_sent))
2472     {
2473       GNUNET_break_op (0);
2474       GNUNET_STATISTICS_update (stats, "# unsolicited message", 1, GNUNET_NO);
2475       LOG (GNUNET_ERROR_TYPE_WARNING, "Received PID %u, (prev %u), ACK %u\n",
2476           pid, fc->last_pid_recv, fc->last_ack_sent);
2477       return GNUNET_SYSERR;
2478     }
2479     if (GC_is_pid_bigger (pid, fc->last_pid_recv))
2480     {
2481       unsigned int delta;
2482
2483       delta = pid - fc->last_pid_recv;
2484       fc->last_pid_recv = pid;
2485       fc->recv_bitmap <<= delta;
2486       fc->recv_bitmap |= 1;
2487     }
2488     else
2489     {
2490       GNUNET_STATISTICS_update (stats, "# out of order PID", 1, GNUNET_NO);
2491       if (GNUNET_NO == is_ooo_ok (fc->last_pid_recv, pid, fc->recv_bitmap))
2492       {
2493         LOG (GNUNET_ERROR_TYPE_WARNING, "PID %u unexpected (%u+), dropping!\n",
2494              pid, fc->last_pid_recv - 31);
2495         return GNUNET_SYSERR;
2496       }
2497       fc->recv_bitmap |= get_recv_bitmask (fc->last_pid_recv, pid);
2498     }
2499   }
2500
2501   /* Count as connection confirmation. */
2502   if (CADET_CONNECTION_SENT == c->state || CADET_CONNECTION_ACK == c->state)
2503   {
2504     connection_change_state (c, CADET_CONNECTION_READY);
2505     if (NULL != c->t)
2506     {
2507       if (CADET_TUNNEL_WAITING == GCT_get_cstate (c->t))
2508         GCT_change_cstate (c->t, CADET_TUNNEL_READY);
2509     }
2510   }
2511   connection_reset_timeout (c, fwd);
2512
2513   return fwd;
2514 }
2515
2516
2517 /**
2518  * Generic handler for cadet network encrypted traffic.
2519  *
2520  * @param peer Peer identity this notification is about.
2521  * @param msg Encrypted message.
2522  * @return #GNUNET_OK to keep the connection open,
2523  *         #GNUNET_SYSERR to close it (signal serious error)
2524  */
2525 static int
2526 handle_cadet_encrypted (const struct GNUNET_PeerIdentity *peer,
2527                         const struct GNUNET_MessageHeader *message)
2528 {
2529   const struct GNUNET_CADET_Encrypted *otr_msg;
2530   const struct GNUNET_CADET_AX *ax_msg;
2531   const struct GNUNET_CADET_Hash* cid;
2532   struct CadetConnection *c;
2533   size_t minimum_size;
2534   size_t overhead;
2535   uint32_t pid;
2536   uint32_t ttl;
2537   int fwd;
2538
2539   GCC_check_connections ();
2540   if (GNUNET_MESSAGE_TYPE_CADET_AX == ntohs (message->type))
2541   {
2542     overhead = sizeof (struct GNUNET_CADET_AX);
2543     ax_msg = (const struct GNUNET_CADET_AX *) message;
2544     cid = &ax_msg->cid;
2545     pid = ntohl (ax_msg->pid);
2546     otr_msg = NULL;
2547   }
2548   else
2549   {
2550     overhead = sizeof (struct GNUNET_CADET_Encrypted);
2551     otr_msg = (const struct GNUNET_CADET_Encrypted *) message;
2552     cid = &otr_msg->cid;
2553     pid = ntohl (otr_msg->pid);
2554   }
2555
2556   log_message (message, peer, cid);
2557
2558   minimum_size = sizeof (struct GNUNET_MessageHeader) + overhead;
2559   c = connection_get (cid);
2560   fwd = check_message (message,
2561                        minimum_size,
2562                        cid,
2563                        c,
2564                        peer,
2565                        pid);
2566
2567   /* If something went wrong, discard message. */
2568   if (GNUNET_SYSERR == fwd)
2569   {
2570     GCC_check_connections ();
2571     return GNUNET_OK;
2572   }
2573
2574   /* Is this message for us? */
2575   if (GCC_is_terminal (c, fwd))
2576   {
2577     GNUNET_STATISTICS_update (stats, "# received encrypted", 1, GNUNET_NO);
2578
2579     if (NULL == c->t)
2580     {
2581       GNUNET_break (GNUNET_NO != c->destroy);
2582       return GNUNET_OK;
2583     }
2584     GCT_handle_encrypted (c->t, message);
2585     GCC_send_ack (c, fwd, GNUNET_NO);
2586     GCC_check_connections ();
2587     return GNUNET_OK;
2588   }
2589
2590   /* Message not for us: forward to next hop */
2591   LOG (GNUNET_ERROR_TYPE_DEBUG, "  not for us, retransmitting...\n");
2592   if (NULL != otr_msg) /* only otr has ttl */
2593   {
2594     ttl = ntohl (otr_msg->ttl);
2595     LOG (GNUNET_ERROR_TYPE_DEBUG, "   ttl: %u\n", ttl);
2596     if (ttl == 0)
2597     {
2598       GNUNET_STATISTICS_update (stats, "# TTL drops", 1, GNUNET_NO);
2599       LOG (GNUNET_ERROR_TYPE_WARNING, " TTL is 0, DROPPING!\n");
2600       GCC_send_ack (c, fwd, GNUNET_NO);
2601       GCC_check_connections ();
2602       return GNUNET_OK;
2603     }
2604   }
2605
2606   GNUNET_STATISTICS_update (stats, "# messages forwarded", 1, GNUNET_NO);
2607   GNUNET_assert (NULL == GCC_send_prebuilt_message (message, 0, 0, c, fwd,
2608                                                     GNUNET_NO, NULL, NULL));
2609   GCC_check_connections ();
2610   return GNUNET_OK;
2611 }
2612
2613
2614 /**
2615  * Generic handler for cadet network encrypted traffic.
2616  *
2617  * @param peer Peer identity this notification is about.
2618  * @param msg Encrypted message.
2619  * @return #GNUNET_OK to keep the connection open,
2620  *         #GNUNET_SYSERR to close it (signal serious error)
2621  */
2622 static int
2623 handle_cadet_kx (const struct GNUNET_PeerIdentity *peer,
2624                  const struct GNUNET_CADET_KX *msg)
2625 {
2626   const struct GNUNET_CADET_Hash* cid;
2627   struct CadetConnection *c;
2628   size_t expected_size;
2629   int fwd;
2630
2631   GCC_check_connections ();
2632   cid = &msg->cid;
2633   log_message (&msg->header, peer, cid);
2634
2635   expected_size = sizeof (struct GNUNET_CADET_KX)
2636                   + sizeof (struct GNUNET_MessageHeader);
2637   c = connection_get (cid);
2638   fwd = check_message (&msg->header,
2639                        expected_size,
2640                        cid,
2641                        c,
2642                        peer,
2643                        0);
2644
2645   /* If something went wrong, discard message. */
2646   if (GNUNET_SYSERR == fwd)
2647     return GNUNET_OK;
2648
2649   /* Is this message for us? */
2650   if (GCC_is_terminal (c, fwd))
2651   {
2652     LOG (GNUNET_ERROR_TYPE_DEBUG, "  message for us!\n");
2653     GNUNET_STATISTICS_update (stats, "# received KX", 1, GNUNET_NO);
2654     if (NULL == c->t)
2655     {
2656       GNUNET_break (0);
2657       return GNUNET_OK;
2658     }
2659     GCT_handle_kx (c->t, &msg[1].header);
2660     GCC_check_connections ();
2661     return GNUNET_OK;
2662   }
2663
2664   /* Message not for us: forward to next hop */
2665   LOG (GNUNET_ERROR_TYPE_DEBUG, "  not for us, retransmitting...\n");
2666   GNUNET_STATISTICS_update (stats, "# messages forwarded", 1, GNUNET_NO);
2667   GNUNET_assert (NULL == GCC_send_prebuilt_message (&msg->header, 0, 0, c, fwd,
2668                                                     GNUNET_NO, NULL, NULL));
2669   GCC_check_connections ();
2670   return GNUNET_OK;
2671 }
2672
2673
2674 /**
2675  * Core handler for key exchange traffic (ephemeral key, ping, pong).
2676  *
2677  * @param cls Closure (unused).
2678  * @param message Message received.
2679  * @param peer Peer who sent the message.
2680  * @return #GNUNET_OK to keep the connection open,
2681  *         #GNUNET_SYSERR to close it (signal serious error)
2682  */
2683 int
2684 GCC_handle_kx (void *cls,
2685                const struct GNUNET_PeerIdentity *peer,
2686                const struct GNUNET_MessageHeader *message)
2687 {
2688   GCC_check_connections ();
2689   return handle_cadet_kx (peer, (struct GNUNET_CADET_KX *) message);
2690 }
2691
2692
2693 /**
2694  * Core handler for encrypted cadet network traffic (channel mgmt, data).
2695  *
2696  * @param cls Closure (unused).
2697  * @param message Message received.
2698  * @param peer Peer who sent the message.
2699  * @return #GNUNET_OK to keep the connection open,
2700  *         #GNUNET_SYSERR to close it (signal serious error)
2701  */
2702 int
2703 GCC_handle_encrypted (void *cls, const struct GNUNET_PeerIdentity *peer,
2704                       const struct GNUNET_MessageHeader *message)
2705 {
2706   GCC_check_connections ();
2707   return handle_cadet_encrypted (peer, message);
2708 }
2709
2710
2711 /**
2712  * Core handler for cadet network traffic point-to-point acks.
2713  *
2714  * @param cls closure
2715  * @param message message
2716  * @param peer peer identity this notification is about
2717  * @return #GNUNET_OK to keep the connection open,
2718  *         #GNUNET_SYSERR to close it (signal serious error)
2719  */
2720 int
2721 GCC_handle_ack (void *cls, const struct GNUNET_PeerIdentity *peer,
2722                 const struct GNUNET_MessageHeader *message)
2723 {
2724   struct GNUNET_CADET_ACK *msg;
2725   struct CadetConnection *c;
2726   struct CadetFlowControl *fc;
2727   GNUNET_PEER_Id id;
2728   uint32_t ack;
2729   int fwd;
2730
2731   GCC_check_connections ();
2732   msg = (struct GNUNET_CADET_ACK *) message;
2733   log_message (message, peer, &msg->cid);
2734   c = connection_get (&msg->cid);
2735   if (NULL == c)
2736   {
2737     GNUNET_STATISTICS_update (stats,
2738                               "# ack on unknown connection",
2739                               1,
2740                               GNUNET_NO);
2741     send_broken_unknown (&msg->cid,
2742                          &my_full_id,
2743                          NULL,
2744                          peer);
2745     GCC_check_connections ();
2746     return GNUNET_OK;
2747   }
2748
2749   /* Is this a forward or backward ACK? */
2750   id = GNUNET_PEER_search (peer);
2751   if (GCP_get_short_id (get_next_hop (c)) == id)
2752   {
2753     fc = &c->fwd_fc;
2754     fwd = GNUNET_YES;
2755   }
2756   else if (GCP_get_short_id (get_prev_hop (c)) == id)
2757   {
2758     fc = &c->bck_fc;
2759     fwd = GNUNET_NO;
2760   }
2761   else
2762   {
2763     GNUNET_break_op (0);
2764     return GNUNET_OK;
2765   }
2766
2767   ack = ntohl (msg->ack);
2768   LOG (GNUNET_ERROR_TYPE_DEBUG, " %s ACK %u (was %u)\n",
2769        GC_f2s (fwd), ack, fc->last_ack_recv);
2770   if (GC_is_pid_bigger (ack, fc->last_ack_recv))
2771     fc->last_ack_recv = ack;
2772
2773   /* Cancel polling if the ACK is big enough. */
2774   if (NULL != fc->poll_task &&
2775       GC_is_pid_bigger (fc->last_ack_recv, fc->last_pid_sent))
2776   {
2777     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Cancel poll\n");
2778     GNUNET_SCHEDULER_cancel (fc->poll_task);
2779     fc->poll_task = NULL;
2780     fc->poll_time = GNUNET_TIME_UNIT_SECONDS;
2781   }
2782
2783   connection_unlock_queue (c, fwd);
2784   GCC_check_connections ();
2785   return GNUNET_OK;
2786 }
2787
2788
2789 /**
2790  * Core handler for cadet network traffic point-to-point ack polls.
2791  *
2792  * @param cls closure
2793  * @param message message
2794  * @param peer peer identity this notification is about
2795  * @return #GNUNET_OK to keep the connection open,
2796  *         #GNUNET_SYSERR to close it (signal serious error)
2797  */
2798 int
2799 GCC_handle_poll (void *cls,
2800                  const struct GNUNET_PeerIdentity *peer,
2801                  const struct GNUNET_MessageHeader *message)
2802 {
2803   struct GNUNET_CADET_Poll *msg;
2804   struct CadetConnection *c;
2805   struct CadetFlowControl *fc;
2806   GNUNET_PEER_Id id;
2807   uint32_t pid;
2808   int fwd;
2809
2810   GCC_check_connections ();
2811   msg = (struct GNUNET_CADET_Poll *) message;
2812   log_message (message, peer, &msg->cid);
2813   c = connection_get (&msg->cid);
2814   if (NULL == c)
2815   {
2816     GNUNET_STATISTICS_update (stats, "# poll on unknown connection", 1,
2817                               GNUNET_NO);
2818     LOG (GNUNET_ERROR_TYPE_DEBUG,
2819          "POLL message on unknown connection %s!\n",
2820          GNUNET_h2s (GC_h2hc (&msg->cid)));
2821     send_broken_unknown (&msg->cid,
2822                          &my_full_id,
2823                          NULL,
2824                          peer);
2825     GCC_check_connections ();
2826     return GNUNET_OK;
2827   }
2828
2829   /* Is this a forward or backward ACK?
2830    * Note: a poll should never be needed in a loopback case,
2831    * since there is no possiblility of packet loss there, so
2832    * this way of discerining FWD/BCK should not be a problem.
2833    */
2834   id = GNUNET_PEER_search (peer);
2835   if (GCP_get_short_id (get_next_hop (c)) == id)
2836   {
2837     LOG (GNUNET_ERROR_TYPE_DEBUG, "  FWD FC\n");
2838     fc = &c->fwd_fc;
2839   }
2840   else if (GCP_get_short_id (get_prev_hop (c)) == id)
2841   {
2842     LOG (GNUNET_ERROR_TYPE_DEBUG, "  BCK FC\n");
2843     fc = &c->bck_fc;
2844   }
2845   else
2846   {
2847     GNUNET_break_op (0);
2848     return GNUNET_OK;
2849   }
2850
2851   pid = ntohl (msg->pid);
2852   LOG (GNUNET_ERROR_TYPE_DEBUG, "  PID %u, OLD %u\n", pid, fc->last_pid_recv);
2853   fc->last_pid_recv = pid;
2854   fwd = fc == &c->bck_fc;
2855   GCC_send_ack (c, fwd, GNUNET_YES);
2856   GCC_check_connections ();
2857
2858   return GNUNET_OK;
2859 }
2860
2861
2862 /**
2863  * Send an ACK on the appropriate connection/channel, depending on
2864  * the direction and the position of the peer.
2865  *
2866  * @param c Which connection to send the hop-by-hop ACK.
2867  * @param fwd Is this a fwd ACK? (will go dest->root).
2868  * @param force Send the ACK even if suboptimal (e.g. requested by POLL).
2869  */
2870 void
2871 GCC_send_ack (struct CadetConnection *c, int fwd, int force)
2872 {
2873   unsigned int buffer;
2874
2875   GCC_check_connections ();
2876   LOG (GNUNET_ERROR_TYPE_DEBUG, "GCC send %s ACK on %s\n",
2877        GC_f2s (fwd), GCC_2s (c));
2878
2879   if (NULL == c)
2880   {
2881     GNUNET_break (0);
2882     return;
2883   }
2884
2885   if (GNUNET_NO != c->destroy)
2886   {
2887     LOG (GNUNET_ERROR_TYPE_DEBUG, "  being destroyed, why bother...\n");
2888     GCC_check_connections ();
2889     return;
2890   }
2891
2892   /* Get available buffer space */
2893   if (GCC_is_terminal (c, fwd))
2894   {
2895     LOG (GNUNET_ERROR_TYPE_DEBUG, "  getting from all channels\n");
2896     buffer = GCT_get_channels_buffer (c->t);
2897   }
2898   else
2899   {
2900     LOG (GNUNET_ERROR_TYPE_DEBUG, "  getting from one connection\n");
2901     buffer = GCC_get_buffer (c, fwd);
2902   }
2903   LOG (GNUNET_ERROR_TYPE_DEBUG, "  buffer available: %u\n", buffer);
2904   if (0 == buffer && GNUNET_NO == force)
2905   {
2906     GCC_check_connections ();
2907     return;
2908   }
2909
2910   /* Send available buffer space */
2911   if (GCC_is_origin (c, fwd))
2912   {
2913     GNUNET_assert (NULL != c->t);
2914     LOG (GNUNET_ERROR_TYPE_DEBUG, "  sending on channels...\n");
2915     GCT_unchoke_channels (c->t);
2916   }
2917   else
2918   {
2919     LOG (GNUNET_ERROR_TYPE_DEBUG, "  sending on connection\n");
2920     send_ack (c, buffer, fwd, force);
2921   }
2922   GCC_check_connections ();
2923 }
2924
2925
2926 /**
2927  * Initialize the connections subsystem
2928  *
2929  * @param c Configuration handle.
2930  */
2931 void
2932 GCC_init (const struct GNUNET_CONFIGURATION_Handle *c)
2933 {
2934   LOG (GNUNET_ERROR_TYPE_DEBUG, "init\n");
2935   if (GNUNET_OK !=
2936       GNUNET_CONFIGURATION_get_value_number (c, "CADET", "MAX_MSGS_QUEUE",
2937                                              &max_msgs_queue))
2938   {
2939     GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
2940                                "CADET", "MAX_MSGS_QUEUE", "MISSING");
2941     GNUNET_SCHEDULER_shutdown ();
2942     return;
2943   }
2944
2945   if (GNUNET_OK !=
2946       GNUNET_CONFIGURATION_get_value_number (c, "CADET", "MAX_CONNECTIONS",
2947                                              &max_connections))
2948   {
2949     GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
2950                                "CADET", "MAX_CONNECTIONS", "MISSING");
2951     GNUNET_SCHEDULER_shutdown ();
2952     return;
2953   }
2954
2955   if (GNUNET_OK !=
2956       GNUNET_CONFIGURATION_get_value_time (c, "CADET", "REFRESH_CONNECTION_TIME",
2957                                            &refresh_connection_time))
2958   {
2959     GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
2960                                "CADET", "REFRESH_CONNECTION_TIME", "MISSING");
2961     GNUNET_SCHEDULER_shutdown ();
2962     return;
2963   }
2964   create_connection_time = GNUNET_TIME_UNIT_SECONDS;
2965   connections = GNUNET_CONTAINER_multihashmap_create (1024, GNUNET_NO);
2966 }
2967
2968
2969 /**
2970  * Destroy each connection on shutdown.
2971  *
2972  * @param cls Closure (unused).
2973  * @param key Current key code (CID, unused).
2974  * @param value Value in the hash map (`struct CadetConnection`)
2975  *
2976  * @return #GNUNET_YES, because we should continue to iterate
2977  */
2978 static int
2979 shutdown_iterator (void *cls,
2980                    const struct GNUNET_HashCode *key,
2981                    void *value)
2982 {
2983   struct CadetConnection *c = value;
2984
2985   c->state = CADET_CONNECTION_DESTROYED;
2986   GCC_destroy (c);
2987   return GNUNET_YES;
2988 }
2989
2990
2991 /**
2992  * Shut down the connections subsystem.
2993  */
2994 void
2995 GCC_shutdown (void)
2996 {
2997   LOG (GNUNET_ERROR_TYPE_DEBUG, "Shutting down connections\n");
2998   GCC_check_connections ();
2999   GNUNET_CONTAINER_multihashmap_iterate (connections,
3000                                          &shutdown_iterator,
3001                                          NULL);
3002   GNUNET_CONTAINER_multihashmap_destroy (connections);
3003   connections = NULL;
3004 }
3005
3006
3007 /**
3008  * Create a connection.
3009  *
3010  * @param cid Connection ID (either created locally or imposed remotely).
3011  * @param t Tunnel this connection belongs to (or NULL);
3012  * @param path Path this connection has to use (copy is made).
3013  * @param own_pos Own position in the @c path path.
3014  *
3015  * @return Newly created connection, NULL in case of error (own id not in path).
3016  */
3017 struct CadetConnection *
3018 GCC_new (const struct GNUNET_CADET_Hash *cid,
3019          struct CadetTunnel *t,
3020          struct CadetPeerPath *path,
3021          unsigned int own_pos)
3022 {
3023   struct CadetConnection *c;
3024   struct CadetPeerPath *cpath;
3025
3026   GCC_check_connections ();
3027   cpath = path_duplicate (path);
3028   GNUNET_assert (NULL != cpath);
3029   c = GNUNET_new (struct CadetConnection);
3030   c->id = *cid;
3031   GNUNET_assert (GNUNET_OK ==
3032                  GNUNET_CONTAINER_multihashmap_put (connections,
3033                                                     GCC_get_h (c), c,
3034                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
3035   fc_init (&c->fwd_fc);
3036   fc_init (&c->bck_fc);
3037   c->fwd_fc.c = c;
3038   c->bck_fc.c = c;
3039
3040   c->t = t;
3041   GNUNET_assert (own_pos <= cpath->length - 1);
3042   c->own_pos = own_pos;
3043   c->path = cpath;
3044   cpath->c = c;
3045   if (GNUNET_OK != register_neighbors (c))
3046   {
3047     if (0 == own_pos)
3048     {
3049       /* We were the origin of this request, this means we have invalid
3050        * info about the paths to reach the destination. We must invalidate
3051        * the *original* path to avoid trying it again in the next minute.
3052        */
3053       if (2 < path->length)
3054         path_invalidate (path);
3055       else
3056       {
3057         GNUNET_break (0);
3058         GCT_debug(t, GNUNET_ERROR_TYPE_WARNING);
3059       }
3060       c->t = NULL;
3061     }
3062     path_destroy (c->path);
3063     c->path = NULL;
3064     GCC_destroy (c);
3065     return NULL;
3066   }
3067   LOG (GNUNET_ERROR_TYPE_INFO, "New connection %s\n", GCC_2s (c));
3068   GCC_check_connections ();
3069   return c;
3070 }
3071
3072
3073 void
3074 GCC_destroy (struct CadetConnection *c)
3075 {
3076   GCC_check_connections ();
3077   if (NULL == c)
3078   {
3079     GNUNET_break (0);
3080     return;
3081   }
3082
3083   if (2 == c->destroy) /* cancel queues -> GCP_queue_cancel -> q_destroy -> */
3084     return;            /* -> message_sent -> GCC_destroy. Don't loop. */
3085   c->destroy = 2;
3086
3087   LOG (GNUNET_ERROR_TYPE_DEBUG,
3088        "destroying connection %s\n",
3089        GCC_2s (c));
3090   LOG (GNUNET_ERROR_TYPE_DEBUG,
3091        " fc's f: %p, b: %p\n",
3092        &c->fwd_fc, &c->bck_fc);
3093   LOG (GNUNET_ERROR_TYPE_DEBUG,
3094        " fc tasks f: %u, b: %u\n",
3095        c->fwd_fc.poll_task,
3096        c->bck_fc.poll_task);
3097
3098   /* Cancel all traffic */
3099   if (NULL != c->path)
3100   {
3101     connection_cancel_queues (c, GNUNET_YES);
3102     connection_cancel_queues (c, GNUNET_NO);
3103   }
3104   unregister_neighbors (c);
3105   path_destroy (c->path);
3106   c->path = NULL;
3107
3108   /* Delete from tunnel */
3109   if (NULL != c->t)
3110     GCT_remove_connection (c->t, c);
3111
3112   if (NULL != c->check_duplicates_task)
3113     GNUNET_SCHEDULER_cancel (c->check_duplicates_task);
3114   if (NULL != c->fwd_maintenance_task)
3115     GNUNET_SCHEDULER_cancel (c->fwd_maintenance_task);
3116   if (NULL != c->bck_maintenance_task)
3117     GNUNET_SCHEDULER_cancel (c->bck_maintenance_task);
3118
3119   if (GNUNET_NO == c->was_removed)
3120   {
3121     GNUNET_break (GNUNET_YES ==
3122                   GNUNET_CONTAINER_multihashmap_remove (connections,
3123                                                         GCC_get_h (c),
3124                                                         c));
3125   }
3126   GNUNET_STATISTICS_update (stats,
3127                             "# connections",
3128                             -1,
3129                             GNUNET_NO);
3130   GNUNET_free (c);
3131   GCC_check_connections ();
3132 }
3133
3134
3135 /**
3136  * Get the connection ID.
3137  *
3138  * @param c Connection to get the ID from.
3139  *
3140  * @return ID of the connection.
3141  */
3142 const struct GNUNET_CADET_Hash *
3143 GCC_get_id (const struct CadetConnection *c)
3144 {
3145   return &c->id;
3146 }
3147
3148
3149 /**
3150  * Get the connection ID.
3151  *
3152  * @param c Connection to get the ID from.
3153  *
3154  * @return ID of the connection.
3155  */
3156 const struct GNUNET_HashCode *
3157 GCC_get_h (const struct CadetConnection *c)
3158 {
3159   return GC_h2hc (&c->id);
3160 }
3161
3162
3163 /**
3164  * Get the connection path.
3165  *
3166  * @param c Connection to get the path from.
3167  *
3168  * @return path used by the connection.
3169  */
3170 const struct CadetPeerPath *
3171 GCC_get_path (const struct CadetConnection *c)
3172 {
3173   if (GNUNET_NO == c->destroy)
3174     return c->path;
3175   return NULL;
3176 }
3177
3178
3179 /**
3180  * Get the connection state.
3181  *
3182  * @param c Connection to get the state from.
3183  *
3184  * @return state of the connection.
3185  */
3186 enum CadetConnectionState
3187 GCC_get_state (const struct CadetConnection *c)
3188 {
3189   return c->state;
3190 }
3191
3192 /**
3193  * Get the connection tunnel.
3194  *
3195  * @param c Connection to get the tunnel from.
3196  *
3197  * @return tunnel of the connection.
3198  */
3199 struct CadetTunnel *
3200 GCC_get_tunnel (const struct CadetConnection *c)
3201 {
3202   return c->t;
3203 }
3204
3205
3206 /**
3207  * Get free buffer space in a connection.
3208  *
3209  * @param c Connection.
3210  * @param fwd Is query about FWD traffic?
3211  *
3212  * @return Free buffer space [0 - max_msgs_queue/max_connections]
3213  */
3214 unsigned int
3215 GCC_get_buffer (struct CadetConnection *c, int fwd)
3216 {
3217   struct CadetFlowControl *fc;
3218
3219   fc = fwd ? &c->fwd_fc : &c->bck_fc;
3220
3221   LOG (GNUNET_ERROR_TYPE_DEBUG, "  Get %s buffer on %s: %u - %u\n",
3222        GC_f2s (fwd), GCC_2s (c), fc->queue_max, fc->queue_n);
3223   GCC_debug (c, GNUNET_ERROR_TYPE_DEBUG);
3224
3225   return (fc->queue_max - fc->queue_n);
3226 }
3227
3228
3229 /**
3230  * Get how many messages have we allowed to send to us from a direction.
3231  *
3232  * @param c Connection.
3233  * @param fwd Are we asking about traffic from FWD (BCK messages)?
3234  *
3235  * @return last_ack_sent - last_pid_recv
3236  */
3237 unsigned int
3238 GCC_get_allowed (struct CadetConnection *c, int fwd)
3239 {
3240   struct CadetFlowControl *fc;
3241
3242   fc = fwd ? &c->fwd_fc : &c->bck_fc;
3243   if (CADET_CONNECTION_READY != c->state
3244       || GC_is_pid_bigger (fc->last_pid_recv, fc->last_ack_sent))
3245   {
3246     return 0;
3247   }
3248   return (fc->last_ack_sent - fc->last_pid_recv);
3249 }
3250
3251
3252 /**
3253  * Get messages queued in a connection.
3254  *
3255  * @param c Connection.
3256  * @param fwd Is query about FWD traffic?
3257  *
3258  * @return Number of messages queued.
3259  */
3260 unsigned int
3261 GCC_get_qn (struct CadetConnection *c, int fwd)
3262 {
3263   struct CadetFlowControl *fc;
3264
3265   fc = fwd ? &c->fwd_fc : &c->bck_fc;
3266
3267   return fc->queue_n;
3268 }
3269
3270
3271 /**
3272  * Get next PID to use.
3273  *
3274  * @param c Connection.
3275  * @param fwd Is query about FWD traffic?
3276  *
3277  * @return Last PID used + 1.
3278  */
3279 unsigned int
3280 GCC_get_pid (struct CadetConnection *c, int fwd)
3281 {
3282   struct CadetFlowControl *fc;
3283
3284   fc = fwd ? &c->fwd_fc : &c->bck_fc;
3285
3286   return fc->last_pid_sent + 1;
3287 }
3288
3289
3290 /**
3291  * Allow the connection to advertise a buffer of the given size.
3292  *
3293  * The connection will send an @c fwd ACK message (so: in direction !fwd)
3294  * allowing up to last_pid_recv + buffer.
3295  *
3296  * @param c Connection.
3297  * @param buffer How many more messages the connection can accept.
3298  * @param fwd Is this about FWD traffic? (The ack will go dest->root).
3299  */
3300 void
3301 GCC_allow (struct CadetConnection *c, unsigned int buffer, int fwd)
3302 {
3303   LOG (GNUNET_ERROR_TYPE_DEBUG, "  allowing %s %u messages %s\n",
3304        GCC_2s (c), buffer, GC_f2s (fwd));
3305   send_ack (c, buffer, fwd, GNUNET_NO);
3306 }
3307
3308
3309 /**
3310  * Notify other peers on a connection of a broken link. Mark connections
3311  * to destroy after all traffic has been sent.
3312  *
3313  * @param c Connection on which there has been a disconnection.
3314  * @param peer Peer that disconnected.
3315  */
3316 void
3317 GCC_neighbor_disconnected (struct CadetConnection *c, struct CadetPeer *peer)
3318 {
3319   struct CadetPeer *hop;
3320   char peer_name[16];
3321   int fwd;
3322
3323   GCC_check_connections ();
3324   strncpy (peer_name, GCP_2s (peer), 16);
3325   peer_name[15] = '\0';
3326   LOG (GNUNET_ERROR_TYPE_DEBUG,
3327        "shutting down %s, %s disconnected\n",
3328        GCC_2s (c), peer_name);
3329
3330   invalidate_paths (c, peer);
3331
3332   hop = get_prev_hop (c);
3333   if (NULL == hop)
3334   {
3335     /* Path was NULL, we should have deleted the connection. */
3336     GNUNET_break (0);
3337     return;
3338   }
3339   fwd = (peer == hop);
3340   if ( (GNUNET_YES == GCC_is_terminal (c, fwd)) ||
3341        (GNUNET_NO != c->destroy) )
3342   {
3343     /* Local shutdown, or other peer already down (hence 'c->destroy');
3344        so there is no one to notify about this, just clean up. */
3345     GCC_destroy (c);
3346     GCC_check_connections ();
3347     return;
3348   }
3349   send_broken (c, &my_full_id, GCP_get_id (peer), fwd);
3350   /* Connection will have at least one pending message
3351    * (the one we just scheduled), so delay destruction
3352    * and remove from map so we don't use accidentally. */
3353   c->destroy = GNUNET_YES;
3354   c->state = CADET_CONNECTION_DESTROYED;
3355   GNUNET_assert (GNUNET_NO == c->was_removed);
3356   c->was_removed = GNUNET_YES;
3357   GNUNET_break (GNUNET_YES ==
3358                 GNUNET_CONTAINER_multihashmap_remove (connections,
3359                                                       GCC_get_h (c),
3360                                                       c));
3361   /* Cancel queue in the direction that just died. */
3362   connection_cancel_queues (c, ! fwd);
3363   GCC_stop_poll (c, ! fwd);
3364   unregister_neighbors (c);
3365   GCC_check_connections ();
3366 }
3367
3368
3369 /**
3370  * Is this peer the first one on the connection?
3371  *
3372  * @param c Connection.
3373  * @param fwd Is this about fwd traffic?
3374  *
3375  * @return #GNUNET_YES if origin, #GNUNET_NO if relay/terminal.
3376  */
3377 int
3378 GCC_is_origin (struct CadetConnection *c, int fwd)
3379 {
3380   if (!fwd && c->path->length - 1 == c->own_pos )
3381     return GNUNET_YES;
3382   if (fwd && 0 == c->own_pos)
3383     return GNUNET_YES;
3384   return GNUNET_NO;
3385 }
3386
3387
3388 /**
3389  * Is this peer the last one on the connection?
3390  *
3391  * @param c Connection.
3392  * @param fwd Is this about fwd traffic?
3393  *            Note that the ROOT is the terminal for BCK traffic!
3394  *
3395  * @return #GNUNET_YES if terminal, #GNUNET_NO if relay/origin.
3396  */
3397 int
3398 GCC_is_terminal (struct CadetConnection *c, int fwd)
3399 {
3400   return GCC_is_origin (c, ! fwd);
3401 }
3402
3403
3404 /**
3405  * See if we are allowed to send by the next hop in the given direction.
3406  *
3407  * @param c Connection.
3408  * @param fwd Is this about fwd traffic?
3409  *
3410  * @return #GNUNET_YES in case it's OK to send.
3411  */
3412 int
3413 GCC_is_sendable (struct CadetConnection *c, int fwd)
3414 {
3415   struct CadetFlowControl *fc;
3416
3417   LOG (GNUNET_ERROR_TYPE_DEBUG,
3418        " checking sendability of %s traffic on %s\n",
3419        GC_f2s (fwd), GCC_2s (c));
3420   if (NULL == c)
3421   {
3422     GNUNET_break (0);
3423     return GNUNET_YES;
3424   }
3425   fc = fwd ? &c->fwd_fc : &c->bck_fc;
3426   LOG (GNUNET_ERROR_TYPE_DEBUG,
3427        " last ack recv: %u, last pid sent: %u\n",
3428        fc->last_ack_recv, fc->last_pid_sent);
3429   if (GC_is_pid_bigger (fc->last_ack_recv, fc->last_pid_sent))
3430   {
3431     LOG (GNUNET_ERROR_TYPE_DEBUG, " sendable\n");
3432     return GNUNET_YES;
3433   }
3434   LOG (GNUNET_ERROR_TYPE_DEBUG, " not sendable\n");
3435   return GNUNET_NO;
3436 }
3437
3438
3439 /**
3440  * Check if this connection is a direct one (never trim a direct connection).
3441  *
3442  * @param c Connection.
3443  *
3444  * @return #GNUNET_YES in case it's a direct connection, #GNUNET_NO otherwise.
3445  */
3446 int
3447 GCC_is_direct (struct CadetConnection *c)
3448 {
3449   return (c->path->length == 2) ? GNUNET_YES : GNUNET_NO;
3450 }
3451
3452
3453 /**
3454  * Sends an already built message on a connection, properly registering
3455  * all used resources.
3456  *
3457  * @param message Message to send. Function makes a copy of it.
3458  *                If message is not hop-by-hop, decrements TTL of copy.
3459  * @param payload_type Type of payload, in case the message is encrypted.
3460  * @param c Connection on which this message is transmitted.
3461  * @param fwd Is this a fwd message?
3462  * @param force Force the connection to accept the message (buffer overfill).
3463  * @param cont Continuation called once message is sent. Can be NULL.
3464  * @param cont_cls Closure for @c cont.
3465  *
3466  * @return Handle to cancel the message before it's sent.
3467  *         NULL on error or if @c cont is NULL.
3468  *         Invalid on @c cont call.
3469  */
3470 struct CadetConnectionQueue *
3471 GCC_send_prebuilt_message (const struct GNUNET_MessageHeader *message,
3472                            uint16_t payload_type, uint32_t payload_id,
3473                            struct CadetConnection *c, int fwd, int force,
3474                            GCC_sent cont, void *cont_cls)
3475 {
3476   struct CadetFlowControl *fc;
3477   struct CadetConnectionQueue *q;
3478   void *data;
3479   size_t size;
3480   uint16_t type;
3481   int droppable;
3482
3483   GCC_check_connections ();
3484   size = ntohs (message->size);
3485   data = GNUNET_malloc (size);
3486   memcpy (data, message, size);
3487   type = ntohs (message->type);
3488   LOG (GNUNET_ERROR_TYPE_INFO,
3489        "--> %s (%s %4u) on conn %s (%p) %s [%5u]\n",
3490        GC_m2s (type), GC_m2s (payload_type), payload_id, GCC_2s (c), c,
3491        GC_f2s(fwd), size);
3492
3493   fc = fwd ? &c->fwd_fc : &c->bck_fc;
3494   droppable = GNUNET_NO == force;
3495   switch (type)
3496   {
3497     struct GNUNET_CADET_AX        *axmsg;
3498     struct GNUNET_CADET_Encrypted *emsg;
3499     struct GNUNET_CADET_KX        *kmsg;
3500     struct GNUNET_CADET_ACK       *amsg;
3501     struct GNUNET_CADET_Poll      *pmsg;
3502     struct GNUNET_CADET_ConnectionDestroy *dmsg;
3503     struct GNUNET_CADET_ConnectionBroken  *bmsg;
3504     uint32_t ttl;
3505
3506     case GNUNET_MESSAGE_TYPE_CADET_AX:
3507     case GNUNET_MESSAGE_TYPE_CADET_ENCRYPTED:
3508       if (GNUNET_MESSAGE_TYPE_CADET_ENCRYPTED == type)
3509       {
3510         emsg = (struct GNUNET_CADET_Encrypted *) data;
3511         ttl = ntohl (emsg->ttl);
3512         if (0 == ttl)
3513         {
3514           GNUNET_break_op (0);
3515           GNUNET_free (data);
3516           return NULL;
3517         }
3518         emsg->cid = c->id;
3519         emsg->ttl = htonl (ttl - 1);
3520       }
3521       else
3522       {
3523         axmsg = (struct GNUNET_CADET_AX *) data;
3524         axmsg->cid = c->id;
3525       }
3526       LOG (GNUNET_ERROR_TYPE_DEBUG, "  Q_N+ %p %u\n", fc, fc->queue_n);
3527       LOG (GNUNET_ERROR_TYPE_DEBUG, "last pid sent %u\n", fc->last_pid_sent);
3528       LOG (GNUNET_ERROR_TYPE_DEBUG, "     ack recv %u\n", fc->last_ack_recv);
3529       if (GNUNET_YES == droppable)
3530       {
3531         fc->queue_n++;
3532       }
3533       else
3534       {
3535         LOG (GNUNET_ERROR_TYPE_DEBUG, "  not droppable, Q_N stays the same\n");
3536       }
3537       break;
3538
3539     case GNUNET_MESSAGE_TYPE_CADET_KX:
3540       kmsg = (struct GNUNET_CADET_KX *) data;
3541       kmsg->cid = c->id;
3542       break;
3543
3544     case GNUNET_MESSAGE_TYPE_CADET_ACK:
3545       amsg = (struct GNUNET_CADET_ACK *) data;
3546       amsg->cid = c->id;
3547       LOG (GNUNET_ERROR_TYPE_DEBUG, " ack %u\n", ntohl (amsg->ack));
3548       droppable = GNUNET_NO;
3549       break;
3550
3551     case GNUNET_MESSAGE_TYPE_CADET_POLL:
3552       pmsg = (struct GNUNET_CADET_Poll *) data;
3553       pmsg->cid = c->id;
3554       LOG (GNUNET_ERROR_TYPE_DEBUG, " POLL %u\n", ntohl (pmsg->pid));
3555       droppable = GNUNET_NO;
3556       break;
3557
3558     case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY:
3559       dmsg = (struct GNUNET_CADET_ConnectionDestroy *) data;
3560       dmsg->cid = c->id;
3561       break;
3562
3563     case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN:
3564       bmsg = (struct GNUNET_CADET_ConnectionBroken *) data;
3565       bmsg->cid = c->id;
3566       break;
3567
3568     case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE:
3569     case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK:
3570       break;
3571
3572     default:
3573       GNUNET_break (0);
3574       GNUNET_free (data);
3575       return NULL;
3576   }
3577
3578   if (fc->queue_n > fc->queue_max && droppable)
3579   {
3580     GNUNET_STATISTICS_update (stats, "# messages dropped (buffer full)",
3581                               1, GNUNET_NO);
3582     GNUNET_break (0);
3583     LOG (GNUNET_ERROR_TYPE_DEBUG, "queue full: %u/%u\n",
3584          fc->queue_n, fc->queue_max);
3585     if (GNUNET_MESSAGE_TYPE_CADET_ENCRYPTED == type
3586         || GNUNET_MESSAGE_TYPE_CADET_AX == type)
3587     {
3588       fc->queue_n--;
3589     }
3590     GNUNET_free (data);
3591     return NULL; /* Drop this message */
3592   }
3593
3594   LOG (GNUNET_ERROR_TYPE_DEBUG, "  C_P+ %s %u\n",
3595        GCC_2s (c), c->pending_messages);
3596   c->pending_messages++;
3597
3598   q = GNUNET_new (struct CadetConnectionQueue);
3599   q->forced = !droppable;
3600   q->q = GCP_queue_add (get_hop (c, fwd), data, type, payload_type, payload_id,
3601                         size, c, fwd, &conn_message_sent, q);
3602   if (NULL == q->q)
3603   {
3604     LOG (GNUNET_ERROR_TYPE_DEBUG, "dropping msg on %s, NULL q\n", GCC_2s (c));
3605     GNUNET_free (data);
3606     GNUNET_free (q);
3607     GCC_check_connections ();
3608     return NULL;
3609   }
3610   q->cont = cont;
3611   q->cont_cls = cont_cls;
3612   GCC_check_connections ();
3613   return (NULL == cont) ? NULL : q;
3614 }
3615
3616
3617 /**
3618  * Cancel a previously sent message while it's in the queue.
3619  *
3620  * ONLY can be called before the continuation given to the send function
3621  * is called. Once the continuation is called, the message is no longer in the
3622  * queue.
3623  *
3624  * @param q Handle to the queue.
3625  */
3626 void
3627 GCC_cancel (struct CadetConnectionQueue *q)
3628 {
3629   LOG (GNUNET_ERROR_TYPE_DEBUG, "!  GCC cancel message\n");
3630
3631   /* queue destroy calls message_sent, which calls q->cont and frees q */
3632   GCP_queue_destroy (q->q, GNUNET_YES, GNUNET_NO, 0);
3633   GCC_check_connections ();
3634 }
3635
3636
3637 /**
3638  * Sends a CREATE CONNECTION message for a path to a peer.
3639  * Changes the connection and tunnel states if necessary.
3640  *
3641  * @param connection Connection to create.
3642  */
3643 void
3644 GCC_send_create (struct CadetConnection *connection)
3645 {
3646   enum CadetTunnelCState state;
3647   size_t size;
3648
3649   GCC_check_connections ();
3650   size = sizeof (struct GNUNET_CADET_ConnectionCreate);
3651   size += connection->path->length * sizeof (struct GNUNET_PeerIdentity);
3652
3653   LOG (GNUNET_ERROR_TYPE_INFO, "==> %s %19s on conn %s (%p) FWD [%5u]\n",
3654        GC_m2s (GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE), "",
3655        GCC_2s (connection), connection, size);
3656   LOG (GNUNET_ERROR_TYPE_DEBUG, "  C_P+ %p %u (create)\n",
3657        connection, connection->pending_messages);
3658   connection->pending_messages++;
3659
3660   connection->maintenance_q =
3661     GCP_queue_add (get_next_hop (connection), NULL,
3662                    GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE, UINT16_MAX, 0,
3663                    size, connection, GNUNET_YES, &conn_message_sent, NULL);
3664
3665   state = GCT_get_cstate (connection->t);
3666   if (CADET_TUNNEL_SEARCHING == state || CADET_TUNNEL_NEW == state)
3667     GCT_change_cstate (connection->t, CADET_TUNNEL_WAITING);
3668   if (CADET_CONNECTION_NEW == connection->state)
3669     connection_change_state (connection, CADET_CONNECTION_SENT);
3670   GCC_check_connections ();
3671 }
3672
3673
3674 /**
3675  * Send a message to all peers in this connection that the connection
3676  * is no longer valid.
3677  *
3678  * If some peer should not receive the message, it should be zero'ed out
3679  * before calling this function.
3680  *
3681  * @param c The connection whose peers to notify.
3682  */
3683 void
3684 GCC_send_destroy (struct CadetConnection *c)
3685 {
3686   struct GNUNET_CADET_ConnectionDestroy msg;
3687
3688   if (GNUNET_YES == c->destroy)
3689     return;
3690   GCC_check_connections ();
3691   msg.header.size = htons (sizeof (msg));
3692   msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY);
3693   msg.cid = c->id;
3694   LOG (GNUNET_ERROR_TYPE_DEBUG,
3695               "  sending connection destroy for connection %s\n",
3696               GCC_2s (c));
3697
3698   if (GNUNET_NO == GCC_is_terminal (c, GNUNET_YES))
3699     GNUNET_assert (NULL == GCC_send_prebuilt_message (&msg.header, UINT16_MAX,
3700                                                       0, c, GNUNET_YES,
3701                                                       GNUNET_YES, NULL, NULL));
3702   if (GNUNET_NO == GCC_is_terminal (c, GNUNET_NO))
3703     GNUNET_assert (NULL == GCC_send_prebuilt_message (&msg.header, UINT16_MAX,
3704                                                       0, c, GNUNET_NO,
3705                                                       GNUNET_YES, NULL, NULL));
3706   c->destroy = GNUNET_YES;
3707   c->state = CADET_CONNECTION_DESTROYED;
3708   GCC_check_connections ();
3709 }
3710
3711
3712 /**
3713  * @brief Start a polling timer for the connection.
3714  *
3715  * When a neighbor does not accept more traffic on the connection it could be
3716  * caused by a simple congestion or by a lost ACK. Polling enables to check
3717  * for the lastest ACK status for a connection.
3718  *
3719  * @param c Connection.
3720  * @param fwd Should we poll in the FWD direction?
3721  */
3722 void
3723 GCC_start_poll (struct CadetConnection *c, int fwd)
3724 {
3725   struct CadetFlowControl *fc;
3726
3727   fc = fwd ? &c->fwd_fc : &c->bck_fc;
3728   LOG (GNUNET_ERROR_TYPE_DEBUG, "POLL %s requested\n",
3729        GC_f2s (fwd));
3730   if (NULL != fc->poll_task || NULL != fc->poll_msg)
3731   {
3732     LOG (GNUNET_ERROR_TYPE_DEBUG, "  POLL not needed (%p, %p)\n",
3733          fc->poll_task, fc->poll_msg);
3734     return;
3735   }
3736   LOG (GNUNET_ERROR_TYPE_DEBUG, "POLL started on request\n");
3737   fc->poll_task = GNUNET_SCHEDULER_add_delayed (fc->poll_time,
3738                                                 &connection_poll,
3739                                                 fc);
3740 }
3741
3742
3743 /**
3744  * @brief Stop polling a connection for ACKs.
3745  *
3746  * Once we have enough ACKs for future traffic, polls are no longer necessary.
3747  *
3748  * @param c Connection.
3749  * @param fwd Should we stop the poll in the FWD direction?
3750  */
3751 void
3752 GCC_stop_poll (struct CadetConnection *c, int fwd)
3753 {
3754   struct CadetFlowControl *fc;
3755
3756   fc = fwd ? &c->fwd_fc : &c->bck_fc;
3757   if (NULL != fc->poll_task)
3758   {
3759     GNUNET_SCHEDULER_cancel (fc->poll_task);
3760     fc->poll_task = NULL;
3761   }
3762 }
3763
3764
3765 /**
3766  * Get a (static) string for a connection.
3767  *
3768  * @param c Connection.
3769  */
3770 const char *
3771 GCC_2s (const struct CadetConnection *c)
3772 {
3773   if (NULL == c)
3774     return "NULL";
3775
3776   if (NULL != c->t)
3777   {
3778     static char buf[128];
3779
3780     SPRINTF (buf, "%s (->%s)",
3781              GNUNET_h2s (GC_h2hc (GCC_get_id (c))), GCT_2s (c->t));
3782     return buf;
3783   }
3784   return GNUNET_h2s (GC_h2hc (&c->id));
3785 }
3786
3787
3788 /**
3789  * Log all possible info about the connection state.
3790  *
3791  * @param c Connection to debug.
3792  * @param level Debug level to use.
3793  */
3794 void
3795 GCC_debug (const struct CadetConnection *c, enum GNUNET_ErrorType level)
3796 {
3797   int do_log;
3798   char *s;
3799
3800   do_log = GNUNET_get_log_call_status (level & (~GNUNET_ERROR_TYPE_BULK),
3801                                        "cadet-con",
3802                                        __FILE__, __FUNCTION__, __LINE__);
3803   if (0 == do_log)
3804     return;
3805
3806   if (NULL == c)
3807   {
3808     LOG2 (level, "CCC DEBUG NULL CONNECTION\n");
3809     return;
3810   }
3811
3812   LOG2 (level, "CCC DEBUG CONNECTION %s\n", GCC_2s (c));
3813   s = path_2s (c->path);
3814   LOG2 (level, "CCC  path %s, own pos: %u\n", s, c->own_pos);
3815   GNUNET_free (s);
3816   LOG2 (level, "CCC  state: %s, destroy: %u\n",
3817         GCC_state2s (c->state), c->destroy);
3818   LOG2 (level, "CCC  pending messages: %u\n", c->pending_messages);
3819   if (NULL != c->perf)
3820     LOG2 (level, "CCC  us/byte: %f\n", c->perf->avg);
3821
3822   LOG2 (level, "CCC  FWD flow control:\n");
3823   LOG2 (level, "CCC   queue: %u/%u\n", c->fwd_fc.queue_n, c->fwd_fc.queue_max);
3824   LOG2 (level, "CCC   last PID sent: %5u, recv: %5u\n",
3825         c->fwd_fc.last_pid_sent, c->fwd_fc.last_pid_recv);
3826   LOG2 (level, "CCC   last ACK sent: %5u, recv: %5u\n",
3827         c->fwd_fc.last_ack_sent, c->fwd_fc.last_ack_recv);
3828   LOG2 (level, "CCC   recv PID bitmap: %X\n", c->fwd_fc.recv_bitmap);
3829   LOG2 (level, "CCC   poll: task %d, msg  %p, msg_ack %p)\n",
3830         c->fwd_fc.poll_task, c->fwd_fc.poll_msg, c->fwd_fc.ack_msg);
3831
3832   LOG2 (level, "CCC  BCK flow control:\n");
3833   LOG2 (level, "CCC   queue: %u/%u\n", c->bck_fc.queue_n, c->bck_fc.queue_max);
3834   LOG2 (level, "CCC   last PID sent: %5u, recv: %5u\n",
3835         c->bck_fc.last_pid_sent, c->bck_fc.last_pid_recv);
3836   LOG2 (level, "CCC   last ACK sent: %5u, recv: %5u\n",
3837         c->bck_fc.last_ack_sent, c->bck_fc.last_ack_recv);
3838   LOG2 (level, "CCC   recv PID bitmap: %X\n", c->bck_fc.recv_bitmap);
3839   LOG2 (level, "CCC   poll: task %d, msg  %p, msg_ack %p)\n",
3840         c->bck_fc.poll_task, c->bck_fc.poll_msg, c->bck_fc.ack_msg);
3841
3842   LOG2 (level, "CCC DEBUG CONNECTION END\n");
3843 }