- use shorter timeout for connection establishment (CREATE/ACK) to avoid timeout...
[oweals/gnunet.git] / src / mesh / gnunet-service-mesh_connection.c
1 /*
2      This file is part of GNUnet.
3      (C) 2001-2013 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file mesh/gnunet-service-mesh_connection.c
23  * @brief GNUnet MESH service connection handling
24  * @author Bartlomiej Polot
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29
30 #include "gnunet_statistics_service.h"
31
32 #include "mesh_path.h"
33 #include "mesh_protocol.h"
34 #include "mesh.h"
35 #include "gnunet-service-mesh_connection.h"
36 #include "gnunet-service-mesh_peer.h"
37 #include "gnunet-service-mesh_tunnel.h"
38
39
40 #define LOG(level, ...) GNUNET_log_from (level,"mesh-con",__VA_ARGS__)
41
42 #define MESH_MAX_POLL_TIME      GNUNET_TIME_relative_multiply (\
43                                   GNUNET_TIME_UNIT_MINUTES,\
44                                   10)
45 #define AVG_MSGS                32
46
47
48 /******************************************************************************/
49 /********************************   STRUCTS  **********************************/
50 /******************************************************************************/
51
52 /**
53  * Struct to encapsulate all the Flow Control information to a peer to which
54  * we are directly connected (on a core level).
55  */
56 struct MeshFlowControl
57 {
58   /**
59    * Connection this controls.
60    */
61   struct MeshConnection *c;
62
63   /**
64    * How many messages are in the queue on this connection.
65    */
66   unsigned int queue_n;
67
68   /**
69    * How many messages do we accept in the queue.
70    */
71   unsigned int queue_max;
72
73   /**
74    * Next ID to use.
75    */
76   uint32_t next_pid;
77
78   /**
79    * ID of the last packet sent towards the peer.
80    */
81   uint32_t last_pid_sent;
82
83   /**
84    * ID of the last packet received from the peer.
85    */
86   uint32_t last_pid_recv;
87
88   /**
89    * Last ACK sent to the peer (peer can't send more than this PID).
90    */
91   uint32_t last_ack_sent;
92
93   /**
94    * Last ACK sent towards the origin (for traffic towards leaf node).
95    */
96   uint32_t last_ack_recv;
97
98   /**
99    * Task to poll the peer in case of a lost ACK causes stall.
100    */
101   GNUNET_SCHEDULER_TaskIdentifier poll_task;
102
103   /**
104    * How frequently to poll for ACKs.
105    */
106   struct GNUNET_TIME_Relative poll_time;
107
108   /**
109    * Queued poll message, to cancel if not necessary anymore (got ACK).
110    */
111   struct MeshConnectionQueue *poll_msg;
112
113   /**
114    * Queued poll message, to cancel if not necessary anymore (got ACK).
115    */
116   struct MeshConnectionQueue *ack_msg;
117 };
118
119 /**
120  * Keep a record of the last messages sent on this connection.
121  */
122 struct MeshConnectionPerformance
123 {
124   /**
125    * Circular buffer for storing measurements.
126    */
127   double usecsperbyte[AVG_MSGS];
128
129   /**
130    * Running average of @c usecsperbyte.
131    */
132   double avg;
133
134   /**
135    * How many values of @c usecsperbyte are valid.
136    */
137   uint16_t size;
138
139   /**
140    * Index of the next "free" position in @c usecsperbyte.
141    */
142   uint16_t idx;
143 };
144
145
146 /**
147  * Struct containing all information regarding a connection to a peer.
148  */
149 struct MeshConnection
150 {
151   /**
152    * Tunnel this connection is part of.
153    */
154   struct MeshTunnel3 *t;
155
156   /**
157    * Flow control information for traffic fwd.
158    */
159   struct MeshFlowControl fwd_fc;
160
161   /**
162    * Flow control information for traffic bck.
163    */
164   struct MeshFlowControl bck_fc;
165
166   /**
167    * Measure connection performance on the endpoint.
168    */
169   struct MeshConnectionPerformance *perf;
170
171   /**
172    * ID of the connection.
173    */
174   struct GNUNET_HashCode id;
175
176   /**
177    * State of the connection.
178    */
179   enum MeshConnectionState state;
180
181   /**
182    * Path being used for the tunnel.
183    */
184   struct MeshPeerPath *path;
185
186   /**
187    * Position of the local peer in the path.
188    */
189   unsigned int own_pos;
190
191   /**
192    * Task to keep the used paths alive at the owner,
193    * time tunnel out on all the other peers.
194    */
195   GNUNET_SCHEDULER_TaskIdentifier fwd_maintenance_task;
196
197   /**
198    * Task to keep the used paths alive at the destination,
199    * time tunnel out on all the other peers.
200    */
201   GNUNET_SCHEDULER_TaskIdentifier bck_maintenance_task;
202
203   /**
204    * Pending message count.
205    */
206   int pending_messages;
207
208   /**
209    * Destroy flag: if true, destroy on last message.
210    */
211   int destroy;
212 };
213
214 /**
215  * Handle for messages queued but not yet sent.
216  */
217 struct MeshConnectionQueue
218 {
219   struct MeshPeerQueue *q;
220   GMC_sent cont;
221   void *cont_cls;
222 };
223
224 /******************************************************************************/
225 /*******************************   GLOBALS  ***********************************/
226 /******************************************************************************/
227
228 /**
229  * Global handle to the statistics service.
230  */
231 extern struct GNUNET_STATISTICS_Handle *stats;
232
233 /**
234  * Local peer own ID (memory efficient handle).
235  */
236 extern GNUNET_PEER_Id myid;
237
238 /**
239  * Local peer own ID (full value).
240  */
241 extern struct GNUNET_PeerIdentity my_full_id;
242
243 /**
244  * Connections known, indexed by cid (MeshConnection).
245  */
246 static struct GNUNET_CONTAINER_MultiHashMap *connections;
247
248 /**
249  * How many connections are we willing to maintain.
250  * Local connections are always allowed, even if there are more connections than max.
251  */
252 static unsigned long long max_connections;
253
254 /**
255  * How many messages *in total* are we willing to queue, divide by number of
256  * connections to get connection queue size.
257  */
258 static unsigned long long max_msgs_queue;
259
260 /**
261  * How often to send path keepalives. Paths timeout after 4 missed.
262  */
263 static struct GNUNET_TIME_Relative refresh_connection_time;
264
265 /**
266  * How often to send path create / ACKs.
267  */
268 static struct GNUNET_TIME_Relative create_connection_time;
269
270
271 /******************************************************************************/
272 /********************************   STATIC  ***********************************/
273 /******************************************************************************/
274
275 #if 0 // avoid compiler warning for unused static function
276 static void
277 fc_debug (struct MeshFlowControl *fc)
278 {
279   LOG (GNUNET_ERROR_TYPE_DEBUG, "    IN: %u/%u\n",
280               fc->last_pid_recv, fc->last_ack_sent);
281   LOG (GNUNET_ERROR_TYPE_DEBUG, "    OUT: %u/%u\n",
282               fc->last_pid_sent, fc->last_ack_recv);
283   LOG (GNUNET_ERROR_TYPE_DEBUG, "    QUEUE: %u/%u\n",
284               fc->queue_n, fc->queue_max);
285 }
286
287 static void
288 connection_debug (struct MeshConnection *c)
289 {
290   if (NULL == c)
291   {
292     LOG (GNUNET_ERROR_TYPE_DEBUG, "*** DEBUG NULL CONNECTION ***\n");
293     return;
294   }
295   LOG (GNUNET_ERROR_TYPE_DEBUG, "Connection %s:%X\n",
296               peer2s (c->t->peer), GMC_2s (c));
297   LOG (GNUNET_ERROR_TYPE_DEBUG, "  state: %u, pending msgs: %u\n",
298               c->state, c->pending_messages);
299   LOG (GNUNET_ERROR_TYPE_DEBUG, "  FWD FC\n");
300   fc_debug (&c->fwd_fc);
301   LOG (GNUNET_ERROR_TYPE_DEBUG, "  BCK FC\n");
302   fc_debug (&c->bck_fc);
303 }
304 #endif
305
306 /**
307  * Get string description for tunnel state.
308  *
309  * @param s Tunnel state.
310  *
311  * @return String representation.
312  */
313 static const char *
314 GMC_state2s (enum MeshConnectionState s)
315 {
316   switch (s)
317   {
318     case MESH_CONNECTION_NEW:
319       return "MESH_CONNECTION_NEW";
320     case MESH_CONNECTION_SENT:
321       return "MESH_CONNECTION_SENT";
322     case MESH_CONNECTION_ACK:
323       return "MESH_CONNECTION_ACK";
324     case MESH_CONNECTION_READY:
325       return "MESH_CONNECTION_READY";
326     default:
327       return "MESH_CONNECTION_STATE_ERROR";
328   }
329 }
330
331
332 /**
333  * Initialize a Flow Control structure to the initial state.
334  *
335  * @param fc Flow Control structure to initialize.
336  */
337 static void
338 fc_init (struct MeshFlowControl *fc)
339 {
340   fc->next_pid = 0;
341   fc->last_pid_sent = (uint32_t) -1; /* Next (expected) = 0 */
342   fc->last_pid_recv = (uint32_t) -1;
343   fc->last_ack_sent = (uint32_t) 0;
344   fc->last_ack_recv = (uint32_t) 0;
345   fc->poll_task = GNUNET_SCHEDULER_NO_TASK;
346   fc->poll_time = GNUNET_TIME_UNIT_SECONDS;
347   fc->queue_n = 0;
348   fc->queue_max = (max_msgs_queue / max_connections) + 1;
349 }
350
351
352 /**
353  * Find a connection.
354  *
355  * @param cid Connection ID.
356  */
357 static struct MeshConnection *
358 connection_get (const struct GNUNET_HashCode *cid)
359 {
360   return GNUNET_CONTAINER_multihashmap_get (connections, cid);
361 }
362
363
364 static void
365 connection_change_state (struct MeshConnection* c,
366                          enum MeshConnectionState state)
367 {
368   LOG (GNUNET_ERROR_TYPE_DEBUG,
369               "Connection %s state was %s\n",
370               GMC_2s (c), GMC_state2s (c->state));
371   LOG (GNUNET_ERROR_TYPE_DEBUG,
372               "Connection %s state is now %s\n",
373               GMC_2s (c), GMC_state2s (state));
374   c->state = state;
375 }
376
377
378 /**
379  * Callback called when a queued ACK message is sent.
380  *
381  * @param cls Closure (FC).
382  * @param c Connection this message was on.
383  * @param type Type of message sent.
384  * @param fwd Was this a FWD going message?
385  * @param size Size of the message.
386  */
387 static void
388 ack_sent (void *cls,
389           struct MeshConnection *c,
390           struct MeshConnectionQueue *q,
391           uint16_t type, int fwd, size_t size)
392 {
393   struct MeshFlowControl *fc = cls;
394
395   fc->ack_msg = NULL;
396 }
397
398
399 /**
400  * Send an ACK on the connection, informing the predecessor about
401  * the available buffer space. Should not be called in case the peer
402  * is origin (no predecessor) in the @c fwd direction.
403  *
404  * Note that for fwd ack, the FWD mean forward *traffic* (root->dest),
405  * the ACK itself goes "back" (dest->root).
406  *
407  * @param c Connection on which to send the ACK.
408  * @param buffer How much space free to advertise?
409  * @param fwd Is this FWD ACK? (Going dest -> root)
410  * @param force Don't optimize out.
411  */
412 static void
413 send_ack (struct MeshConnection *c, unsigned int buffer, int fwd, int force)
414 {
415   struct MeshFlowControl *next_fc;
416   struct MeshFlowControl *prev_fc;
417   struct GNUNET_MESH_ACK msg;
418   uint32_t ack;
419   int delta;
420
421   /* If origin, there is no connection to send ACKs. Wrong function! */
422   if (GMC_is_origin (c, fwd))
423   {
424     GNUNET_break (0);
425     return;
426   }
427
428   next_fc = fwd ? &c->fwd_fc : &c->bck_fc;
429   prev_fc = fwd ? &c->bck_fc : &c->fwd_fc;
430
431   LOG (GNUNET_ERROR_TYPE_DEBUG,
432               "connection send %s ack on %s\n",
433               fwd ? "FWD" : "BCK", GMC_2s (c));
434
435   /* Check if we need to transmit the ACK. */
436   delta = prev_fc->last_ack_sent - prev_fc->last_pid_recv;
437   if (3 < delta && buffer < delta && GNUNET_NO == force)
438   {
439     LOG (GNUNET_ERROR_TYPE_DEBUG, "Not sending ACK, buffer > 3\n");
440     LOG (GNUNET_ERROR_TYPE_DEBUG,
441          "  last pid recv: %u, last ack sent: %u\n",
442          prev_fc->last_pid_recv, prev_fc->last_ack_sent);
443     return;
444   }
445
446   /* Ok, ACK might be necessary, what PID to ACK? */
447   ack = prev_fc->last_pid_recv + buffer;
448   LOG (GNUNET_ERROR_TYPE_DEBUG, " ACK %u\n", ack);
449   LOG (GNUNET_ERROR_TYPE_DEBUG,
450        " last pid %u, last ack %u, qmax %u, q %u\n",
451        prev_fc->last_pid_recv, prev_fc->last_ack_sent,
452        next_fc->queue_max, next_fc->queue_n);
453   if (ack == prev_fc->last_ack_sent && GNUNET_NO == force)
454   {
455     LOG (GNUNET_ERROR_TYPE_DEBUG, "Not sending FWD ACK, not needed\n");
456     return;
457   }
458
459   /* Check if message is already in queue */
460   if (NULL != prev_fc->ack_msg)
461   {
462     if (GMC_is_pid_bigger (ack, prev_fc->last_ack_sent))
463     {
464       LOG (GNUNET_ERROR_TYPE_DEBUG, " canceling old ACK\n");
465       GMC_cancel (prev_fc->ack_msg);
466       /* GMC_cancel triggers ack_sent(), which clears fc->ack_msg */
467     }
468     else
469     {
470       LOG (GNUNET_ERROR_TYPE_DEBUG, " same ACK already in queue\n");
471       return;
472     }
473   }
474
475   prev_fc->last_ack_sent = ack;
476
477   /* Build ACK message and send on connection */
478   msg.header.size = htons (sizeof (msg));
479   msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_ACK);
480   msg.ack = htonl (ack);
481   msg.cid = c->id;
482
483   prev_fc->ack_msg = GMC_send_prebuilt_message (&msg.header, c, !fwd,
484                                                 &ack_sent, prev_fc);
485 }
486
487
488 /**
489  * Callback called when a queued message is sent.
490  *
491  * Calculates the average time and connection packet tracking.
492  *
493  * @param cls Closure (ConnectionQueue Handle).
494  * @param c Connection this message was on.
495  * @param type Type of message sent.
496  * @param fwd Was this a FWD going message?
497  * @param size Size of the message.
498  * @param wait Time spent waiting for core (only the time for THIS message)
499  */
500 static void 
501 message_sent (void *cls,
502               struct MeshConnection *c, uint16_t type,
503               int fwd, size_t size,
504               struct GNUNET_TIME_Relative wait)
505 {
506   struct MeshConnectionPerformance *p;
507   struct MeshFlowControl *fc;
508   struct MeshConnectionQueue *q = cls;
509   double usecsperbyte;
510
511   fc = fwd ? &c->fwd_fc : &c->bck_fc;
512   LOG (GNUNET_ERROR_TYPE_DEBUG,
513        "!  sent %s %s\n",
514        fwd ? "FWD" : "BCK",
515        GNUNET_MESH_DEBUG_M2S (type));
516   LOG (GNUNET_ERROR_TYPE_DEBUG, "!  C_P- %p %u\n", c, c->pending_messages);
517   if (NULL != q)
518   {
519     if (NULL != q->cont)
520     {
521       LOG (GNUNET_ERROR_TYPE_DEBUG, "!  calling cont\n");
522       q->cont (q->cont_cls, c, q, type, fwd, size);
523     }
524     GNUNET_free (q);
525   }
526   c->pending_messages--;
527   if (GNUNET_YES == c->destroy && 0 == c->pending_messages)
528   {
529     LOG (GNUNET_ERROR_TYPE_DEBUG, "!  destroying connection!\n");
530     GMC_destroy (c);
531     return;
532   }
533   /* Send ACK if needed, after accounting for sent ID in fc->queue_n */
534   switch (type)
535   {
536     case GNUNET_MESSAGE_TYPE_MESH_ENCRYPTED:
537       fc->last_pid_sent++;
538       LOG (GNUNET_ERROR_TYPE_DEBUG, "!  Q_N- %p %u\n", fc, fc->queue_n);
539       fc->queue_n--;
540       LOG (GNUNET_ERROR_TYPE_DEBUG,
541            "!   accounting pid %u\n",
542            fc->last_pid_sent);
543       GMC_send_ack (c, fwd, GNUNET_NO);
544       break;
545
546     case GNUNET_MESSAGE_TYPE_MESH_POLL:
547       fc->poll_msg = NULL;
548       break;
549
550     case GNUNET_MESSAGE_TYPE_MESH_ACK:
551       fc->ack_msg = NULL;
552       break;
553
554     default:
555       break;
556   }
557   LOG (GNUNET_ERROR_TYPE_DEBUG, "!  message sent!\n");
558
559   if (NULL == c->perf)
560     return; /* Only endpoints are interested in timing. */
561
562   p = c->perf;
563   usecsperbyte = ((double) wait.rel_value_us) / size;
564   if (p->size == AVG_MSGS)
565   {
566     /* Array is full. Substract oldest value, add new one and store. */
567     p->avg -= (p->usecsperbyte[p->idx] / AVG_MSGS);
568     p->usecsperbyte[p->idx] = usecsperbyte;
569     p->avg += (p->usecsperbyte[p->idx] / AVG_MSGS);
570   }
571   else
572   {
573     /* Array not yet full. Add current value to avg and store. */
574     p->usecsperbyte[p->idx] = usecsperbyte;
575     p->avg *= p->size;
576     p->avg += p->usecsperbyte[p->idx];
577     p->size++;
578     p->avg /= p->size;
579   }
580   p->idx = (p->idx + 1) % AVG_MSGS;
581 }
582
583
584 /**
585  * Get the previous hop in a connection
586  *
587  * @param c Connection.
588  *
589  * @return Previous peer in the connection.
590  */
591 static struct MeshPeer *
592 get_prev_hop (const struct MeshConnection *c)
593 {
594   GNUNET_PEER_Id id;
595
596   LOG (GNUNET_ERROR_TYPE_DEBUG, "Get prev hop, own pos %u\n", c->own_pos);
597   if (0 == c->own_pos || c->path->length < 2)
598     id = c->path->peers[0];
599   else
600     id = c->path->peers[c->own_pos - 1];
601
602   return GMP_get_short (id);
603 }
604
605
606 /**
607  * Get the next hop in a connection
608  *
609  * @param c Connection.
610  *
611  * @return Next peer in the connection.
612  */
613 static struct MeshPeer *
614 get_next_hop (const struct MeshConnection *c)
615 {
616   GNUNET_PEER_Id id;
617
618   if ((c->path->length - 1) == c->own_pos || c->path->length < 2)
619     id = c->path->peers[c->path->length - 1];
620   else
621     id = c->path->peers[c->own_pos + 1];
622
623   return GMP_get_short (id);
624 }
625
626
627 /**
628  * Get the hop in a connection.
629  *
630  * @param c Connection.
631  * @param fwd Next hop?
632  *
633  * @return Next peer in the connection.
634  */
635 static struct MeshPeer *
636 get_hop (struct MeshConnection *c, int fwd)
637 {
638   if (fwd)
639     return get_next_hop (c);
640   return get_prev_hop (c);
641 }
642
643
644 /**
645  * Is traffic coming from this sender 'FWD' traffic?
646  *
647  * @param c Connection to check.
648  * @param sender Peer identity of neighbor.
649  *
650  * @return #GNUNET_YES in case the sender is the 'prev' hop and therefore
651  *         the traffic is 'FWD'.
652  *         #GNUNET_NO for BCK.
653  *         #GNUNET_SYSERR for errors.
654  */
655 static int 
656 is_fwd (const struct MeshConnection *c,
657         const struct GNUNET_PeerIdentity *sender)
658 {
659   GNUNET_PEER_Id id;
660
661   id = GNUNET_PEER_search (sender);
662   if (GMP_get_short_id (get_prev_hop (c)) == id)
663     return GNUNET_YES;
664
665   if (GMP_get_short_id (get_next_hop (c)) == id)
666     return GNUNET_NO;
667
668   GNUNET_break (0);
669   return GNUNET_SYSERR;
670 }
671
672
673 /**
674  * Sends a CONNECTION ACK message in reponse to a received CONNECTION_CREATE
675  * or a first CONNECTION_ACK directed to us.
676  *
677  * @param connection Connection to confirm.
678  * @param fwd Should we send it FWD? (root->dest)
679  *            (First (~SYNACK) goes BCK, second (~ACK) goes FWD)
680  */
681 static void
682 send_connection_ack (struct MeshConnection *connection, int fwd)
683 {
684   struct MeshTunnel3 *t;
685
686   t = connection->t;
687   LOG (GNUNET_ERROR_TYPE_DEBUG, "Send connection %s ACK\n",
688        !fwd ? "FWD" : "BCK");
689   GMP_queue_add (get_hop (connection, fwd), NULL,
690                  GNUNET_MESSAGE_TYPE_MESH_CONNECTION_ACK,
691                  sizeof (struct GNUNET_MESH_ConnectionACK),
692                  connection, fwd, &message_sent, NULL);
693   connection->pending_messages++;
694   if (MESH_TUNNEL3_NEW == GMT_get_state (t))
695     GMT_change_state (t, MESH_TUNNEL3_WAITING);
696   if (MESH_CONNECTION_READY != connection->state)
697     connection_change_state (connection, MESH_CONNECTION_SENT);
698 }
699
700
701 /**
702  * Send a notification that a connection is broken.
703  *
704  * @param c Connection that is broken.
705  * @param id1 Peer that has disconnected.
706  * @param id2 Peer that has disconnected.
707  * @param fwd Direction towards which to send it.
708  */
709 static void
710 send_broken (struct MeshConnection *c,
711              const struct GNUNET_PeerIdentity *id1,
712              const struct GNUNET_PeerIdentity *id2,
713              int fwd)
714 {
715   struct GNUNET_MESH_ConnectionBroken msg;
716
717   msg.header.size = htons (sizeof (struct GNUNET_MESH_ConnectionBroken));
718   msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_CONNECTION_BROKEN);
719   msg.cid = c->id;
720   msg.peer1 = *id1;
721   msg.peer2 = *id2;
722   GMC_send_prebuilt_message (&msg.header, c, fwd, NULL, NULL);
723 }
724
725
726
727 /**
728  * Send keepalive packets for a connection.
729  *
730  * @param c Connection to keep alive..
731  * @param fwd Is this a FWD keepalive? (owner -> dest).
732  */
733 static void
734 connection_keepalive (struct MeshConnection *c, int fwd)
735 {
736   struct GNUNET_MESH_ConnectionKeepAlive *msg;
737   size_t size = sizeof (struct GNUNET_MESH_ConnectionKeepAlive);
738   char cbuf[size];
739   uint16_t type;
740
741   type = fwd ? GNUNET_MESSAGE_TYPE_MESH_FWD_KEEPALIVE :
742                GNUNET_MESSAGE_TYPE_MESH_BCK_KEEPALIVE;
743
744   LOG (GNUNET_ERROR_TYPE_DEBUG,
745        "sending %s keepalive for connection %s]\n",
746        fwd ? "FWD" : "BCK", GMC_2s (c));
747
748   msg = (struct GNUNET_MESH_ConnectionKeepAlive *) cbuf;
749   msg->header.size = htons (size);
750   msg->header.type = htons (type);
751   msg->cid = c->id;
752
753   GMC_send_prebuilt_message (&msg->header, c, fwd, NULL, NULL);
754 }
755
756
757 /**
758  * Send CONNECTION_{CREATE/ACK} packets for a connection.
759  *
760  * @param c Connection for which to send the message.
761  * @param fwd If #GNUNET_YES, send CREATE, otherwise send ACK.
762  */
763 static void
764 connection_recreate (struct MeshConnection *c, int fwd)
765 {
766   LOG (GNUNET_ERROR_TYPE_DEBUG, "sending connection recreate\n");
767   if (fwd)
768     GMC_send_create (c);
769   else
770     send_connection_ack (c, GNUNET_NO);
771 }
772
773
774 /**
775  * Generic connection timer management.
776  * Depending on the role of the peer in the connection will send the
777  * appropriate message (build or keepalive)
778  *
779  * @param c Conncetion to maintain.
780  * @param fwd Is FWD?
781  */
782 static void
783 connection_maintain (struct MeshConnection *c, int fwd)
784 {
785   if (MESH_TUNNEL3_SEARCHING == GMT_get_state (c->t))
786   {
787     /* TODO DHT GET with RO_BART */
788     return;
789   }
790   switch (c->state)
791   {
792     case MESH_CONNECTION_NEW:
793       GNUNET_break (0);
794     case MESH_CONNECTION_SENT:
795       connection_recreate (c, fwd);
796       break;
797     case MESH_CONNECTION_READY:
798       connection_keepalive (c, fwd);
799       break;
800     default:
801       break;
802   }
803 }
804
805
806 static void
807 connection_fwd_keepalive (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
808 {
809   struct MeshConnection *c = cls;
810   struct GNUNET_TIME_Relative delay;
811
812   c->fwd_maintenance_task = GNUNET_SCHEDULER_NO_TASK;
813   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
814     return;
815
816   connection_maintain (c, GNUNET_YES);
817   delay = c->state == MESH_CONNECTION_READY ?
818           refresh_connection_time : create_connection_time;
819   c->fwd_maintenance_task = GNUNET_SCHEDULER_add_delayed (delay,
820                                                           &connection_fwd_keepalive,
821                                                           c);
822 }
823
824
825 static void
826 connection_bck_keepalive (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
827 {
828   struct MeshConnection *c = cls;
829   struct GNUNET_TIME_Relative delay;
830
831   c->bck_maintenance_task = GNUNET_SCHEDULER_NO_TASK;
832   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
833     return;
834
835   connection_maintain (c, GNUNET_NO);
836   delay = c->state == MESH_CONNECTION_READY ?
837           refresh_connection_time : create_connection_time;
838   c->bck_maintenance_task = GNUNET_SCHEDULER_add_delayed (delay,
839                                                           &connection_bck_keepalive,
840                                                           c);
841 }
842
843
844 /**
845  * @brief Re-initiate traffic on this connection if necessary.
846  *
847  * Check if there is traffic queued towards this peer
848  * and the core transmit handle is NULL (traffic was stalled).
849  * If so, call core tmt rdy.
850  *
851  * @param c Connection on which initiate traffic.
852  * @param fwd Is this about fwd traffic?
853  */
854 static void
855 connection_unlock_queue (struct MeshConnection *c, int fwd)
856 {
857   struct MeshPeer *peer;
858
859   LOG (GNUNET_ERROR_TYPE_DEBUG,
860               "connection_unlock_queue %s on %s\n",
861               fwd ? "FWD" : "BCK", GMC_2s (c));
862
863   if (GMC_is_terminal (c, fwd))
864   {
865     LOG (GNUNET_ERROR_TYPE_DEBUG, " is terminal!\n");
866     return;
867   }
868
869   peer = get_hop (c, fwd);
870   GMP_queue_unlock (peer, c);
871 }
872
873
874 /**
875  * Cancel all transmissions that belong to a certain connection.
876  * 
877  * If the connection is scheduled for destruction and no more messages are left,
878  * the connection will be destroyed by the continuation call.
879  *
880  * @param c Connection which to cancel. Might be destroyed during this call.
881  * @param fwd Cancel fwd traffic?
882  */
883 static void
884 connection_cancel_queues (struct MeshConnection *c, int fwd)
885 {
886   struct MeshFlowControl *fc;
887   struct MeshPeer *peer;
888
889   if (NULL == c)
890   {
891     GNUNET_break (0);
892     return;
893   }
894
895   fc = fwd ? &c->fwd_fc : &c->bck_fc;
896   if (GNUNET_SCHEDULER_NO_TASK != fc->poll_task)
897   {
898     GNUNET_SCHEDULER_cancel (fc->poll_task);
899     fc->poll_task = GNUNET_SCHEDULER_NO_TASK;
900   }
901   peer = get_hop (c, fwd);
902   GMP_queue_cancel (peer, c);
903 }
904
905
906 /**
907  * Function called if a connection has been stalled for a while,
908  * possibly due to a missed ACK. Poll the neighbor about its ACK status.
909  *
910  * @param cls Closure (poll ctx).
911  * @param tc TaskContext.
912  */
913 static void
914 connection_poll (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
915
916
917 /**
918  * Callback called when a queued POLL message is sent.
919  *
920  * @param cls Closure (FC).
921  * @param c Connection this message was on.
922  * @param type Type of message sent.
923  * @param fwd Was this a FWD going message?
924  * @param size Size of the message.
925  */
926 static void
927 poll_sent (void *cls,
928            struct MeshConnection *c,
929            struct MeshConnectionQueue *q,
930            uint16_t type, int fwd, size_t size)
931 {
932   struct MeshFlowControl *fc = cls;
933
934   LOG (GNUNET_ERROR_TYPE_DEBUG, " *** POLL sent, scheduling new one!\n");
935   fc->poll_msg = NULL;
936   fc->poll_time = GNUNET_TIME_STD_BACKOFF (fc->poll_time);
937   fc->poll_task = GNUNET_SCHEDULER_add_delayed (fc->poll_time,
938                                                 &connection_poll, fc);
939 }
940
941 /**
942  * Function called if a connection has been stalled for a while,
943  * possibly due to a missed ACK. Poll the neighbor about its ACK status.
944  *
945  * @param cls Closure (poll ctx).
946  * @param tc TaskContext.
947  */
948 static void
949 connection_poll (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
950 {
951   struct MeshFlowControl *fc = cls;
952   struct GNUNET_MESH_Poll msg;
953   struct MeshConnection *c;
954
955   fc->poll_task = GNUNET_SCHEDULER_NO_TASK;
956   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
957   {
958     return;
959   }
960
961   c = fc->c;
962   LOG (GNUNET_ERROR_TYPE_DEBUG, " *** Polling!\n");
963   LOG (GNUNET_ERROR_TYPE_DEBUG, " *** connection [%s]\n", GMC_2s (c));
964   LOG (GNUNET_ERROR_TYPE_DEBUG, " ***   %s\n",
965        fc == &c->fwd_fc ? "FWD" : "BCK");
966
967   msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_POLL);
968   msg.header.size = htons (sizeof (msg));
969   msg.pid = htonl (fc->last_pid_sent);
970   LOG (GNUNET_ERROR_TYPE_DEBUG, " *** last pid sent: %u!\n", fc->last_pid_sent);
971   fc->poll_msg = GMC_send_prebuilt_message (&msg.header, c, fc == &c->fwd_fc,
972                                             &poll_sent, fc);
973 }
974
975
976 /**
977  * Timeout function due to lack of keepalive/traffic from the owner.
978  * Destroys connection if called.
979  *
980  * @param cls Closure (connection to destroy).
981  * @param tc TaskContext.
982  */
983 static void
984 connection_fwd_timeout (void *cls,
985                         const struct GNUNET_SCHEDULER_TaskContext *tc)
986 {
987   struct MeshConnection *c = cls;
988
989   c->fwd_maintenance_task = GNUNET_SCHEDULER_NO_TASK;
990   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
991     return;
992   LOG (GNUNET_ERROR_TYPE_DEBUG,
993               "Connection %s[%X] FWD timed out. Destroying.\n",
994               GMT_2s (c->t),
995               c->id);
996
997   if (GMC_is_origin (c, GNUNET_YES)) /* If local, leave. */
998     return;
999
1000   GMC_destroy (c);
1001 }
1002
1003
1004 /**
1005  * Timeout function due to lack of keepalive/traffic from the destination.
1006  * Destroys connection if called.
1007  *
1008  * @param cls Closure (connection to destroy).
1009  * @param tc TaskContext
1010  */
1011 static void
1012 connection_bck_timeout (void *cls,
1013                         const struct GNUNET_SCHEDULER_TaskContext *tc)
1014 {
1015   struct MeshConnection *c = cls;
1016
1017   c->bck_maintenance_task = GNUNET_SCHEDULER_NO_TASK;
1018   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1019     return;
1020
1021   LOG (GNUNET_ERROR_TYPE_DEBUG,
1022               "Connection %s[%X] FWD timed out. Destroying.\n",
1023               GMT_2s (c->t), c->id);
1024
1025   if (GMC_is_origin (c, GNUNET_NO)) /* If local, leave. */
1026     return;
1027
1028   GMC_destroy (c);
1029 }
1030
1031
1032 /**
1033  * Resets the connection timeout task, some other message has done the
1034  * task's job.
1035  * - For the first peer on the direction this means to send
1036  *   a keepalive or a path confirmation message (either create or ACK).
1037  * - For all other peers, this means to destroy the connection,
1038  *   due to lack of activity.
1039  * Starts the tiemout if no timeout was running (connection just created).
1040  *
1041  * @param c Connection whose timeout to reset.
1042  * @param fwd Is this forward?
1043  *
1044  * TODO use heap to improve efficiency of scheduler.
1045  */
1046 static void
1047 connection_reset_timeout (struct MeshConnection *c, int fwd)
1048 {
1049   GNUNET_SCHEDULER_TaskIdentifier *ti;
1050   GNUNET_SCHEDULER_Task f;
1051
1052   ti = fwd ? &c->fwd_maintenance_task : &c->bck_maintenance_task;
1053
1054   if (GNUNET_SCHEDULER_NO_TASK != *ti)
1055     GNUNET_SCHEDULER_cancel (*ti);
1056
1057   if (GMC_is_origin (c, fwd)) /* Startpoint */
1058   {
1059     f  = fwd ? &connection_fwd_keepalive : &connection_bck_keepalive;
1060     *ti = GNUNET_SCHEDULER_add_delayed (refresh_connection_time, f, c);
1061   }
1062   else /* Relay, endpoint. */
1063   {
1064     struct GNUNET_TIME_Relative delay;
1065
1066     delay = GNUNET_TIME_relative_multiply (refresh_connection_time, 4);
1067     f  = fwd ? &connection_fwd_timeout : &connection_bck_timeout;
1068     *ti = GNUNET_SCHEDULER_add_delayed (delay, f, c);
1069   }
1070 }
1071
1072
1073 /**
1074  * Add the connection to the list of both neighbors.
1075  *
1076  * @param c Connection.
1077  */
1078 static void
1079 register_neighbors (struct MeshConnection *c)
1080 {
1081   struct MeshPeer *peer;
1082
1083   peer = get_next_hop (c);
1084   if (GNUNET_NO == GMP_is_neighbor (peer))
1085   {
1086     GMC_destroy (c);
1087     return;
1088   }
1089   GMP_add_connection (peer, c);
1090   peer = get_prev_hop (c);
1091   if (GNUNET_NO == GMP_is_neighbor (peer))
1092   {
1093     GMC_destroy (c);
1094     return;
1095   }
1096   GMP_add_connection (peer, c);
1097 }
1098
1099
1100 /**
1101  * Remove the connection from the list of both neighbors.
1102  *
1103  * @param c Connection.
1104  */
1105 static void
1106 unregister_neighbors (struct MeshConnection *c)
1107 {
1108   struct MeshPeer *peer;
1109
1110   peer = get_next_hop (c);
1111   GMP_remove_connection (peer, c);
1112
1113   peer = get_prev_hop (c);
1114   GMP_remove_connection (peer, c);
1115
1116 }
1117
1118
1119 /**
1120  * Bind the connection to the peer and the tunnel to that peer.
1121  *
1122  * If the peer has no tunnel, create one. Update tunnel and connection
1123  * data structres to reflect new status.
1124  *
1125  * @param c Connection.
1126  * @param peer Peer.
1127  */
1128 static void
1129 add_to_peer (struct MeshConnection *c, struct MeshPeer *peer)
1130 {
1131   GMP_add_tunnel (peer);
1132   c->t = GMP_get_tunnel (peer);
1133   GMT_add_connection (c->t, c);
1134 }
1135
1136 /******************************************************************************/
1137 /********************************    API    ***********************************/
1138 /******************************************************************************/
1139
1140 /**
1141  * Core handler for connection creation.
1142  *
1143  * @param cls Closure (unused).
1144  * @param peer Sender (neighbor).
1145  * @param message Message.
1146  *
1147  * @return GNUNET_OK to keep the connection open,
1148  *         GNUNET_SYSERR to close it (signal serious error)
1149  */
1150 int
1151 GMC_handle_create (void *cls, const struct GNUNET_PeerIdentity *peer,
1152                    const struct GNUNET_MessageHeader *message)
1153 {
1154   struct GNUNET_MESH_ConnectionCreate *msg;
1155   struct GNUNET_PeerIdentity *id;
1156   struct GNUNET_HashCode *cid;
1157   struct MeshPeerPath *path;
1158   struct MeshPeer *dest_peer;
1159   struct MeshPeer *orig_peer;
1160   struct MeshConnection *c;
1161   unsigned int own_pos;
1162   uint16_t size;
1163   uint16_t i;
1164
1165   LOG (GNUNET_ERROR_TYPE_DEBUG, "\n\n");
1166   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received a connection create msg\n");
1167
1168   /* Check size */
1169   size = ntohs (message->size);
1170   if (size < sizeof (struct GNUNET_MESH_ConnectionCreate))
1171   {
1172     GNUNET_break_op (0);
1173     return GNUNET_OK;
1174   }
1175
1176   /* Calculate hops */
1177   size -= sizeof (struct GNUNET_MESH_ConnectionCreate);
1178   if (size % sizeof (struct GNUNET_PeerIdentity))
1179   {
1180     GNUNET_break_op (0);
1181     return GNUNET_OK;
1182   }
1183   size /= sizeof (struct GNUNET_PeerIdentity);
1184   if (1 > size)
1185   {
1186     GNUNET_break_op (0);
1187     return GNUNET_OK;
1188   }
1189   LOG (GNUNET_ERROR_TYPE_DEBUG, "    path has %u hops.\n", size);
1190
1191   /* Get parameters */
1192   msg = (struct GNUNET_MESH_ConnectionCreate *) message;
1193   cid = &msg->cid;
1194   id = (struct GNUNET_PeerIdentity *) &msg[1];
1195   LOG (GNUNET_ERROR_TYPE_DEBUG,
1196               "    connection %s (%s).\n",
1197               GNUNET_h2s (cid), GNUNET_i2s (id));
1198
1199   /* Create connection */
1200   c = connection_get (cid);
1201   if (NULL == c)
1202   {
1203     /* Create path */
1204     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Creating path...\n");
1205     path = path_new (size);
1206     own_pos = 0;
1207     for (i = 0; i < size; i++)
1208     {
1209       LOG (GNUNET_ERROR_TYPE_DEBUG, "  ... adding %s\n",
1210                   GNUNET_i2s (&id[i]));
1211       path->peers[i] = GNUNET_PEER_intern (&id[i]);
1212       if (path->peers[i] == myid)
1213         own_pos = i;
1214     }
1215     if (own_pos == 0 && path->peers[own_pos] != myid)
1216     {
1217       /* create path: self not found in path through self */
1218       GNUNET_break_op (0);
1219       path_destroy (path);
1220       return GNUNET_OK;
1221     }
1222     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Own position: %u\n", own_pos);
1223     GMP_add_path_to_all (path, GNUNET_NO);
1224         LOG (GNUNET_ERROR_TYPE_DEBUG, "  Creating connection\n");
1225     c = GMC_new (cid, NULL, path_duplicate (path), own_pos);
1226     if (NULL == c)
1227       return GNUNET_OK;
1228     connection_reset_timeout (c, GNUNET_YES);
1229   }
1230   else
1231   {
1232     path = NULL;
1233   }
1234   if (MESH_CONNECTION_NEW == c->state)
1235     connection_change_state (c, MESH_CONNECTION_SENT);
1236
1237   /* Remember peers */
1238   dest_peer = GMP_get (&id[size - 1]);
1239   orig_peer = GMP_get (&id[0]);
1240
1241   /* Is it a connection to us? */
1242   if (c->own_pos == size - 1)
1243   {
1244     LOG (GNUNET_ERROR_TYPE_DEBUG, "  It's for us!\n");
1245     GMP_add_path_to_origin (orig_peer, path, GNUNET_YES);
1246
1247     add_to_peer (c, orig_peer);
1248     if (MESH_TUNNEL3_NEW == GMT_get_state (c->t))
1249       GMT_change_state (c->t,  MESH_TUNNEL3_WAITING);
1250
1251     send_connection_ack (c, GNUNET_NO);
1252     if (MESH_CONNECTION_SENT == c->state)
1253       connection_change_state (c, MESH_CONNECTION_ACK);
1254
1255     /* Keep tunnel alive in direction dest->owner*/
1256     c->bck_maintenance_task =
1257             GNUNET_SCHEDULER_add_delayed (create_connection_time,
1258                                           &connection_bck_keepalive, c);
1259   }
1260   else
1261   {
1262     /* It's for somebody else! Retransmit. */
1263     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Retransmitting.\n");
1264     GMP_add_path (dest_peer, path_duplicate (path), GNUNET_NO);
1265     GMP_add_path_to_origin (orig_peer, path, GNUNET_NO);
1266     GMC_send_prebuilt_message (message, c, GNUNET_YES, NULL, NULL);
1267   }
1268   return GNUNET_OK;
1269 }
1270
1271
1272 /**
1273  * Core handler for path confirmations.
1274  *
1275  * @param cls closure
1276  * @param message message
1277  * @param peer peer identity this notification is about
1278  *
1279  * @return GNUNET_OK to keep the connection open,
1280  *         GNUNET_SYSERR to close it (signal serious error)
1281  */
1282 int
1283 GMC_handle_confirm (void *cls, const struct GNUNET_PeerIdentity *peer,
1284                     const struct GNUNET_MessageHeader *message)
1285 {
1286   struct GNUNET_MESH_ConnectionACK *msg;
1287   struct MeshConnection *c;
1288   struct MeshPeerPath *p;
1289   struct MeshPeer *pi;
1290   enum MeshConnectionState oldstate;
1291   int fwd;
1292
1293   LOG (GNUNET_ERROR_TYPE_DEBUG, "\n\n");
1294   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received a connection ACK msg\n");
1295   msg = (struct GNUNET_MESH_ConnectionACK *) message;
1296   LOG (GNUNET_ERROR_TYPE_DEBUG, "  on connection %s\n",
1297               GNUNET_h2s (&msg->cid));
1298   c = connection_get (&msg->cid);
1299   if (NULL == c)
1300   {
1301     GNUNET_STATISTICS_update (stats, "# control on unknown connection",
1302                               1, GNUNET_NO);
1303     LOG (GNUNET_ERROR_TYPE_DEBUG, "  don't know the connection!\n");
1304     return GNUNET_OK;
1305   }
1306
1307   oldstate = c->state;
1308   LOG (GNUNET_ERROR_TYPE_DEBUG, "  via peer %s\n", GNUNET_i2s (peer));
1309   pi = GMP_get (peer);
1310   if (get_next_hop (c) == pi)
1311   {
1312     LOG (GNUNET_ERROR_TYPE_DEBUG, "  SYNACK\n");
1313     fwd = GNUNET_NO;
1314     if (MESH_CONNECTION_SENT == oldstate)
1315       connection_change_state (c, MESH_CONNECTION_ACK);
1316   }
1317   else if (get_prev_hop (c) == pi)
1318   {
1319     LOG (GNUNET_ERROR_TYPE_DEBUG, "  ACK\n");
1320     fwd = GNUNET_YES;
1321     connection_change_state (c, MESH_CONNECTION_READY);
1322   }
1323   else
1324   {
1325     GNUNET_break_op (0);
1326     return GNUNET_OK;
1327   }
1328
1329   connection_reset_timeout (c, fwd);
1330
1331   /* Add path to peers? */
1332   p = c->path;
1333   if (NULL != p)
1334   {
1335     GMP_add_path_to_all (p, GNUNET_YES);
1336   }
1337   else
1338   {
1339     GNUNET_break (0);
1340   }
1341
1342   /* Message for us as creator? */
1343   if (GMC_is_origin (c, GNUNET_YES))
1344   {
1345     if (GNUNET_NO != fwd)
1346     {
1347       GNUNET_break_op (0);
1348       return GNUNET_OK;
1349     }
1350     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Connection (SYN)ACK for us!\n");
1351
1352     /* If just created, cancel the short timeout and start a long one */
1353     if (MESH_CONNECTION_SENT == oldstate)
1354       connection_reset_timeout (c, GNUNET_YES);
1355
1356     /* Change tunnel state */
1357     if (MESH_TUNNEL3_WAITING == GMT_get_state (c->t))
1358       GMT_change_state (c->t, MESH_TUNNEL3_READY);
1359
1360     /* Send ACK (~TCP ACK)*/
1361     send_connection_ack (c, GNUNET_YES);
1362   }
1363
1364   /* Message for us as destination? */
1365   if (GMC_is_terminal (c, GNUNET_YES))
1366   {
1367     if (GNUNET_YES != fwd)
1368     {
1369       GNUNET_break_op (0);
1370       return GNUNET_OK;
1371     }
1372     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Connection ACK for us!\n");
1373
1374     /* If just created, cancel the short timeout and start a long one */
1375     if (MESH_CONNECTION_ACK == oldstate)
1376       connection_reset_timeout (c, GNUNET_NO);
1377
1378     /* Change tunnel state */
1379     if (MESH_TUNNEL3_WAITING == GMT_get_state (c->t))
1380       GMT_change_state (c->t, MESH_TUNNEL3_READY);
1381
1382     return GNUNET_OK;
1383   }
1384
1385   LOG (GNUNET_ERROR_TYPE_DEBUG, "  not for us, retransmitting...\n");
1386   GMC_send_prebuilt_message (message, c, fwd, NULL, NULL);
1387   return GNUNET_OK;
1388 }
1389
1390
1391 /**
1392  * Core handler for notifications of broken paths
1393  *
1394  * @param cls Closure (unused).
1395  * @param id Peer identity of sending neighbor.
1396  * @param message Message.
1397  *
1398  * @return GNUNET_OK to keep the connection open,
1399  *         GNUNET_SYSERR to close it (signal serious error)
1400  */
1401 int
1402 GMC_handle_broken (void* cls,
1403                    const struct GNUNET_PeerIdentity* id,
1404                    const struct GNUNET_MessageHeader* message)
1405 {
1406   struct GNUNET_MESH_ConnectionBroken *msg;
1407   struct MeshConnection *c;
1408   int fwd;
1409
1410   LOG (GNUNET_ERROR_TYPE_DEBUG,
1411               "Received a CONNECTION BROKEN msg from %s\n", GNUNET_i2s (id));
1412   msg = (struct GNUNET_MESH_ConnectionBroken *) message;
1413   LOG (GNUNET_ERROR_TYPE_DEBUG, "  regarding %s\n",
1414               GNUNET_i2s (&msg->peer1));
1415   LOG (GNUNET_ERROR_TYPE_DEBUG, "  regarding %s\n",
1416               GNUNET_i2s (&msg->peer2));
1417   c = connection_get (&msg->cid);
1418   if (NULL == c)
1419   {
1420     GNUNET_break_op (0);
1421     return GNUNET_OK;
1422   }
1423
1424   fwd = is_fwd (c, id);
1425   connection_cancel_queues (c, !fwd);
1426   if (GMC_is_terminal (c, fwd))
1427   {
1428     if (0 < c->pending_messages)
1429       c->destroy = GNUNET_YES;
1430     else
1431       GMC_destroy (c);
1432   }
1433   else
1434   {
1435     GMC_send_prebuilt_message (message, c, fwd, NULL, NULL);
1436     c->destroy = GNUNET_YES;
1437   }
1438
1439   return GNUNET_OK;
1440
1441 }
1442
1443
1444 /**
1445  * Core handler for tunnel destruction
1446  *
1447  * @param cls Closure (unused).
1448  * @param peer Peer identity of sending neighbor.
1449  * @param message Message.
1450  *
1451  * @return GNUNET_OK to keep the connection open,
1452  *         GNUNET_SYSERR to close it (signal serious error)
1453  */
1454 int
1455 GMC_handle_destroy (void *cls, const struct GNUNET_PeerIdentity *peer,
1456                     const struct GNUNET_MessageHeader *message)
1457 {
1458   struct GNUNET_MESH_ConnectionDestroy *msg;
1459   struct MeshConnection *c;
1460   int fwd;
1461
1462   msg = (struct GNUNET_MESH_ConnectionDestroy *) message;
1463   LOG (GNUNET_ERROR_TYPE_DEBUG,
1464               "Got a CONNECTION DESTROY message from %s\n",
1465               GNUNET_i2s (peer));
1466   LOG (GNUNET_ERROR_TYPE_DEBUG,
1467               "  for connection %s\n",
1468               GNUNET_h2s (&msg->cid));
1469   c = connection_get (&msg->cid);
1470   if (NULL == c)
1471   {
1472     /* Probably already got the message from another path,
1473      * destroyed the tunnel and retransmitted to children.
1474      * Safe to ignore.
1475      */
1476     GNUNET_STATISTICS_update (stats, "# control on unknown tunnel",
1477                               1, GNUNET_NO);
1478     return GNUNET_OK;
1479   }
1480   fwd = is_fwd (c, peer);
1481   if (GNUNET_SYSERR == fwd)
1482   {
1483     GNUNET_break_op (0);
1484     return GNUNET_OK;
1485   }
1486   GMC_send_prebuilt_message (message, c, fwd, NULL, NULL);
1487   c->destroy = GNUNET_YES;
1488
1489   return GNUNET_OK;
1490 }
1491
1492 /**
1493  * Generic handler for mesh network encrypted traffic.
1494  *
1495  * @param peer Peer identity this notification is about.
1496  * @param msg Encrypted message.
1497  *
1498  * @return GNUNET_OK to keep the connection open,
1499  *         GNUNET_SYSERR to close it (signal serious error)
1500  */
1501 static int
1502 handle_mesh_encrypted (const struct GNUNET_PeerIdentity *peer,
1503                        const struct GNUNET_MESH_Encrypted *msg)
1504 {
1505   struct MeshConnection *c;
1506   struct MeshPeer *neighbor;
1507   struct MeshFlowControl *fc;
1508   GNUNET_PEER_Id peer_id;
1509   uint32_t pid;
1510   uint32_t ttl;
1511   uint16_t type;
1512   size_t size;
1513   int fwd;
1514
1515   /* Check size */
1516   size = ntohs (msg->header.size);
1517   if (size <
1518       sizeof (struct GNUNET_MESH_Encrypted) +
1519       sizeof (struct GNUNET_MessageHeader))
1520   {
1521     GNUNET_break_op (0);
1522     return GNUNET_OK;
1523   }
1524   type = ntohs (msg->header.type);
1525   LOG (GNUNET_ERROR_TYPE_DEBUG, "\n\n");
1526   LOG (GNUNET_ERROR_TYPE_DEBUG, "got a %s message (#%u) from %s\n",
1527        GNUNET_MESH_DEBUG_M2S (type), ntohl (msg->pid), GNUNET_i2s (peer));
1528
1529   /* Check connection */
1530   c = connection_get (&msg->cid);
1531   if (NULL == c)
1532   {
1533     GNUNET_STATISTICS_update (stats, "# unknown connection", 1, GNUNET_NO);
1534     LOG (GNUNET_ERROR_TYPE_DEBUG, "WARNING connection unknown\n");
1535     return GNUNET_OK;
1536   }
1537
1538   /* Check if origin is as expected */
1539   neighbor = get_prev_hop (c);
1540   peer_id = GNUNET_PEER_search (peer);
1541   if (peer_id == GMP_get_short_id (neighbor))
1542   {
1543     fwd = GNUNET_YES;
1544   }
1545   else
1546   {
1547     neighbor = get_next_hop (c);
1548     if (peer_id == GMP_get_short_id (neighbor))
1549     {
1550       fwd = GNUNET_NO;
1551     }
1552     else
1553     {
1554       /* Unexpected peer sending traffic on a connection. */
1555       GNUNET_break_op (0);
1556       return GNUNET_OK;
1557     }
1558   }
1559
1560   /* Check PID */
1561   fc = fwd ? &c->bck_fc : &c->fwd_fc;
1562   pid = ntohl (msg->pid);
1563   if (GMC_is_pid_bigger (pid, fc->last_ack_sent))
1564   {
1565     GNUNET_STATISTICS_update (stats, "# unsolicited message", 1, GNUNET_NO);
1566     LOG (GNUNET_ERROR_TYPE_DEBUG,
1567                 "WARNING Received PID %u, (prev %u), ACK %u\n",
1568                 pid, fc->last_pid_recv, fc->last_ack_sent);
1569     return GNUNET_OK;
1570   }
1571   if (GNUNET_NO == GMC_is_pid_bigger (pid, fc->last_pid_recv))
1572   {
1573     GNUNET_STATISTICS_update (stats, "# duplicate PID", 1, GNUNET_NO);
1574     LOG (GNUNET_ERROR_TYPE_DEBUG,
1575                 " Pid %u not expected (%u+), dropping!\n",
1576                 pid, fc->last_pid_recv + 1);
1577     return GNUNET_OK;
1578   }
1579   if (MESH_CONNECTION_SENT == c->state || MESH_CONNECTION_ACK == c->state)
1580     connection_change_state (c, MESH_CONNECTION_READY);
1581   connection_reset_timeout (c, fwd);
1582   fc->last_pid_recv = pid;
1583
1584   /* Is this message for us? */
1585   if (GMC_is_terminal (c, fwd))
1586   {
1587     /* TODO signature verification */
1588     LOG (GNUNET_ERROR_TYPE_DEBUG, "  message for us!\n");
1589     GNUNET_STATISTICS_update (stats, "# messages received", 1, GNUNET_NO);
1590
1591     if (NULL == c->t)
1592     {
1593       GNUNET_break (0);
1594       return GNUNET_OK;
1595     }
1596     fc->last_pid_recv = pid;
1597     GMT_handle_encrypted (c->t, msg);
1598     GMC_send_ack (c, fwd, GNUNET_NO);
1599     return GNUNET_OK;
1600   }
1601
1602   /* Message not for us: forward to next hop */
1603   LOG (GNUNET_ERROR_TYPE_DEBUG, "  not for us, retransmitting...\n");
1604   ttl = ntohl (msg->ttl);
1605   LOG (GNUNET_ERROR_TYPE_DEBUG, "   ttl: %u\n", ttl);
1606   if (ttl == 0)
1607   {
1608     GNUNET_STATISTICS_update (stats, "# TTL drops", 1, GNUNET_NO);
1609     LOG (GNUNET_ERROR_TYPE_WARNING, " TTL is 0, DROPPING!\n");
1610     GMC_send_ack (c, fwd, GNUNET_NO);
1611     return GNUNET_OK;
1612   }
1613
1614   GNUNET_STATISTICS_update (stats, "# messages forwarded", 1, GNUNET_NO);
1615   GMC_send_prebuilt_message (&msg->header, c, fwd, NULL, NULL);
1616
1617   return GNUNET_OK;
1618 }
1619
1620 /**
1621  * Generic handler for mesh network encrypted traffic.
1622  *
1623  * @param peer Peer identity this notification is about.
1624  * @param msg Encrypted message.
1625  *
1626  * @return GNUNET_OK to keep the connection open,
1627  *         GNUNET_SYSERR to close it (signal serious error)
1628  */
1629 static int
1630 handle_mesh_kx (const struct GNUNET_PeerIdentity *peer,
1631                 const struct GNUNET_MESH_KX *msg)
1632 {
1633   struct MeshConnection *c;
1634   struct MeshPeer *neighbor;
1635   GNUNET_PEER_Id peer_id;
1636   size_t size;
1637   uint16_t type;
1638   int fwd;
1639
1640   /* Check size */
1641   size = ntohs (msg->header.size);
1642   if (size <
1643       sizeof (struct GNUNET_MESH_Encrypted) +
1644       sizeof (struct GNUNET_MessageHeader))
1645   {
1646     GNUNET_break_op (0);
1647     return GNUNET_OK;
1648   }
1649   type = ntohs (msg->header.type);
1650   LOG (GNUNET_ERROR_TYPE_DEBUG, "\n\n");
1651   LOG (GNUNET_ERROR_TYPE_DEBUG, "got a %s message from %s\n",
1652               GNUNET_MESH_DEBUG_M2S (type), GNUNET_i2s (peer));
1653
1654   /* Check connection */
1655   c = connection_get (&msg->cid);
1656   if (NULL == c)
1657   {
1658     GNUNET_STATISTICS_update (stats, "# unknown connection", 1, GNUNET_NO);
1659     LOG (GNUNET_ERROR_TYPE_DEBUG, "WARNING connection unknown\n");
1660     return GNUNET_OK;
1661   }
1662
1663   /* Check if origin is as expected */
1664   neighbor = get_prev_hop (c);
1665   peer_id = GNUNET_PEER_search (peer);
1666   if (peer_id == GMP_get_short_id (neighbor))
1667   {
1668     fwd = GNUNET_YES;
1669   }
1670   else
1671   {
1672     neighbor = get_next_hop (c);
1673     if (peer_id == GMP_get_short_id (neighbor))
1674     {
1675       fwd = GNUNET_NO;
1676     }
1677     else
1678     {
1679       /* Unexpected peer sending traffic on a connection. */
1680       GNUNET_break_op (0);
1681       return GNUNET_OK;
1682     }
1683   }
1684
1685   /* Count as connection confirmation. */
1686   if (MESH_CONNECTION_SENT == c->state || MESH_CONNECTION_ACK == c->state)
1687     connection_change_state (c, MESH_CONNECTION_READY);
1688   connection_reset_timeout (c, fwd);
1689   if (NULL != c->t)
1690   {
1691     if (MESH_TUNNEL3_WAITING == GMT_get_state (c->t))
1692       GMT_change_state (c->t, MESH_TUNNEL3_READY);
1693   }
1694
1695   /* Is this message for us? */
1696   if (GMC_is_terminal (c, fwd))
1697   {
1698     LOG (GNUNET_ERROR_TYPE_DEBUG, "  message for us!\n");
1699     GNUNET_STATISTICS_update (stats, "# messages received", 1, GNUNET_NO);
1700     if (NULL == c->t)
1701     {
1702       GNUNET_break (0);
1703       return GNUNET_OK;
1704     }
1705     GMT_handle_kx (c->t, &msg[1].header);
1706     return GNUNET_OK;
1707   }
1708
1709   /* Message not for us: forward to next hop */
1710   LOG (GNUNET_ERROR_TYPE_DEBUG, "  not for us, retransmitting...\n");
1711   GNUNET_STATISTICS_update (stats, "# messages forwarded", 1, GNUNET_NO);
1712   GMC_send_prebuilt_message (&msg->header, c, fwd, NULL, NULL);
1713
1714   return GNUNET_OK;
1715 }
1716
1717
1718 /**
1719  * Core handler for encrypted mesh network traffic (channel mgmt, data).
1720  *
1721  * @param cls Closure (unused).
1722  * @param message Message received.
1723  * @param peer Peer who sent the message.
1724  *
1725  * @return GNUNET_OK to keep the connection open,
1726  *         GNUNET_SYSERR to close it (signal serious error)
1727  */
1728 int
1729 GMC_handle_encrypted (void *cls, const struct GNUNET_PeerIdentity *peer,
1730                       const struct GNUNET_MessageHeader *message)
1731 {
1732   return handle_mesh_encrypted (peer,
1733                                 (struct GNUNET_MESH_Encrypted *)message);
1734 }
1735
1736
1737 /**
1738  * Core handler for key exchange traffic (ephemeral key, ping, pong).
1739  *
1740  * @param cls Closure (unused).
1741  * @param message Message received.
1742  * @param peer Peer who sent the message.
1743  *
1744  * @return GNUNET_OK to keep the connection open,
1745  *         GNUNET_SYSERR to close it (signal serious error)
1746  */
1747 int
1748 GMC_handle_kx (void *cls, const struct GNUNET_PeerIdentity *peer,
1749                const struct GNUNET_MessageHeader *message)
1750 {
1751   return handle_mesh_kx (peer,
1752                          (struct GNUNET_MESH_KX *) message);
1753 }
1754
1755
1756 /**
1757  * Core handler for mesh network traffic point-to-point acks.
1758  *
1759  * @param cls closure
1760  * @param message message
1761  * @param peer peer identity this notification is about
1762  *
1763  * @return GNUNET_OK to keep the connection open,
1764  *         GNUNET_SYSERR to close it (signal serious error)
1765  */
1766 int
1767 GMC_handle_ack (void *cls, const struct GNUNET_PeerIdentity *peer,
1768                 const struct GNUNET_MessageHeader *message)
1769 {
1770   struct GNUNET_MESH_ACK *msg;
1771   struct MeshConnection *c;
1772   struct MeshFlowControl *fc;
1773   GNUNET_PEER_Id id;
1774   uint32_t ack;
1775   int fwd;
1776
1777   LOG (GNUNET_ERROR_TYPE_DEBUG, "\n\n");
1778   LOG (GNUNET_ERROR_TYPE_DEBUG, "Got an ACK packet from %s!\n",
1779               GNUNET_i2s (peer));
1780   msg = (struct GNUNET_MESH_ACK *) message;
1781
1782   c = connection_get (&msg->cid);
1783
1784   if (NULL == c)
1785   {
1786     GNUNET_STATISTICS_update (stats, "# ack on unknown connection", 1,
1787                               GNUNET_NO);
1788     return GNUNET_OK;
1789   }
1790
1791   /* Is this a forward or backward ACK? */
1792   id = GNUNET_PEER_search (peer);
1793   if (GMP_get_short_id (get_next_hop (c)) == id)
1794   {
1795     LOG (GNUNET_ERROR_TYPE_DEBUG, "  FWD ACK\n");
1796     fc = &c->fwd_fc;
1797     fwd = GNUNET_YES;
1798   }
1799   else if (GMP_get_short_id (get_prev_hop (c)) == id)
1800   {
1801     LOG (GNUNET_ERROR_TYPE_DEBUG, "  BCK ACK\n");
1802     fc = &c->bck_fc;
1803     fwd = GNUNET_NO;
1804   }
1805   else
1806   {
1807     GNUNET_break_op (0);
1808     return GNUNET_OK;
1809   }
1810
1811   ack = ntohl (msg->ack);
1812   LOG (GNUNET_ERROR_TYPE_DEBUG, "  ACK %u (was %u)\n",
1813               ack, fc->last_ack_recv);
1814   if (GMC_is_pid_bigger (ack, fc->last_ack_recv))
1815     fc->last_ack_recv = ack;
1816
1817   /* Cancel polling if the ACK is big enough. */
1818   if (GNUNET_SCHEDULER_NO_TASK != fc->poll_task &&
1819       GMC_is_pid_bigger (fc->last_ack_recv, fc->last_pid_sent))
1820   {
1821     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Cancel poll\n");
1822     GNUNET_SCHEDULER_cancel (fc->poll_task);
1823     fc->poll_task = GNUNET_SCHEDULER_NO_TASK;
1824     fc->poll_time = GNUNET_TIME_UNIT_SECONDS;
1825   }
1826
1827   connection_unlock_queue (c, fwd);
1828
1829   return GNUNET_OK;
1830 }
1831
1832
1833 /**
1834  * Core handler for mesh network traffic point-to-point ack polls.
1835  *
1836  * @param cls closure
1837  * @param message message
1838  * @param peer peer identity this notification is about
1839  *
1840  * @return GNUNET_OK to keep the connection open,
1841  *         GNUNET_SYSERR to close it (signal serious error)
1842  */
1843 int
1844 GMC_handle_poll (void *cls, const struct GNUNET_PeerIdentity *peer,
1845                  const struct GNUNET_MessageHeader *message)
1846 {
1847   struct GNUNET_MESH_Poll *msg;
1848   struct MeshConnection *c;
1849   struct MeshFlowControl *fc;
1850   GNUNET_PEER_Id id;
1851   uint32_t pid;
1852   int fwd;
1853
1854   LOG (GNUNET_ERROR_TYPE_DEBUG, "\n\n");
1855   LOG (GNUNET_ERROR_TYPE_DEBUG,
1856        "Got a POLL packet from %s!\n",
1857        GNUNET_i2s (peer));
1858
1859   msg = (struct GNUNET_MESH_Poll *) message;
1860
1861   c = connection_get (&msg->cid);
1862
1863   if (NULL == c)
1864   {
1865     GNUNET_STATISTICS_update (stats, "# poll on unknown connection", 1,
1866                               GNUNET_NO);
1867     GNUNET_break_op (0);
1868     return GNUNET_OK;
1869   }
1870
1871   /* Is this a forward or backward ACK?
1872    * Note: a poll should never be needed in a loopback case,
1873    * since there is no possiblility of packet loss there, so
1874    * this way of discerining FWD/BCK should not be a problem.
1875    */
1876   id = GNUNET_PEER_search (peer);
1877   if (GMP_get_short_id (get_next_hop (c)) == id)
1878   {
1879     LOG (GNUNET_ERROR_TYPE_DEBUG, "  FWD FC\n");
1880     fc = &c->fwd_fc;
1881   }
1882   else if (GMP_get_short_id (get_prev_hop (c)) == id)
1883   {
1884     LOG (GNUNET_ERROR_TYPE_DEBUG, "  BCK FC\n");
1885     fc = &c->bck_fc;
1886   }
1887   else
1888   {
1889     GNUNET_break_op (0);
1890     return GNUNET_OK;
1891   }
1892
1893   pid = ntohl (msg->pid);
1894   LOG (GNUNET_ERROR_TYPE_DEBUG, "  PID %u, OLD %u\n", pid, fc->last_pid_recv);
1895   fc->last_pid_recv = pid;
1896   fwd = fc == &c->bck_fc;
1897   GMC_send_ack (c, fwd, GNUNET_YES);
1898
1899   return GNUNET_OK;
1900 }
1901
1902
1903 /**
1904  * Core handler for mesh keepalives.
1905  *
1906  * @param cls closure
1907  * @param message message
1908  * @param peer peer identity this notification is about
1909  * @return GNUNET_OK to keep the connection open,
1910  *         GNUNET_SYSERR to close it (signal serious error)
1911  *
1912  * TODO: Check who we got this from, to validate route.
1913  */
1914 int
1915 GMC_handle_keepalive (void *cls, const struct GNUNET_PeerIdentity *peer,
1916                       const struct GNUNET_MessageHeader *message)
1917 {
1918   struct GNUNET_MESH_ConnectionKeepAlive *msg;
1919   struct MeshConnection *c;
1920   struct MeshPeer *neighbor;
1921   int fwd;
1922
1923   msg = (struct GNUNET_MESH_ConnectionKeepAlive *) message;
1924   LOG (GNUNET_ERROR_TYPE_DEBUG, "got a keepalive packet from %s\n",
1925               GNUNET_i2s (peer));
1926
1927   c = connection_get (&msg->cid);
1928   if (NULL == c)
1929   {
1930     GNUNET_STATISTICS_update (stats, "# keepalive on unknown connection", 1,
1931                               GNUNET_NO);
1932     return GNUNET_OK;
1933   }
1934
1935   fwd = GNUNET_MESSAGE_TYPE_MESH_FWD_KEEPALIVE == ntohs (message->type) ?
1936         GNUNET_YES : GNUNET_NO;
1937
1938   /* Check if origin is as expected */
1939   neighbor = get_hop (c, fwd);
1940   if (GNUNET_PEER_search (peer) != GMP_get_short_id (neighbor))
1941   {
1942     GNUNET_break_op (0);
1943     return GNUNET_OK;
1944   }
1945
1946   connection_change_state (c, MESH_CONNECTION_READY);
1947   connection_reset_timeout (c, fwd);
1948
1949   if (GMC_is_terminal (c, fwd))
1950     return GNUNET_OK;
1951
1952   GNUNET_STATISTICS_update (stats, "# keepalives forwarded", 1, GNUNET_NO);
1953   GMC_send_prebuilt_message (message, c, fwd, NULL, NULL);
1954
1955   return GNUNET_OK;
1956 }
1957
1958
1959 /**
1960  * Send an ACK on the appropriate connection/channel, depending on
1961  * the direction and the position of the peer.
1962  *
1963  * @param c Which connection to send the hop-by-hop ACK.
1964  * @param fwd Is this a fwd ACK? (will go dest->root).
1965  * @param force Send the ACK even if suboptimal (e.g. requested by POLL).
1966  */
1967 void
1968 GMC_send_ack (struct MeshConnection *c, int fwd, int force)
1969 {
1970   unsigned int buffer;
1971
1972   LOG (GNUNET_ERROR_TYPE_DEBUG,
1973        "GMC send %s ACK on %s\n",
1974        fwd ? "FWD" : "BCK", GMC_2s (c));
1975
1976   if (NULL == c)
1977   {
1978     GNUNET_break (0);
1979     return;
1980   }
1981
1982   /* Get available buffer space */
1983   if (GMC_is_terminal (c, fwd))
1984   {
1985     LOG (GNUNET_ERROR_TYPE_DEBUG, "  getting from all channels\n");
1986     buffer = GMT_get_channels_buffer (c->t);
1987   }
1988   else
1989   {
1990     LOG (GNUNET_ERROR_TYPE_DEBUG, "  getting from one connection\n");
1991     buffer = GMC_get_buffer (c, fwd);
1992   }
1993   LOG (GNUNET_ERROR_TYPE_DEBUG, "  buffer available: %u\n", buffer);
1994   if (0 == buffer && GNUNET_NO == force)
1995     return;
1996
1997   /* Send available buffer space */
1998   if (GMC_is_origin (c, fwd))
1999   {
2000     GNUNET_assert (NULL != c->t);
2001     LOG (GNUNET_ERROR_TYPE_DEBUG, "  sending on channels...\n");
2002     GMT_unchoke_channels (c->t);
2003   }
2004   else
2005   {
2006     LOG (GNUNET_ERROR_TYPE_DEBUG, "  sending on connection\n");
2007     send_ack (c, buffer, fwd, force);
2008   }
2009 }
2010
2011
2012 /**
2013  * Initialize the connections subsystem
2014  *
2015  * @param c Configuration handle.
2016  */
2017 void
2018 GMC_init (const struct GNUNET_CONFIGURATION_Handle *c)
2019 {
2020   LOG (GNUNET_ERROR_TYPE_DEBUG, "init\n");
2021   if (GNUNET_OK !=
2022       GNUNET_CONFIGURATION_get_value_number (c, "MESH", "MAX_MSGS_QUEUE",
2023                                              &max_msgs_queue))
2024   {
2025     GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
2026                                "MESH", "MAX_MSGS_QUEUE", "MISSING");
2027     GNUNET_SCHEDULER_shutdown ();
2028     return;
2029   }
2030
2031   if (GNUNET_OK !=
2032       GNUNET_CONFIGURATION_get_value_number (c, "MESH", "MAX_CONNECTIONS",
2033                                              &max_connections))
2034   {
2035     GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
2036                                "MESH", "MAX_CONNECTIONS", "MISSING");
2037     GNUNET_SCHEDULER_shutdown ();
2038     return;
2039   }
2040
2041   if (GNUNET_OK !=
2042       GNUNET_CONFIGURATION_get_value_time (c, "MESH", "REFRESH_CONNECTION_TIME",
2043                                            &refresh_connection_time))
2044   {
2045     GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
2046                                "MESH", "REFRESH_CONNECTION_TIME", "MISSING");
2047     GNUNET_SCHEDULER_shutdown ();
2048     return;
2049   }
2050   create_connection_time = GNUNET_TIME_UNIT_SECONDS;
2051   connections = GNUNET_CONTAINER_multihashmap_create (1024, GNUNET_YES);
2052 }
2053
2054 /**
2055  * Shut down the connections subsystem.
2056  */
2057 void
2058 GMC_shutdown (void)
2059 {
2060   GNUNET_CONTAINER_multihashmap_destroy (connections);
2061 }
2062
2063
2064 struct MeshConnection *
2065 GMC_new (const struct GNUNET_HashCode *cid,
2066          struct MeshTunnel3 *t,
2067          struct MeshPeerPath *p,
2068          unsigned int own_pos)
2069 {
2070   struct MeshConnection *c;
2071
2072   c = GNUNET_new (struct MeshConnection);
2073   c->id = *cid;
2074   GNUNET_CONTAINER_multihashmap_put (connections, &c->id, c,
2075                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
2076   fc_init (&c->fwd_fc);
2077   fc_init (&c->bck_fc);
2078   c->fwd_fc.c = c;
2079   c->bck_fc.c = c;
2080
2081   c->t = t;
2082   if (own_pos > p->length - 1)
2083   {
2084     GNUNET_break (0);
2085     GMC_destroy (c);
2086     return NULL;
2087   }
2088   c->own_pos = own_pos;
2089   c->path = p;
2090
2091   if (0 == own_pos)
2092   {
2093     c->fwd_maintenance_task =
2094             GNUNET_SCHEDULER_add_delayed (create_connection_time,
2095                                           &connection_fwd_keepalive, c);
2096   }
2097   register_neighbors (c);
2098   return c;
2099 }
2100
2101
2102 void
2103 GMC_destroy (struct MeshConnection *c)
2104 {
2105   if (NULL == c)
2106     return;
2107
2108   if (2 == c->destroy) /* cancel queues -> GMP_queue_cancel -> q_destroy -> */
2109     return;            /* -> message_sent -> GMC_destroy. Don't loop. */
2110   c->destroy = 2;
2111
2112   LOG (GNUNET_ERROR_TYPE_DEBUG, "destroying connection %s\n", GMC_2s (c));
2113
2114   /* Cancel all traffic */
2115   connection_cancel_queues (c, GNUNET_YES);
2116   connection_cancel_queues (c, GNUNET_NO);
2117
2118   /* Cancel maintainance task (keepalive/timeout) */
2119   if (GNUNET_SCHEDULER_NO_TASK != c->fwd_maintenance_task)
2120     GNUNET_SCHEDULER_cancel (c->fwd_maintenance_task);
2121   if (GNUNET_SCHEDULER_NO_TASK != c->bck_maintenance_task)
2122     GNUNET_SCHEDULER_cancel (c->bck_maintenance_task);
2123
2124   /* Unregister from neighbors */
2125   unregister_neighbors (c);
2126
2127   /* Delete */
2128   GNUNET_STATISTICS_update (stats, "# connections", -1, GNUNET_NO);
2129   if (NULL != c->t)
2130     GMT_remove_connection (c->t, c);
2131
2132   if (GNUNET_NO == GMC_is_origin (c, GNUNET_YES))
2133     path_destroy (c->path);
2134
2135   GNUNET_CONTAINER_multihashmap_remove (connections, &c->id, c);
2136
2137   GNUNET_free (c);
2138 }
2139
2140 /**
2141  * Get the connection ID.
2142  *
2143  * @param c Connection to get the ID from.
2144  *
2145  * @return ID of the connection.
2146  */
2147 const struct GNUNET_HashCode *
2148 GMC_get_id (const struct MeshConnection *c)
2149 {
2150   return &c->id;
2151 }
2152
2153
2154 /**
2155  * Get the connection path.
2156  *
2157  * @param c Connection to get the path from.
2158  *
2159  * @return path used by the connection.
2160  */
2161 const struct MeshPeerPath *
2162 GMC_get_path (const struct MeshConnection *c)
2163 {
2164   return c->path;
2165 }
2166
2167
2168 /**
2169  * Get the connection state.
2170  *
2171  * @param c Connection to get the state from.
2172  *
2173  * @return state of the connection.
2174  */
2175 enum MeshConnectionState
2176 GMC_get_state (const struct MeshConnection *c)
2177 {
2178   return c->state;
2179 }
2180
2181 /**
2182  * Get the connection tunnel.
2183  *
2184  * @param c Connection to get the tunnel from.
2185  *
2186  * @return tunnel of the connection.
2187  */
2188 struct MeshTunnel3 *
2189 GMC_get_tunnel (const struct MeshConnection *c)
2190 {
2191   return c->t;
2192 }
2193
2194
2195 /**
2196  * Get free buffer space in a connection.
2197  *
2198  * @param c Connection.
2199  * @param fwd Is query about FWD traffic?
2200  *
2201  * @return Free buffer space [0 - max_msgs_queue/max_connections]
2202  */
2203 unsigned int
2204 GMC_get_buffer (struct MeshConnection *c, int fwd)
2205 {
2206   struct MeshFlowControl *fc;
2207
2208   fc = fwd ? &c->fwd_fc : &c->bck_fc;
2209
2210   return (fc->queue_max - fc->queue_n);
2211 }
2212
2213 /**
2214  * Get how many messages have we allowed to send to us from a direction.
2215  *
2216  * @param c Connection.
2217  * @param fwd Are we asking about traffic from FWD (BCK messages)?
2218  *
2219  * @return last_ack_sent - last_pid_recv
2220  */
2221 unsigned int
2222 GMC_get_allowed (struct MeshConnection *c, int fwd)
2223 {
2224   struct MeshFlowControl *fc;
2225
2226   fc = fwd ? &c->fwd_fc : &c->bck_fc;
2227   if (GMC_is_pid_bigger(fc->last_pid_recv, fc->last_ack_sent))
2228   {
2229     return 0;
2230   }
2231   return (fc->last_ack_sent - fc->last_pid_recv);
2232 }
2233
2234 /**
2235  * Get messages queued in a connection.
2236  *
2237  * @param c Connection.
2238  * @param fwd Is query about FWD traffic?
2239  *
2240  * @return Number of messages queued.
2241  */
2242 unsigned int
2243 GMC_get_qn (struct MeshConnection *c, int fwd)
2244 {
2245   struct MeshFlowControl *fc;
2246
2247   fc = fwd ? &c->fwd_fc : &c->bck_fc;
2248
2249   return fc->queue_n;
2250 }
2251
2252
2253 /**
2254  * Allow the connection to advertise a buffer of the given size.
2255  *
2256  * The connection will send an @c fwd ACK message (so: in direction !fwd)
2257  * allowing up to last_pid_recv + buffer.
2258  *
2259  * @param c Connection.
2260  * @param buffer How many more messages the connection can accept.
2261  * @param fwd Is this about FWD traffic? (The ack will go dest->root).
2262  */
2263 void
2264 GMC_allow (struct MeshConnection *c, unsigned int buffer, int fwd)
2265 {
2266   send_ack (c, buffer, fwd, GNUNET_NO);
2267 }
2268
2269
2270 /**
2271  * Notify other peers on a connection of a broken link. Mark connections
2272  * to destroy after all traffic has been sent.
2273  *
2274  * @param c Connection on which there has been a disconnection.
2275  * @param peer Peer that disconnected.
2276  */
2277 void
2278 GMC_notify_broken (struct MeshConnection *c,
2279                    struct MeshPeer *peer)
2280 {
2281   int fwd;
2282
2283   fwd = peer == get_prev_hop (c);
2284
2285   if (GNUNET_YES == GMC_is_terminal (c, fwd))
2286   {
2287     /* Local shutdown, no one to notify about this. */
2288     GMC_destroy (c);
2289     return;
2290   }
2291   if (GNUNET_NO == c->destroy)
2292     send_broken (c, &my_full_id, GMP_get_id (peer), fwd);
2293
2294   /* Connection will have at least one pending message
2295    * (the one we just scheduled), so no point in checking whether to
2296    * destroy immediately. */
2297   c->destroy = GNUNET_YES;
2298
2299   /**
2300    * Cancel all queues, if no message is left, connection will be destroyed.
2301    */
2302   connection_cancel_queues (c, !fwd);
2303
2304   return;
2305 }
2306
2307
2308 /**
2309  * Is this peer the first one on the connection?
2310  *
2311  * @param c Connection.
2312  * @param fwd Is this about fwd traffic?
2313  *
2314  * @return #GNUNET_YES if origin, #GNUNET_NO if relay/terminal.
2315  */
2316 int
2317 GMC_is_origin (struct MeshConnection *c, int fwd)
2318 {
2319   if (!fwd && c->path->length - 1 == c->own_pos )
2320     return GNUNET_YES;
2321   if (fwd && 0 == c->own_pos)
2322     return GNUNET_YES;
2323   return GNUNET_NO;
2324 }
2325
2326
2327 /**
2328  * Is this peer the last one on the connection?
2329  *
2330  * @param c Connection.
2331  * @param fwd Is this about fwd traffic?
2332  *            Note that the ROOT is the terminal for BCK traffic!
2333  *
2334  * @return #GNUNET_YES if terminal, #GNUNET_NO if relay/origin.
2335  */
2336 int
2337 GMC_is_terminal (struct MeshConnection *c, int fwd)
2338 {
2339   return GMC_is_origin (c, !fwd);
2340 }
2341
2342
2343 /**
2344  * See if we are allowed to send by the next hop in the given direction.
2345  *
2346  * @param c Connection.
2347  * @param fwd Is this about fwd traffic?
2348  *
2349  * @return #GNUNET_YES in case it's OK to send.
2350  */
2351 int
2352 GMC_is_sendable (struct MeshConnection *c, int fwd)
2353 {
2354   struct MeshFlowControl *fc;
2355
2356   fc = fwd ? &c->fwd_fc : &c->bck_fc;
2357   if (GMC_is_pid_bigger (fc->last_ack_recv, fc->last_pid_sent))
2358     return GNUNET_YES;
2359   return GNUNET_NO;
2360 }
2361
2362 /**
2363  * Sends an already built message on a connection, properly registering
2364  * all used resources.
2365  *
2366  * @param message Message to send. Function makes a copy of it.
2367  *                If message is not hop-by-hop, decrements TTL of copy.
2368  * @param c Connection on which this message is transmitted.
2369  * @param fwd Is this a fwd message?
2370  * @param cont Continuation called once message is sent. Can be NULL.
2371  * @param cont_cls Closure for @c cont.
2372  *
2373  * @return Handle to cancel the message before it's sent. NULL on error.
2374  *         Invalid on @c cont call.
2375  */
2376 struct MeshConnectionQueue *
2377 GMC_send_prebuilt_message (const struct GNUNET_MessageHeader *message,
2378                            struct MeshConnection *c, int fwd,
2379                            GMC_sent cont, void *cont_cls)
2380 {
2381   struct MeshFlowControl *fc;
2382   struct MeshConnectionQueue *q;
2383   void *data;
2384   size_t size;
2385   uint16_t type;
2386   int droppable;
2387
2388   size = ntohs (message->size);
2389   data = GNUNET_malloc (size);
2390   memcpy (data, message, size);
2391   type = ntohs (message->type);
2392   LOG (GNUNET_ERROR_TYPE_DEBUG, "Send %s (%u bytes) on connection %s\n",
2393               GNUNET_MESH_DEBUG_M2S (type), size, GMC_2s (c));
2394
2395   fc = fwd ? &c->fwd_fc : &c->bck_fc;
2396   droppable = GNUNET_YES;
2397   switch (type)
2398   {
2399     struct GNUNET_MESH_Encrypted *emsg;
2400     struct GNUNET_MESH_KX        *kmsg;
2401     struct GNUNET_MESH_ACK       *amsg;
2402     struct GNUNET_MESH_Poll      *pmsg;
2403     struct GNUNET_MESH_ConnectionDestroy *dmsg;
2404     struct GNUNET_MESH_ConnectionBroken  *bmsg;
2405     uint32_t ttl;
2406
2407     case GNUNET_MESSAGE_TYPE_MESH_ENCRYPTED:
2408       emsg = (struct GNUNET_MESH_Encrypted *) data;
2409       ttl = ntohl (emsg->ttl);
2410       if (0 == ttl)
2411       {
2412         GNUNET_break_op (0);
2413         GNUNET_free (data);
2414         return NULL;
2415       }
2416       emsg->cid = c->id;
2417       emsg->ttl = htonl (ttl - 1);
2418       emsg->pid = htonl (fc->next_pid++);
2419       LOG (GNUNET_ERROR_TYPE_DEBUG, "  Q_N+ %p %u\n", fc, fc->queue_n);
2420       fc->queue_n++;
2421       LOG (GNUNET_ERROR_TYPE_DEBUG, "pid %u\n", ntohl (emsg->pid));
2422       LOG (GNUNET_ERROR_TYPE_DEBUG, "last pid sent %u\n", fc->last_pid_sent);
2423       LOG (GNUNET_ERROR_TYPE_DEBUG, "     ack recv %u\n", fc->last_ack_recv);
2424       if (GMC_is_pid_bigger (fc->last_pid_sent + 1, fc->last_ack_recv))
2425       {
2426         GMC_start_poll (c, fwd);
2427       }
2428       break;
2429
2430     case GNUNET_MESSAGE_TYPE_MESH_KX:
2431       kmsg = (struct GNUNET_MESH_KX *) data;
2432       kmsg->cid = c->id;
2433       break;
2434
2435     case GNUNET_MESSAGE_TYPE_MESH_ACK:
2436       amsg = (struct GNUNET_MESH_ACK *) data;
2437       amsg->cid = c->id;
2438       LOG (GNUNET_ERROR_TYPE_DEBUG, " ack %u\n", ntohl (amsg->ack));
2439       droppable = GNUNET_NO;
2440       break;
2441
2442     case GNUNET_MESSAGE_TYPE_MESH_POLL:
2443       pmsg = (struct GNUNET_MESH_Poll *) data;
2444       pmsg->cid = c->id;
2445       LOG (GNUNET_ERROR_TYPE_DEBUG, " poll %u\n", ntohl (pmsg->pid));
2446       droppable = GNUNET_NO;
2447       break;
2448
2449     case GNUNET_MESSAGE_TYPE_MESH_CONNECTION_DESTROY:
2450       dmsg = (struct GNUNET_MESH_ConnectionDestroy *) data;
2451       dmsg->cid = c->id;
2452       dmsg->reserved = 0;
2453       break;
2454
2455     case GNUNET_MESSAGE_TYPE_MESH_CONNECTION_BROKEN:
2456       bmsg = (struct GNUNET_MESH_ConnectionBroken *) data;
2457       bmsg->cid = c->id;
2458       bmsg->reserved = 0;
2459       break;
2460
2461     case GNUNET_MESSAGE_TYPE_MESH_CONNECTION_CREATE:
2462     case GNUNET_MESSAGE_TYPE_MESH_CONNECTION_ACK:
2463       break;
2464
2465     default:
2466       GNUNET_break (0);
2467   }
2468
2469   if (fc->queue_n > fc->queue_max && droppable)
2470   {
2471     GNUNET_STATISTICS_update (stats, "# messages dropped (buffer full)",
2472                               1, GNUNET_NO);
2473     GNUNET_break (0);
2474     LOG (GNUNET_ERROR_TYPE_DEBUG,
2475                 "queue full: %u/%u\n",
2476                 fc->queue_n, fc->queue_max);
2477     if (GNUNET_MESSAGE_TYPE_MESH_ENCRYPTED == type)
2478     {
2479       fc->queue_n--;
2480       fc->next_pid--;
2481     }
2482     GNUNET_free (data);
2483     return NULL; /* Drop this message */
2484   }
2485
2486   LOG (GNUNET_ERROR_TYPE_DEBUG, "  C_P+ %p %u\n", c, c->pending_messages);
2487   c->pending_messages++;
2488
2489   q = GNUNET_new (struct MeshConnectionQueue);
2490   q->q = GMP_queue_add (get_hop (c, fwd), data, type, size, c, fwd,
2491                         &message_sent, q);
2492   if (NULL == q->q)
2493   {
2494     GNUNET_break (0);
2495     GNUNET_free (data);
2496     GNUNET_free (q);
2497     return NULL;
2498   }
2499   q->cont = cont;
2500   q->cont_cls = cont_cls;
2501   return q;
2502 }
2503
2504
2505 /**
2506  * Cancel a previously sent message while it's in the queue.
2507  *
2508  * ONLY can be called before the continuation given to the send function
2509  * is called. Once the continuation is called, the message is no longer in the
2510  * queue.
2511  *
2512  * If the send function was given no continuation, GMC_cancel should
2513  * NOT be called, since it's not possible to determine if the message has
2514  * already been sent.
2515  *
2516  * @param q Handle to the queue.
2517  */
2518 void
2519 GMC_cancel (struct MeshConnectionQueue *q)
2520 {
2521   LOG (GNUNET_ERROR_TYPE_DEBUG, "!  GMC cancel message\n");
2522
2523   /* queue destroy calls message_sent, which calls q->cont and frees q */
2524   GMP_queue_destroy (q->q, GNUNET_YES);
2525 }
2526
2527
2528 /**
2529  * Sends a CREATE CONNECTION message for a path to a peer.
2530  * Changes the connection and tunnel states if necessary.
2531  *
2532  * @param connection Connection to create.
2533  */
2534 void
2535 GMC_send_create (struct MeshConnection *connection)
2536 {
2537   enum MeshTunnel3State state;
2538   size_t size;
2539
2540   size = sizeof (struct GNUNET_MESH_ConnectionCreate);
2541   size += connection->path->length * sizeof (struct GNUNET_PeerIdentity);
2542   LOG (GNUNET_ERROR_TYPE_DEBUG, "Send connection create\n");
2543   GMP_queue_add (get_next_hop (connection), NULL,
2544                  GNUNET_MESSAGE_TYPE_MESH_CONNECTION_CREATE,
2545                  size, connection, GNUNET_YES, &message_sent, NULL);
2546   LOG (GNUNET_ERROR_TYPE_DEBUG, "  C_P+ %p %u (create)\n",
2547        connection, connection->pending_messages);
2548   connection->pending_messages++;
2549   state = GMT_get_state (connection->t);
2550   if (MESH_TUNNEL3_SEARCHING == state || MESH_TUNNEL3_NEW == state)
2551     GMT_change_state (connection->t, MESH_TUNNEL3_WAITING);
2552   if (MESH_CONNECTION_NEW == connection->state)
2553     connection_change_state (connection, MESH_CONNECTION_SENT);
2554 }
2555
2556
2557 /**
2558  * Send a message to all peers in this connection that the connection
2559  * is no longer valid.
2560  *
2561  * If some peer should not receive the message, it should be zero'ed out
2562  * before calling this function.
2563  *
2564  * @param c The connection whose peers to notify.
2565  */
2566 void
2567 GMC_send_destroy (struct MeshConnection *c)
2568 {
2569   struct GNUNET_MESH_ConnectionDestroy msg;
2570
2571   if (GNUNET_YES == c->destroy)
2572     return;
2573
2574   msg.header.size = htons (sizeof (msg));
2575   msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_CONNECTION_DESTROY);;
2576   msg.cid = c->id;
2577   LOG (GNUNET_ERROR_TYPE_DEBUG,
2578               "  sending connection destroy for connection %s\n",
2579               GMC_2s (c));
2580
2581   if (GNUNET_NO == GMC_is_terminal (c, GNUNET_YES))
2582     GMC_send_prebuilt_message (&msg.header, c, GNUNET_YES, NULL, NULL);
2583   if (GNUNET_NO == GMC_is_terminal (c, GNUNET_NO))
2584     GMC_send_prebuilt_message (&msg.header, c, GNUNET_NO, NULL, NULL);
2585   c->destroy = GNUNET_YES;
2586 }
2587
2588
2589 /**
2590  * @brief Start a polling timer for the connection.
2591  *
2592  * When a neighbor does not accept more traffic on the connection it could be
2593  * caused by a simple congestion or by a lost ACK. Polling enables to check
2594  * for the lastest ACK status for a connection.
2595  *
2596  * @param c Connection.
2597  * @param fwd Should we poll in the FWD direction?
2598  */
2599 void
2600 GMC_start_poll (struct MeshConnection *c, int fwd)
2601 {
2602   struct MeshFlowControl *fc;
2603
2604   fc = fwd ? &c->fwd_fc : &c->bck_fc;
2605   if (GNUNET_SCHEDULER_NO_TASK != fc->poll_task && NULL != fc->poll_msg)
2606   {
2607     return;
2608   }
2609   fc->poll_task = GNUNET_SCHEDULER_add_delayed (fc->poll_time,
2610                                                 &connection_poll,
2611                                                 fc);
2612 }
2613
2614
2615 /**
2616  * @brief Stop polling a connection for ACKs.
2617  *
2618  * Once we have enough ACKs for future traffic, polls are no longer necessary.
2619  *
2620  * @param c Connection.
2621  * @param fwd Should we stop the poll in the FWD direction?
2622  */
2623 void
2624 GMC_stop_poll (struct MeshConnection *c, int fwd)
2625 {
2626   struct MeshFlowControl *fc;
2627
2628   fc = fwd ? &c->fwd_fc : &c->bck_fc;
2629   if (GNUNET_SCHEDULER_NO_TASK != fc->poll_task)
2630   {
2631     GNUNET_SCHEDULER_cancel (fc->poll_task);
2632     fc->poll_task = GNUNET_SCHEDULER_NO_TASK;
2633   }
2634 }
2635
2636 /**
2637  * Get a (static) string for a connection.
2638  *
2639  * @param c Connection.
2640  */
2641 const char *
2642 GMC_2s (struct MeshConnection *c)
2643 {
2644   if (NULL != c->t)
2645   {
2646     static char buf[128];
2647
2648     sprintf (buf, "%s (->%s)", GNUNET_h2s (&c->id), GMT_2s (c->t));
2649     return buf;
2650   }
2651   return GNUNET_h2s (&c->id);
2652 }