- refactor fwd/bck connection keepalive
[oweals/gnunet.git] / src / mesh / gnunet-service-mesh_connection.c
1 /*
2      This file is part of GNUnet.
3      (C) 2001-2013 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file mesh/gnunet-service-mesh_connection.c
23  * @brief GNUnet MESH service connection handling
24  * @author Bartlomiej Polot
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29
30 #include "gnunet_statistics_service.h"
31
32 #include "mesh_path.h"
33 #include "mesh_protocol.h"
34 #include "mesh.h"
35 #include "gnunet-service-mesh_connection.h"
36 #include "gnunet-service-mesh_peer.h"
37 #include "gnunet-service-mesh_tunnel.h"
38
39
40 #define LOG(level, ...) GNUNET_log_from (level,"mesh-con",__VA_ARGS__)
41
42 #define MESH_MAX_POLL_TIME      GNUNET_TIME_relative_multiply (\
43                                   GNUNET_TIME_UNIT_MINUTES,\
44                                   10)
45 #define AVG_MSGS                32
46
47
48 /******************************************************************************/
49 /********************************   STRUCTS  **********************************/
50 /******************************************************************************/
51
52 /**
53  * Struct to encapsulate all the Flow Control information to a peer to which
54  * we are directly connected (on a core level).
55  */
56 struct MeshFlowControl
57 {
58   /**
59    * Connection this controls.
60    */
61   struct MeshConnection *c;
62
63   /**
64    * How many messages are in the queue on this connection.
65    */
66   unsigned int queue_n;
67
68   /**
69    * How many messages do we accept in the queue.
70    */
71   unsigned int queue_max;
72
73   /**
74    * Next ID to use.
75    */
76   uint32_t next_pid;
77
78   /**
79    * ID of the last packet sent towards the peer.
80    */
81   uint32_t last_pid_sent;
82
83   /**
84    * ID of the last packet received from the peer.
85    */
86   uint32_t last_pid_recv;
87
88   /**
89    * Last ACK sent to the peer (peer can't send more than this PID).
90    */
91   uint32_t last_ack_sent;
92
93   /**
94    * Last ACK sent towards the origin (for traffic towards leaf node).
95    */
96   uint32_t last_ack_recv;
97
98   /**
99    * Task to poll the peer in case of a lost ACK causes stall.
100    */
101   GNUNET_SCHEDULER_TaskIdentifier poll_task;
102
103   /**
104    * How frequently to poll for ACKs.
105    */
106   struct GNUNET_TIME_Relative poll_time;
107
108   /**
109    * Queued poll message, to cancel if not necessary anymore (got ACK).
110    */
111   struct MeshConnectionQueue *poll_msg;
112
113   /**
114    * Queued poll message, to cancel if not necessary anymore (got ACK).
115    */
116   struct MeshConnectionQueue *ack_msg;
117 };
118
119 /**
120  * Keep a record of the last messages sent on this connection.
121  */
122 struct MeshConnectionPerformance
123 {
124   /**
125    * Circular buffer for storing measurements.
126    */
127   double usecsperbyte[AVG_MSGS];
128
129   /**
130    * Running average of @c usecsperbyte.
131    */
132   double avg;
133
134   /**
135    * How many values of @c usecsperbyte are valid.
136    */
137   uint16_t size;
138
139   /**
140    * Index of the next "free" position in @c usecsperbyte.
141    */
142   uint16_t idx;
143 };
144
145
146 /**
147  * Struct containing all information regarding a connection to a peer.
148  */
149 struct MeshConnection
150 {
151   /**
152    * Tunnel this connection is part of.
153    */
154   struct MeshTunnel3 *t;
155
156   /**
157    * Flow control information for traffic fwd.
158    */
159   struct MeshFlowControl fwd_fc;
160
161   /**
162    * Flow control information for traffic bck.
163    */
164   struct MeshFlowControl bck_fc;
165
166   /**
167    * Measure connection performance on the endpoint.
168    */
169   struct MeshConnectionPerformance *perf;
170
171   /**
172    * ID of the connection.
173    */
174   struct GNUNET_HashCode id;
175
176   /**
177    * State of the connection.
178    */
179   enum MeshConnectionState state;
180
181   /**
182    * Path being used for the tunnel. At the origin of the connection
183    * it's a pointer to the destination's path pool, otherwise just a copy.
184    */
185   struct MeshPeerPath *path;
186
187   /**
188    * Position of the local peer in the path.
189    */
190   unsigned int own_pos;
191
192   /**
193    * Task to keep the used paths alive at the owner,
194    * time tunnel out on all the other peers.
195    */
196   GNUNET_SCHEDULER_TaskIdentifier fwd_maintenance_task;
197
198   /**
199    * Task to keep the used paths alive at the destination,
200    * time tunnel out on all the other peers.
201    */
202   GNUNET_SCHEDULER_TaskIdentifier bck_maintenance_task;
203
204   /**
205    * Queue handle for maintainance traffic. One handle for FWD and BCK since
206    * one peer never needs to maintain both directions (no loopback connections).
207    */
208   struct MeshPeerQueue *maintenance_q;
209
210   /**
211    * Counter to do exponential backoff when creating a connection (max 64).
212    */
213   unsigned short create_retry;
214
215   /**
216    * Pending message count.
217    */
218   int pending_messages;
219
220   /**
221    * Destroy flag: if true, destroy on last message.
222    */
223   int destroy;
224 };
225
226 /**
227  * Handle for messages queued but not yet sent.
228  */
229 struct MeshConnectionQueue
230 {
231   /**
232    * Peer queue handle, to cancel if necessary.
233    */
234   struct MeshPeerQueue *q;
235
236   /**
237    * Was this a forced message? (Do not account for it)
238    */
239   int forced;
240
241   /**
242    * Continuation to call once sent.
243    */
244   GMC_sent cont;
245
246   /**
247    * Closure for @c cont.
248    */
249   void *cont_cls;
250 };
251
252 /******************************************************************************/
253 /*******************************   GLOBALS  ***********************************/
254 /******************************************************************************/
255
256 /**
257  * Global handle to the statistics service.
258  */
259 extern struct GNUNET_STATISTICS_Handle *stats;
260
261 /**
262  * Local peer own ID (memory efficient handle).
263  */
264 extern GNUNET_PEER_Id myid;
265
266 /**
267  * Local peer own ID (full value).
268  */
269 extern struct GNUNET_PeerIdentity my_full_id;
270
271 /**
272  * Connections known, indexed by cid (MeshConnection).
273  */
274 static struct GNUNET_CONTAINER_MultiHashMap *connections;
275
276 /**
277  * How many connections are we willing to maintain.
278  * Local connections are always allowed, even if there are more connections than max.
279  */
280 static unsigned long long max_connections;
281
282 /**
283  * How many messages *in total* are we willing to queue, divide by number of
284  * connections to get connection queue size.
285  */
286 static unsigned long long max_msgs_queue;
287
288 /**
289  * How often to send path keepalives. Paths timeout after 4 missed.
290  */
291 static struct GNUNET_TIME_Relative refresh_connection_time;
292
293 /**
294  * How often to send path create / ACKs.
295  */
296 static struct GNUNET_TIME_Relative create_connection_time;
297
298
299 /******************************************************************************/
300 /********************************   STATIC  ***********************************/
301 /******************************************************************************/
302
303 #if 0 // avoid compiler warning for unused static function
304 static void
305 fc_debug (struct MeshFlowControl *fc)
306 {
307   LOG (GNUNET_ERROR_TYPE_DEBUG, "    IN: %u/%u\n",
308               fc->last_pid_recv, fc->last_ack_sent);
309   LOG (GNUNET_ERROR_TYPE_DEBUG, "    OUT: %u/%u\n",
310               fc->last_pid_sent, fc->last_ack_recv);
311   LOG (GNUNET_ERROR_TYPE_DEBUG, "    QUEUE: %u/%u\n",
312               fc->queue_n, fc->queue_max);
313 }
314
315 static void
316 connection_debug (struct MeshConnection *c)
317 {
318   if (NULL == c)
319   {
320     LOG (GNUNET_ERROR_TYPE_DEBUG, "*** DEBUG NULL CONNECTION ***\n");
321     return;
322   }
323   LOG (GNUNET_ERROR_TYPE_DEBUG, "Connection %s:%X\n",
324               peer2s (c->t->peer), GMC_2s (c));
325   LOG (GNUNET_ERROR_TYPE_DEBUG, "  state: %u, pending msgs: %u\n",
326               c->state, c->pending_messages);
327   LOG (GNUNET_ERROR_TYPE_DEBUG, "  FWD FC\n");
328   fc_debug (&c->fwd_fc);
329   LOG (GNUNET_ERROR_TYPE_DEBUG, "  BCK FC\n");
330   fc_debug (&c->bck_fc);
331 }
332 #endif
333
334
335 /**
336  * Schedule next keepalive task, taking in consideration
337  * the connection state and number of retries.
338  *
339  * @param c Connection for which to schedule the next keepalive.
340  * @param fwd Direction for the next keepalive.
341  */
342 static void
343 schedule_next_keepalive (struct MeshConnection *c, int fwd);
344
345
346 /**
347  * Get string description for tunnel state.
348  *
349  * @param s Tunnel state.
350  *
351  * @return String representation.
352  */
353 static const char *
354 GMC_state2s (enum MeshConnectionState s)
355 {
356   switch (s)
357   {
358     case MESH_CONNECTION_NEW:
359       return "MESH_CONNECTION_NEW";
360     case MESH_CONNECTION_SENT:
361       return "MESH_CONNECTION_SENT";
362     case MESH_CONNECTION_ACK:
363       return "MESH_CONNECTION_ACK";
364     case MESH_CONNECTION_READY:
365       return "MESH_CONNECTION_READY";
366     case MESH_CONNECTION_DESTROYED:
367       return "MESH_CONNECTION_DESTROYED";
368     default:
369       return "MESH_CONNECTION_STATE_ERROR";
370   }
371 }
372
373
374 /**
375  * Initialize a Flow Control structure to the initial state.
376  *
377  * @param fc Flow Control structure to initialize.
378  */
379 static void
380 fc_init (struct MeshFlowControl *fc)
381 {
382   fc->next_pid = 0;
383   fc->last_pid_sent = (uint32_t) -1; /* Next (expected) = 0 */
384   fc->last_pid_recv = (uint32_t) -1;
385   fc->last_ack_sent = (uint32_t) 0;
386   fc->last_ack_recv = (uint32_t) 0;
387   fc->poll_task = GNUNET_SCHEDULER_NO_TASK;
388   fc->poll_time = GNUNET_TIME_UNIT_SECONDS;
389   fc->queue_n = 0;
390   fc->queue_max = (max_msgs_queue / max_connections) + 1;
391 }
392
393
394 /**
395  * Find a connection.
396  *
397  * @param cid Connection ID.
398  */
399 static struct MeshConnection *
400 connection_get (const struct GNUNET_HashCode *cid)
401 {
402   return GNUNET_CONTAINER_multihashmap_get (connections, cid);
403 }
404
405
406 static void
407 connection_change_state (struct MeshConnection* c,
408                          enum MeshConnectionState state)
409 {
410   LOG (GNUNET_ERROR_TYPE_DEBUG,
411               "Connection %s state was %s\n",
412               GMC_2s (c), GMC_state2s (c->state));
413   if (MESH_CONNECTION_DESTROYED == c->state)
414   {
415     LOG (GNUNET_ERROR_TYPE_DEBUG, "state not changing anymore\n");
416     return;
417   }
418   LOG (GNUNET_ERROR_TYPE_DEBUG,
419               "Connection %s state is now %s\n",
420               GMC_2s (c), GMC_state2s (state));
421   c->state = state;
422   if (MESH_CONNECTION_READY == state)
423     c->create_retry = 1;
424 }
425
426
427 /**
428  * Callback called when a queued ACK message is sent.
429  *
430  * @param cls Closure (FC).
431  * @param c Connection this message was on.
432  * @param q Queue handler this call invalidates.
433  * @param type Type of message sent.
434  * @param fwd Was this a FWD going message?
435  * @param size Size of the message.
436  */
437 static void
438 ack_sent (void *cls,
439           struct MeshConnection *c,
440           struct MeshConnectionQueue *q,
441           uint16_t type, int fwd, size_t size)
442 {
443   struct MeshFlowControl *fc = cls;
444
445   fc->ack_msg = NULL;
446 }
447
448
449 /**
450  * Send an ACK on the connection, informing the predecessor about
451  * the available buffer space. Should not be called in case the peer
452  * is origin (no predecessor) in the @c fwd direction.
453  *
454  * Note that for fwd ack, the FWD mean forward *traffic* (root->dest),
455  * the ACK itself goes "back" (dest->root).
456  *
457  * @param c Connection on which to send the ACK.
458  * @param buffer How much space free to advertise?
459  * @param fwd Is this FWD ACK? (Going dest -> root)
460  * @param force Don't optimize out.
461  */
462 static void
463 send_ack (struct MeshConnection *c, unsigned int buffer, int fwd, int force)
464 {
465   struct MeshFlowControl *next_fc;
466   struct MeshFlowControl *prev_fc;
467   struct GNUNET_MESH_ACK msg;
468   uint32_t ack;
469   int delta;
470
471   /* If origin, there is no connection to send ACKs. Wrong function! */
472   if (GMC_is_origin (c, fwd))
473   {
474     LOG (GNUNET_ERROR_TYPE_DEBUG, "connection %s is origin in %s\n",
475          GMC_2s (c), GM_f2s (fwd));
476     GNUNET_break (0);
477     return;
478   }
479
480   next_fc = fwd ? &c->fwd_fc : &c->bck_fc;
481   prev_fc = fwd ? &c->bck_fc : &c->fwd_fc;
482
483   LOG (GNUNET_ERROR_TYPE_DEBUG, "connection send %s ack on %s\n",
484        GM_f2s (fwd), GMC_2s (c));
485
486   /* Check if we need to transmit the ACK. */
487   delta = prev_fc->last_ack_sent - prev_fc->last_pid_recv;
488   if (3 < delta && buffer < delta && GNUNET_NO == force)
489   {
490     LOG (GNUNET_ERROR_TYPE_DEBUG, "Not sending ACK, buffer > 3\n");
491     LOG (GNUNET_ERROR_TYPE_DEBUG,
492          "  last pid recv: %u, last ack sent: %u\n",
493          prev_fc->last_pid_recv, prev_fc->last_ack_sent);
494     return;
495   }
496
497   /* Ok, ACK might be necessary, what PID to ACK? */
498   ack = prev_fc->last_pid_recv + buffer;
499   LOG (GNUNET_ERROR_TYPE_DEBUG, " ACK %u\n", ack);
500   LOG (GNUNET_ERROR_TYPE_DEBUG,
501        " last pid %u, last ack %u, qmax %u, q %u\n",
502        prev_fc->last_pid_recv, prev_fc->last_ack_sent,
503        next_fc->queue_max, next_fc->queue_n);
504   if (ack == prev_fc->last_ack_sent && GNUNET_NO == force)
505   {
506     LOG (GNUNET_ERROR_TYPE_DEBUG, "Not sending FWD ACK, not needed\n");
507     return;
508   }
509
510   /* Check if message is already in queue */
511   if (NULL != prev_fc->ack_msg)
512   {
513     if (GM_is_pid_bigger (ack, prev_fc->last_ack_sent))
514     {
515       LOG (GNUNET_ERROR_TYPE_DEBUG, " canceling old ACK\n");
516       GMC_cancel (prev_fc->ack_msg);
517       /* GMC_cancel triggers ack_sent(), which clears fc->ack_msg */
518     }
519     else
520     {
521       LOG (GNUNET_ERROR_TYPE_DEBUG, " same ACK already in queue\n");
522       return;
523     }
524   }
525
526   prev_fc->last_ack_sent = ack;
527
528   /* Build ACK message and send on connection */
529   msg.header.size = htons (sizeof (msg));
530   msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_ACK);
531   msg.ack = htonl (ack);
532   msg.cid = c->id;
533
534   prev_fc->ack_msg = GMC_send_prebuilt_message (&msg.header, c,
535                                                 !fwd, GNUNET_YES,
536                                                 &ack_sent, prev_fc);
537 }
538
539
540 /**
541  * Callback called when a queued message is sent.
542  *
543  * Calculates the average time and connection packet tracking.
544  *
545  * @param cls Closure (ConnectionQueue Handle).
546  * @param c Connection this message was on.
547  * @param type Type of message sent.
548  * @param fwd Was this a FWD going message?
549  * @param size Size of the message.
550  * @param wait Time spent waiting for core (only the time for THIS message)
551  */
552 static void
553 message_sent (void *cls,
554               struct MeshConnection *c, uint16_t type,
555               int fwd, size_t size,
556               struct GNUNET_TIME_Relative wait)
557 {
558   struct MeshConnectionPerformance *p;
559   struct MeshFlowControl *fc;
560   struct MeshConnectionQueue *q = cls;
561   double usecsperbyte;
562   int forced;
563
564   fc = fwd ? &c->fwd_fc : &c->bck_fc;
565   LOG (GNUNET_ERROR_TYPE_DEBUG,
566        "!  sent %s %s\n",
567        GM_f2s (fwd),
568        GM_m2s (type));
569   LOG (GNUNET_ERROR_TYPE_DEBUG, "!  C_P- %p %u\n", c, c->pending_messages);
570   if (NULL != q)
571   {
572     forced = q->forced;
573     if (NULL != q->cont)
574     {
575       LOG (GNUNET_ERROR_TYPE_DEBUG, "!  calling cont\n");
576       q->cont (q->cont_cls, c, q, type, fwd, size);
577     }
578     GNUNET_free (q);
579   }
580   else if (type == GNUNET_MESSAGE_TYPE_MESH_ENCRYPTED)
581   {
582     /* If NULL == q and ENCRYPTED == type, message must have been ch_mngmnt */
583     forced = GNUNET_YES;
584   }
585   else
586   {
587     forced = GNUNET_NO;
588   }
589   c->pending_messages--;
590   if (GNUNET_YES == c->destroy && 0 == c->pending_messages)
591   {
592     LOG (GNUNET_ERROR_TYPE_DEBUG, "!  destroying connection!\n");
593     GMC_destroy (c);
594     return;
595   }
596   /* Send ACK if needed, after accounting for sent ID in fc->queue_n */
597   switch (type)
598   {
599     case GNUNET_MESSAGE_TYPE_MESH_CONNECTION_CREATE:
600     case GNUNET_MESSAGE_TYPE_MESH_CONNECTION_ACK:
601       c->maintenance_q = NULL;
602       /* Don't trigger a keepalive for sent ACKs, only SYN and SYNACKs */
603       if (GNUNET_MESSAGE_TYPE_MESH_CONNECTION_CREATE == type || !fwd)
604         schedule_next_keepalive (c, fwd);
605       break;
606
607     case GNUNET_MESSAGE_TYPE_MESH_ENCRYPTED:
608       fc->last_pid_sent++;
609       LOG (GNUNET_ERROR_TYPE_DEBUG, "!  Q_N- %p %u\n", fc, fc->queue_n);
610       if (GNUNET_NO == forced)
611       {
612         fc->queue_n--;
613         LOG (GNUNET_ERROR_TYPE_DEBUG,
614             "!   accounting pid %u\n",
615             fc->last_pid_sent);
616       }
617       else
618       {
619         LOG (GNUNET_ERROR_TYPE_DEBUG,
620              "!   forced, Q_N not accounting pid %u\n",
621              fc->last_pid_sent);
622       }
623       GMC_send_ack (c, fwd, GNUNET_NO);
624       break;
625
626     case GNUNET_MESSAGE_TYPE_MESH_POLL:
627       fc->poll_msg = NULL;
628       break;
629
630     case GNUNET_MESSAGE_TYPE_MESH_ACK:
631       fc->ack_msg = NULL;
632       break;
633
634     default:
635       break;
636   }
637   LOG (GNUNET_ERROR_TYPE_DEBUG, "!  message sent!\n");
638
639   if (NULL == c->perf)
640     return; /* Only endpoints are interested in timing. */
641
642   p = c->perf;
643   usecsperbyte = ((double) wait.rel_value_us) / size;
644   if (p->size == AVG_MSGS)
645   {
646     /* Array is full. Substract oldest value, add new one and store. */
647     p->avg -= (p->usecsperbyte[p->idx] / AVG_MSGS);
648     p->usecsperbyte[p->idx] = usecsperbyte;
649     p->avg += (p->usecsperbyte[p->idx] / AVG_MSGS);
650   }
651   else
652   {
653     /* Array not yet full. Add current value to avg and store. */
654     p->usecsperbyte[p->idx] = usecsperbyte;
655     p->avg *= p->size;
656     p->avg += p->usecsperbyte[p->idx];
657     p->size++;
658     p->avg /= p->size;
659   }
660   p->idx = (p->idx + 1) % AVG_MSGS;
661 }
662
663
664 /**
665  * Get the previous hop in a connection
666  *
667  * @param c Connection.
668  *
669  * @return Previous peer in the connection.
670  */
671 static struct MeshPeer *
672 get_prev_hop (const struct MeshConnection *c)
673 {
674   GNUNET_PEER_Id id;
675
676   LOG (GNUNET_ERROR_TYPE_DEBUG, " get prev hop %s [%u/%u]\n",
677        GMC_2s (c), c->own_pos, c->path->length);
678   if (0 == c->own_pos || c->path->length < 2)
679     id = c->path->peers[0];
680   else
681     id = c->path->peers[c->own_pos - 1];
682
683   LOG (GNUNET_ERROR_TYPE_DEBUG, "  ID: %s (%u)\n",
684        GNUNET_i2s (GNUNET_PEER_resolve2 (id)), id);
685
686   return GMP_get_short (id);
687 }
688
689
690 /**
691  * Get the next hop in a connection
692  *
693  * @param c Connection.
694  *
695  * @return Next peer in the connection.
696  */
697 static struct MeshPeer *
698 get_next_hop (const struct MeshConnection *c)
699 {
700   GNUNET_PEER_Id id;
701
702   LOG (GNUNET_ERROR_TYPE_DEBUG, " get next hop %s [%u/%u]\n",
703        GMC_2s (c), c->own_pos, c->path->length);
704   if ((c->path->length - 1) == c->own_pos || c->path->length < 2)
705     id = c->path->peers[c->path->length - 1];
706   else
707     id = c->path->peers[c->own_pos + 1];
708
709   LOG (GNUNET_ERROR_TYPE_DEBUG, "  ID: %s (%u)\n",
710        GNUNET_i2s (GNUNET_PEER_resolve2 (id)), id);
711
712   return GMP_get_short (id);
713 }
714
715
716 /**
717  * Get the hop in a connection.
718  *
719  * @param c Connection.
720  * @param fwd Next hop?
721  *
722  * @return Next peer in the connection.
723  */
724 static struct MeshPeer *
725 get_hop (struct MeshConnection *c, int fwd)
726 {
727   if (fwd)
728     return get_next_hop (c);
729   return get_prev_hop (c);
730 }
731
732
733 /**
734  * Is traffic coming from this sender 'FWD' traffic?
735  *
736  * @param c Connection to check.
737  * @param sender Peer identity of neighbor.
738  *
739  * @return #GNUNET_YES in case the sender is the 'prev' hop and therefore
740  *         the traffic is 'FWD'.
741  *         #GNUNET_NO for BCK.
742  *         #GNUNET_SYSERR for errors.
743  */
744 static int
745 is_fwd (const struct MeshConnection *c,
746         const struct GNUNET_PeerIdentity *sender)
747 {
748   GNUNET_PEER_Id id;
749
750   id = GNUNET_PEER_search (sender);
751   if (GMP_get_short_id (get_prev_hop (c)) == id)
752     return GNUNET_YES;
753
754   if (GMP_get_short_id (get_next_hop (c)) == id)
755     return GNUNET_NO;
756
757   GNUNET_break (0);
758   return GNUNET_SYSERR;
759 }
760
761
762 /**
763  * Sends a CONNECTION ACK message in reponse to a received CONNECTION_CREATE
764  * or a first CONNECTION_ACK directed to us.
765  *
766  * @param connection Connection to confirm.
767  * @param fwd Should we send it FWD? (root->dest)
768  *            (First (~SYNACK) goes BCK, second (~ACK) goes FWD)
769  */
770 static void
771 send_connection_ack (struct MeshConnection *connection, int fwd)
772 {
773   struct MeshTunnel3 *t;
774
775   t = connection->t;
776   LOG (GNUNET_ERROR_TYPE_INFO, "=> {%18s ACK} on connection %s\n",
777        GM_f2s (!fwd), GMC_2s (connection));
778   GMP_queue_add (get_hop (connection, fwd), NULL,
779                  GNUNET_MESSAGE_TYPE_MESH_CONNECTION_ACK,
780                  sizeof (struct GNUNET_MESH_ConnectionACK),
781                  connection, fwd, &message_sent, NULL);
782   connection->pending_messages++;
783   if (MESH_TUNNEL3_NEW == GMT_get_cstate (t))
784     GMT_change_cstate (t, MESH_TUNNEL3_WAITING);
785   if (MESH_CONNECTION_READY != connection->state)
786     connection_change_state (connection, MESH_CONNECTION_SENT);
787 }
788
789
790 /**
791  * Send a notification that a connection is broken.
792  *
793  * @param c Connection that is broken.
794  * @param id1 Peer that has disconnected.
795  * @param id2 Peer that has disconnected.
796  * @param fwd Direction towards which to send it.
797  */
798 static void
799 send_broken (struct MeshConnection *c,
800              const struct GNUNET_PeerIdentity *id1,
801              const struct GNUNET_PeerIdentity *id2,
802              int fwd)
803 {
804   struct GNUNET_MESH_ConnectionBroken msg;
805
806   msg.header.size = htons (sizeof (struct GNUNET_MESH_ConnectionBroken));
807   msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_CONNECTION_BROKEN);
808   msg.cid = c->id;
809   msg.peer1 = *id1;
810   msg.peer2 = *id2;
811   GMC_send_prebuilt_message (&msg.header, c, fwd, GNUNET_YES, NULL, NULL);
812 }
813
814
815 /**
816  * Send a notification that a connection is broken, when a connection
817  * isn't even created.
818  *
819  * @param connection_id Connection ID.
820  * @param id1 Peer that has disconnected.
821  * @param id2 Peer that has disconnected.
822  * @param peer Peer to notify (neighbor who sent the connection).
823  */
824 static void
825 send_broken2 (struct GNUNET_HashCode *connection_id,
826              const struct GNUNET_PeerIdentity *id1,
827              const struct GNUNET_PeerIdentity *id2,
828              GNUNET_PEER_Id peer_id)
829 {
830   struct GNUNET_MESH_ConnectionBroken *msg;
831   struct MeshPeer *neighbor;
832
833   LOG (GNUNET_ERROR_TYPE_INFO,
834        "Send BROKEN on unknown connection %s\n", GNUNET_h2s (connection_id));
835
836   msg = GNUNET_new (struct GNUNET_MESH_ConnectionBroken);
837   msg->header.size = htons (sizeof (struct GNUNET_MESH_ConnectionBroken));
838   msg->header.type = htons (GNUNET_MESSAGE_TYPE_MESH_CONNECTION_BROKEN);
839   msg->cid = *connection_id;
840   msg->peer1 = *id1;
841   msg->peer2 = *id2;
842   neighbor = GMP_get_short (peer_id);
843   GMP_queue_add (neighbor, msg,
844                  GNUNET_MESSAGE_TYPE_MESH_ENCRYPTED,
845                  sizeof (struct GNUNET_MESH_ConnectionBroken),
846                  NULL, GNUNET_SYSERR, /* connection, fwd */
847                  NULL, NULL); /* continuation */
848 }
849
850
851 /**
852  * Send keepalive packets for a connection.
853  *
854  * @param c Connection to keep alive..
855  * @param fwd Is this a FWD keepalive? (owner -> dest).
856  */
857 static void
858 send_connection_keepalive (struct MeshConnection *c, int fwd)
859 {
860   struct GNUNET_MESH_ConnectionKeepAlive *msg;
861   size_t size = sizeof (struct GNUNET_MESH_ConnectionKeepAlive);
862   char cbuf[size];
863
864   LOG (GNUNET_ERROR_TYPE_DEBUG,
865        "sending %s keepalive for connection %s]\n",
866        GM_f2s (fwd), GMC_2s (c));
867
868   msg = (struct GNUNET_MESH_ConnectionKeepAlive *) cbuf;
869   msg->header.size = htons (size);
870   msg->header.type = htons (GNUNET_MESSAGE_TYPE_MESH_KEEPALIVE);
871   msg->cid = c->id;
872   msg->reserved = htonl (0);
873
874   GMC_send_prebuilt_message (&msg->header, c, fwd, GNUNET_YES, NULL, NULL);
875 }
876
877
878 /**
879  * Send CONNECTION_{CREATE/ACK} packets for a connection.
880  *
881  * @param c Connection for which to send the message.
882  * @param fwd If #GNUNET_YES, send CREATE, otherwise send ACK.
883  */
884 static void
885 connection_recreate (struct MeshConnection *c, int fwd)
886 {
887   LOG (GNUNET_ERROR_TYPE_DEBUG, "sending connection recreate\n");
888   if (fwd)
889     GMC_send_create (c);
890   else
891     send_connection_ack (c, GNUNET_NO);
892 }
893
894
895 /**
896  * Generic connection timer management.
897  * Depending on the role of the peer in the connection will send the
898  * appropriate message (build or keepalive)
899  *
900  * @param c Conncetion to maintain.
901  * @param fwd Is FWD?
902  */
903 static void
904 connection_maintain (struct MeshConnection *c, int fwd)
905 {
906   if (GNUNET_NO != c->destroy)
907     return;
908
909   if (MESH_TUNNEL3_SEARCHING == GMT_get_cstate (c->t))
910   {
911     /* TODO DHT GET with RO_BART */
912     return;
913   }
914   switch (c->state)
915   {
916     case MESH_CONNECTION_NEW:
917       GNUNET_break (0);
918       /* fall-through */
919     case MESH_CONNECTION_SENT:
920       connection_recreate (c, fwd);
921       break;
922     case MESH_CONNECTION_READY:
923       send_connection_keepalive (c, fwd);
924       break;
925     default:
926       break;
927   }
928 }
929
930
931
932 /**
933  * Keep the connection alive.
934  *
935  * @param c Connection to keep alive.
936  * @param fwd Direction.
937  * @param shutdown Are we shutting down? (Don't send traffic)
938  *                 Non-zero value for true, not necessarily GNUNET_YES.
939  */
940 static void
941 connection_keepalive (struct MeshConnection *c, int fwd, int shutdown)
942 {
943   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s keepalive for %s\n",
944        GM_f2s (fwd), GMC_2s (c));
945
946   if (fwd)
947     c->fwd_maintenance_task = GNUNET_SCHEDULER_NO_TASK;
948   else
949     c->bck_maintenance_task = GNUNET_SCHEDULER_NO_TASK;
950
951   if (GNUNET_NO != shutdown)
952     return;
953
954   connection_maintain (c, fwd);
955
956   /* Next execution will be scheduled by message_sent */
957 }
958
959
960 /**
961  * Keep the connection alive in the FWD direction.
962  *
963  * @param cls Closure (connection to keepalive).
964  * @param tc TaskContext.
965  */
966 static void
967 connection_fwd_keepalive (void *cls,
968                           const struct GNUNET_SCHEDULER_TaskContext *tc)
969 {
970   connection_keepalive ((struct MeshConnection *) cls,
971                         GNUNET_YES,
972                         tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN);
973 }
974
975
976 /**
977  * Keep the connection alive in the BCK direction.
978  *
979  * @param cls Closure (connection to keepalive).
980  * @param tc TaskContext.
981  */
982 static void
983 connection_bck_keepalive (void *cls,
984                           const struct GNUNET_SCHEDULER_TaskContext *tc)
985 {
986   connection_keepalive ((struct MeshConnection *) cls,
987                         GNUNET_NO,
988                         tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN);
989 }
990
991
992 /**
993  * Schedule next keepalive task, taking in consideration
994  * the connection state and number of retries.
995  *
996  * If the peer is not the origin, do nothing.
997  *
998  * @param c Connection for which to schedule the next keepalive.
999  * @param fwd Direction for the next keepalive.
1000  */
1001 static void
1002 schedule_next_keepalive (struct MeshConnection *c, int fwd)
1003 {
1004   struct GNUNET_TIME_Relative delay;
1005   GNUNET_SCHEDULER_TaskIdentifier *task_id;
1006   GNUNET_SCHEDULER_Task keepalive_task;
1007
1008   if (GNUNET_NO == GMC_is_origin (c, fwd))
1009     return;
1010
1011   /* Calculate delay to use, depending on the state of the connection */
1012   if (MESH_CONNECTION_READY == c->state)
1013   {
1014     delay = refresh_connection_time;
1015   }
1016   else
1017   {
1018     if (1 > c->create_retry)
1019       c->create_retry = 1;
1020     delay = GNUNET_TIME_relative_multiply (create_connection_time,
1021                                            c->create_retry);
1022     if (c->create_retry < 64)
1023       c->create_retry *= 2;
1024   }
1025
1026   /* Select direction-dependent parameters */
1027   if (GNUNET_YES == fwd)
1028   {
1029     task_id = &c->fwd_maintenance_task;
1030     keepalive_task = &connection_fwd_keepalive;
1031   }
1032   else
1033   {
1034     task_id = &c->bck_maintenance_task;
1035     keepalive_task = &connection_bck_keepalive;
1036   }
1037
1038   /* Check that no one scheduled it before us */
1039   if (GNUNET_SCHEDULER_NO_TASK != *task_id)
1040   {
1041     /* No need for a _break. It can happen for instance when sending a SYNACK
1042      * for a duplicate SYN: the first SYNACK scheduled the task. */
1043     GNUNET_SCHEDULER_cancel (*task_id);
1044   }
1045
1046   /* Schedule the task */
1047   *task_id = GNUNET_SCHEDULER_add_delayed (delay, keepalive_task, c);
1048 }
1049
1050
1051 /**
1052  * @brief Re-initiate traffic on this connection if necessary.
1053  *
1054  * Check if there is traffic queued towards this peer
1055  * and the core transmit handle is NULL (traffic was stalled).
1056  * If so, call core tmt rdy.
1057  *
1058  * @param c Connection on which initiate traffic.
1059  * @param fwd Is this about fwd traffic?
1060  */
1061 static void
1062 connection_unlock_queue (struct MeshConnection *c, int fwd)
1063 {
1064   struct MeshPeer *peer;
1065
1066   LOG (GNUNET_ERROR_TYPE_DEBUG,
1067               "connection_unlock_queue %s on %s\n",
1068               GM_f2s (fwd), GMC_2s (c));
1069
1070   if (GMC_is_terminal (c, fwd))
1071   {
1072     LOG (GNUNET_ERROR_TYPE_DEBUG, " is terminal!\n");
1073     return;
1074   }
1075
1076   peer = get_hop (c, fwd);
1077   GMP_queue_unlock (peer, c);
1078 }
1079
1080
1081 /**
1082  * Cancel all transmissions that belong to a certain connection.
1083  *
1084  * If the connection is scheduled for destruction and no more messages are left,
1085  * the connection will be destroyed by the continuation call.
1086  *
1087  * @param c Connection which to cancel. Might be destroyed during this call.
1088  * @param fwd Cancel fwd traffic?
1089  */
1090 static void
1091 connection_cancel_queues (struct MeshConnection *c, int fwd)
1092 {
1093   struct MeshFlowControl *fc;
1094   struct MeshPeer *peer;
1095
1096   LOG (GNUNET_ERROR_TYPE_DEBUG,
1097        " *** Cancel %s queues for connection %s\n",
1098        GM_f2s (fwd), GMC_2s (c));
1099   if (NULL == c)
1100   {
1101     GNUNET_break (0);
1102     return;
1103   }
1104
1105   fc = fwd ? &c->fwd_fc : &c->bck_fc;
1106   if (GNUNET_SCHEDULER_NO_TASK != fc->poll_task)
1107   {
1108     GNUNET_SCHEDULER_cancel (fc->poll_task);
1109     fc->poll_task = GNUNET_SCHEDULER_NO_TASK;
1110     LOG (GNUNET_ERROR_TYPE_DEBUG, " *** Cancel POLL in ccq for fc %p\n", fc);
1111   }
1112   peer = get_hop (c, fwd);
1113   GMP_queue_cancel (peer, c);
1114 }
1115
1116
1117 /**
1118  * Function called if a connection has been stalled for a while,
1119  * possibly due to a missed ACK. Poll the neighbor about its ACK status.
1120  *
1121  * @param cls Closure (poll ctx).
1122  * @param tc TaskContext.
1123  */
1124 static void
1125 connection_poll (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
1126
1127
1128 /**
1129  * Callback called when a queued POLL message is sent.
1130  *
1131  * @param cls Closure (FC).
1132  * @param c Connection this message was on.
1133  * @param q Queue handler this call invalidates.
1134  * @param type Type of message sent.
1135  * @param fwd Was this a FWD going message?
1136  * @param size Size of the message.
1137  */
1138 static void
1139 poll_sent (void *cls,
1140            struct MeshConnection *c,
1141            struct MeshConnectionQueue *q,
1142            uint16_t type, int fwd, size_t size)
1143 {
1144   struct MeshFlowControl *fc = cls;
1145
1146   if (2 == c->destroy)
1147   {
1148     LOG (GNUNET_ERROR_TYPE_DEBUG, " *** POLL canceled on shutdown\n");
1149     return;
1150   }
1151   LOG (GNUNET_ERROR_TYPE_DEBUG,
1152        " *** POLL sent for , scheduling new one!\n");
1153   fc->poll_msg = NULL;
1154   fc->poll_time = GNUNET_TIME_STD_BACKOFF (fc->poll_time);
1155   fc->poll_task = GNUNET_SCHEDULER_add_delayed (fc->poll_time,
1156                                                 &connection_poll, fc);
1157   LOG (GNUNET_ERROR_TYPE_DEBUG, " task %u\n", fc->poll_task);
1158
1159 }
1160
1161 /**
1162  * Function called if a connection has been stalled for a while,
1163  * possibly due to a missed ACK. Poll the neighbor about its ACK status.
1164  *
1165  * @param cls Closure (poll ctx).
1166  * @param tc TaskContext.
1167  */
1168 static void
1169 connection_poll (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1170 {
1171   struct MeshFlowControl *fc = cls;
1172   struct GNUNET_MESH_Poll msg;
1173   struct MeshConnection *c;
1174
1175   fc->poll_task = GNUNET_SCHEDULER_NO_TASK;
1176   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1177   {
1178     return;
1179   }
1180
1181   c = fc->c;
1182   LOG (GNUNET_ERROR_TYPE_DEBUG, " *** Polling!\n");
1183   LOG (GNUNET_ERROR_TYPE_DEBUG, " *** connection [%s]\n", GMC_2s (c));
1184   LOG (GNUNET_ERROR_TYPE_DEBUG, " ***   %s\n",
1185        fc == &c->fwd_fc ? "FWD" : "BCK");
1186
1187   msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_POLL);
1188   msg.header.size = htons (sizeof (msg));
1189   msg.pid = htonl (fc->last_pid_sent);
1190   LOG (GNUNET_ERROR_TYPE_DEBUG, " *** last pid sent: %u!\n", fc->last_pid_sent);
1191   fc->poll_msg = GMC_send_prebuilt_message (&msg.header, c,
1192                                             fc == &c->fwd_fc, GNUNET_YES,
1193                                             &poll_sent, fc);
1194 }
1195
1196
1197 /**
1198  * Timeout function due to lack of keepalive/traffic from the owner.
1199  * Destroys connection if called.
1200  *
1201  * @param cls Closure (connection to destroy).
1202  * @param tc TaskContext.
1203  */
1204 static void
1205 connection_fwd_timeout (void *cls,
1206                         const struct GNUNET_SCHEDULER_TaskContext *tc)
1207 {
1208   struct MeshConnection *c = cls;
1209
1210   c->fwd_maintenance_task = GNUNET_SCHEDULER_NO_TASK;
1211   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1212     return;
1213
1214   LOG (GNUNET_ERROR_TYPE_DEBUG, "Connection %s FWD timed out. Destroying.\n",
1215        GMC_2s (c));
1216   if (GMC_is_origin (c, GNUNET_YES)) /* If local, leave. */
1217   {
1218     GNUNET_break (0);
1219     return;
1220   }
1221
1222   GMC_destroy (c);
1223 }
1224
1225
1226 /**
1227  * Timeout function due to lack of keepalive/traffic from the destination.
1228  * Destroys connection if called.
1229  *
1230  * @param cls Closure (connection to destroy).
1231  * @param tc TaskContext
1232  */
1233 static void
1234 connection_bck_timeout (void *cls,
1235                         const struct GNUNET_SCHEDULER_TaskContext *tc)
1236 {
1237   struct MeshConnection *c = cls;
1238
1239   c->bck_maintenance_task = GNUNET_SCHEDULER_NO_TASK;
1240   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1241     return;
1242
1243   LOG (GNUNET_ERROR_TYPE_DEBUG, "Connection %s BCK timed out. Destroying.\n",
1244        GMC_2s (c));
1245
1246   if (GMC_is_origin (c, GNUNET_NO)) /* If local, leave. */
1247   {
1248     GNUNET_break (0);
1249     return;
1250   }
1251
1252   GMC_destroy (c);
1253 }
1254
1255
1256 /**
1257  * Resets the connection timeout task, some other message has done the
1258  * task's job.
1259  * - For the first peer on the direction this means to send
1260  *   a keepalive or a path confirmation message (either create or ACK).
1261  * - For all other peers, this means to destroy the connection,
1262  *   due to lack of activity.
1263  * Starts the timeout if no timeout was running (connection just created).
1264  *
1265  * @param c Connection whose timeout to reset.
1266  * @param fwd Is this forward?
1267  *
1268  * TODO use heap to improve efficiency of scheduler.
1269  */
1270 static void
1271 connection_reset_timeout (struct MeshConnection *c, int fwd)
1272 {
1273   GNUNET_SCHEDULER_TaskIdentifier *ti;
1274   GNUNET_SCHEDULER_Task f;
1275
1276   ti = fwd ? &c->fwd_maintenance_task : &c->bck_maintenance_task;
1277
1278   LOG (GNUNET_ERROR_TYPE_DEBUG, "Connection %s reset timeout\n", GM_f2s (fwd));
1279
1280   if (GNUNET_SCHEDULER_NO_TASK != *ti)
1281     GNUNET_SCHEDULER_cancel (*ti);
1282
1283   if (GMC_is_origin (c, fwd)) /* Startpoint */
1284   {
1285     f = fwd ? &connection_fwd_keepalive : &connection_bck_keepalive;
1286     *ti = GNUNET_SCHEDULER_add_delayed (refresh_connection_time, f, c);
1287   }
1288   else /* Relay, endpoint. */
1289   {
1290     struct GNUNET_TIME_Relative delay;
1291
1292     delay = GNUNET_TIME_relative_multiply (refresh_connection_time, 4);
1293     f = fwd ? &connection_fwd_timeout : &connection_bck_timeout;
1294     *ti = GNUNET_SCHEDULER_add_delayed (delay, f, c);
1295   }
1296 }
1297
1298
1299 /**
1300  * Add the connection to the list of both neighbors.
1301  *
1302  * @param c Connection.
1303  *
1304  * @return #GNUNET_OK if everything went fine
1305  *         #GNUNET_SYSERR if the was an error and @c c is malformed.
1306  */
1307 static int
1308 register_neighbors (struct MeshConnection *c)
1309 {
1310   struct MeshPeer *next_peer;
1311   struct MeshPeer *prev_peer;
1312
1313   next_peer = get_next_hop (c);
1314   prev_peer = get_prev_hop (c);
1315
1316   LOG (GNUNET_ERROR_TYPE_DEBUG, "register neighbors for connection %s\n",
1317        GMC_2s (c));
1318   path_debug (c->path);
1319   LOG (GNUNET_ERROR_TYPE_DEBUG, "own pos %u\n", c->own_pos);
1320   LOG (GNUNET_ERROR_TYPE_DEBUG, "putting connection %s to next peer %p\n",
1321        GMC_2s (c), next_peer);
1322   LOG (GNUNET_ERROR_TYPE_DEBUG, "next peer %p %s\n", next_peer, GMP_2s (next_peer));
1323   LOG (GNUNET_ERROR_TYPE_DEBUG, "putting connection %s to prev peer %p\n",
1324        GMC_2s (c), prev_peer);
1325   LOG (GNUNET_ERROR_TYPE_DEBUG, "prev peer %p %s\n", prev_peer, GMP_2s (prev_peer));
1326
1327   if (GNUNET_NO == GMP_is_neighbor (next_peer)
1328       || GNUNET_NO == GMP_is_neighbor (prev_peer))
1329   {
1330     if (GMC_is_origin (c, GNUNET_YES))
1331       GNUNET_STATISTICS_update (stats, "# local bad paths", 1, GNUNET_NO);
1332     GNUNET_STATISTICS_update (stats, "# bad paths", 1, GNUNET_NO);
1333
1334     LOG (GNUNET_ERROR_TYPE_DEBUG, "  register neighbors failed\n");
1335     LOG (GNUNET_ERROR_TYPE_DEBUG, "  prev: %s, neighbor?: %d\n",
1336          GMP_2s (prev_peer), GMP_is_neighbor (prev_peer));
1337     LOG (GNUNET_ERROR_TYPE_DEBUG, "  next: %s, neighbor?: %d\n",
1338          GMP_2s (next_peer), GMP_is_neighbor (next_peer));
1339     return GNUNET_SYSERR;
1340   }
1341
1342   GMP_add_connection (next_peer, c);
1343   GMP_add_connection (prev_peer, c);
1344
1345   return GNUNET_OK;
1346 }
1347
1348
1349 /**
1350  * Remove the connection from the list of both neighbors.
1351  *
1352  * @param c Connection.
1353  */
1354 static void
1355 unregister_neighbors (struct MeshConnection *c)
1356 {
1357   struct MeshPeer *peer;
1358
1359   peer = get_next_hop (c);
1360   if (GNUNET_OK != GMP_remove_connection (peer, c))
1361   {
1362     GNUNET_assert (MESH_CONNECTION_NEW == c->state
1363                   || MESH_CONNECTION_DESTROYED == c->state);
1364     LOG (GNUNET_ERROR_TYPE_DEBUG, "  cstate: %u\n", c->state);
1365     if (NULL != c->t) GMT_debug (c->t);
1366   }
1367
1368   peer = get_prev_hop (c);
1369   if (GNUNET_OK != GMP_remove_connection (peer, c))
1370   {
1371     GNUNET_assert (MESH_CONNECTION_NEW == c->state
1372                   || MESH_CONNECTION_DESTROYED == c->state);
1373     LOG (GNUNET_ERROR_TYPE_DEBUG, "  cstate: %u\n", c->state);
1374     if (NULL != c->t) GMT_debug (c->t);
1375   }
1376 }
1377
1378
1379 /**
1380  * Bind the connection to the peer and the tunnel to that peer.
1381  *
1382  * If the peer has no tunnel, create one. Update tunnel and connection
1383  * data structres to reflect new status.
1384  *
1385  * @param c Connection.
1386  * @param peer Peer.
1387  */
1388 static void
1389 add_to_peer (struct MeshConnection *c, struct MeshPeer *peer)
1390 {
1391   GMP_add_tunnel (peer);
1392   c->t = GMP_get_tunnel (peer);
1393   GMT_add_connection (c->t, c);
1394 }
1395
1396
1397 /**
1398  * Builds a path from a PeerIdentity array.
1399  *
1400  * @param peers PeerIdentity array.
1401  * @param size Size of the @c peers array.
1402  * @param own_pos Output parameter: own position in the path.
1403  *
1404  * @return Fixed and shortened path.
1405  */
1406 static struct MeshPeerPath *
1407 build_path_from_peer_ids (struct GNUNET_PeerIdentity *peers,
1408                           unsigned int size,
1409                           unsigned int *own_pos)
1410 {
1411   struct MeshPeerPath *path;
1412   GNUNET_PEER_Id shortid;
1413   unsigned int i;
1414   unsigned int j;
1415   unsigned int offset;
1416
1417   /* Create path */
1418   LOG (GNUNET_ERROR_TYPE_DEBUG, "  Creating path...\n");
1419   path = path_new (size);
1420   *own_pos = 0;
1421   offset = 0;
1422   for (i = 0; i < size; i++)
1423   {
1424     LOG (GNUNET_ERROR_TYPE_DEBUG, "  - %u: taking %s\n",
1425          i, GNUNET_i2s (&peers[i]));
1426     shortid = GNUNET_PEER_intern (&peers[i]);
1427
1428     /* Check for loops / duplicates */
1429     for (j = 0; j < i - offset; j++)
1430     {
1431       if (path->peers[j] == shortid)
1432       {
1433         LOG (GNUNET_ERROR_TYPE_DEBUG, "    already exists at pos %u\n", j);
1434         offset += i - j;
1435         LOG (GNUNET_ERROR_TYPE_DEBUG, "    offset now\n", offset);
1436         GNUNET_PEER_change_rc (shortid, -1);
1437       }
1438     }
1439     LOG (GNUNET_ERROR_TYPE_DEBUG, "    storing at %u\n", i - offset);
1440     path->peers[i - offset] = shortid;
1441     if (path->peers[i] == myid)
1442       *own_pos = i;
1443   }
1444   path->length -= offset;
1445
1446   if (path->peers[*own_pos] != myid)
1447   {
1448     /* create path: self not found in path through self */
1449     GNUNET_break_op (0);
1450     path_destroy (path);
1451     return NULL;
1452   }
1453
1454   return path;
1455 }
1456
1457
1458 /**
1459  * Log receipt of message on stderr (INFO level).
1460  *
1461  * @param message Message received.
1462  * @param peer Peer who sent the message.
1463  * @param hash Connection ID.
1464  */
1465 static void
1466 log_message (const struct GNUNET_MessageHeader *message,
1467              const struct GNUNET_PeerIdentity *peer,
1468              const struct GNUNET_HashCode *hash)
1469 {
1470   LOG (GNUNET_ERROR_TYPE_INFO, "<- %s on connection %s from %s\n",
1471        GM_m2s (ntohs (message->type)), GNUNET_h2s (hash), GNUNET_i2s (peer));
1472 }
1473
1474 /******************************************************************************/
1475 /********************************    API    ***********************************/
1476 /******************************************************************************/
1477
1478 /**
1479  * Core handler for connection creation.
1480  *
1481  * @param cls Closure (unused).
1482  * @param peer Sender (neighbor).
1483  * @param message Message.
1484  *
1485  * @return GNUNET_OK to keep the connection open,
1486  *         GNUNET_SYSERR to close it (signal serious error)
1487  */
1488 int
1489 GMC_handle_create (void *cls, const struct GNUNET_PeerIdentity *peer,
1490                    const struct GNUNET_MessageHeader *message)
1491 {
1492   struct GNUNET_MESH_ConnectionCreate *msg;
1493   struct GNUNET_PeerIdentity *id;
1494   struct GNUNET_HashCode *cid;
1495   struct MeshPeerPath *path;
1496   struct MeshPeer *dest_peer;
1497   struct MeshPeer *orig_peer;
1498   struct MeshConnection *c;
1499   unsigned int own_pos;
1500   uint16_t size;
1501
1502   /* Check size */
1503   size = ntohs (message->size);
1504   if (size < sizeof (struct GNUNET_MESH_ConnectionCreate))
1505   {
1506     GNUNET_break_op (0);
1507     return GNUNET_OK;
1508   }
1509
1510   /* Calculate hops */
1511   size -= sizeof (struct GNUNET_MESH_ConnectionCreate);
1512   if (size % sizeof (struct GNUNET_PeerIdentity))
1513   {
1514     GNUNET_break_op (0);
1515     return GNUNET_OK;
1516   }
1517   size /= sizeof (struct GNUNET_PeerIdentity);
1518   if (1 > size)
1519   {
1520     GNUNET_break_op (0);
1521     return GNUNET_OK;
1522   }
1523   LOG (GNUNET_ERROR_TYPE_DEBUG, "    path has %u hops.\n", size);
1524
1525   /* Get parameters */
1526   msg = (struct GNUNET_MESH_ConnectionCreate *) message;
1527   cid = &msg->cid;
1528   log_message (message, peer, cid);
1529   id = (struct GNUNET_PeerIdentity *) &msg[1];
1530   LOG (GNUNET_ERROR_TYPE_DEBUG, "    origin: %s\n", GNUNET_i2s (id));
1531
1532   /* Create connection */
1533   c = connection_get (cid);
1534   if (NULL == c)
1535   {
1536     path = build_path_from_peer_ids ((struct GNUNET_PeerIdentity *) &msg[1],
1537                                      size, &own_pos);
1538     if (NULL == path)
1539       return GNUNET_OK;
1540     if (0 == own_pos)
1541     {
1542       GNUNET_break_op (0);
1543       path_destroy (path);
1544       return GNUNET_OK;
1545     }
1546     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Own position: %u\n", own_pos);
1547     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Creating connection\n");
1548     c = GMC_new (cid, NULL, path_duplicate (path), own_pos);
1549     if (NULL == c)
1550     {
1551       if (path->length - 1 == own_pos)
1552       {
1553         /* If we are destination, why did the creation fail? */
1554         GNUNET_break (0);
1555         return GNUNET_OK;
1556       }
1557       send_broken2 (cid, &my_full_id,
1558                     GNUNET_PEER_resolve2 (path->peers[own_pos + 1]),
1559                     path->peers[own_pos - 1]);
1560       path_destroy (path);
1561       return GNUNET_OK;
1562     }
1563     GMP_add_path_to_all (path, GNUNET_NO);
1564     connection_reset_timeout (c, GNUNET_YES);
1565   }
1566   else
1567   {
1568     path = path_duplicate (c->path);
1569   }
1570   if (MESH_CONNECTION_NEW == c->state)
1571     connection_change_state (c, MESH_CONNECTION_SENT);
1572
1573   /* Remember peers */
1574   dest_peer = GMP_get (&id[size - 1]);
1575   orig_peer = GMP_get (&id[0]);
1576
1577   /* Is it a connection to us? */
1578   if (c->own_pos == size - 1)
1579   {
1580     LOG (GNUNET_ERROR_TYPE_DEBUG, "  It's for us!\n");
1581     GMP_add_path_to_origin (orig_peer, path_duplicate (path), GNUNET_YES);
1582
1583     add_to_peer (c, orig_peer);
1584     if (MESH_TUNNEL3_NEW == GMT_get_cstate (c->t))
1585       GMT_change_cstate (c->t,  MESH_TUNNEL3_WAITING);
1586
1587     send_connection_ack (c, GNUNET_NO);
1588     if (MESH_CONNECTION_SENT == c->state)
1589       connection_change_state (c, MESH_CONNECTION_ACK);
1590   }
1591   else
1592   {
1593     /* It's for somebody else! Retransmit. */
1594     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Retransmitting.\n");
1595     GMP_add_path (dest_peer, path_duplicate (path), GNUNET_NO);
1596     GMP_add_path_to_origin (orig_peer, path_duplicate (path), GNUNET_NO);
1597     GMC_send_prebuilt_message (message, c, GNUNET_YES, GNUNET_YES,
1598                                NULL, NULL);
1599   }
1600   path_destroy (path);
1601   return GNUNET_OK;
1602 }
1603
1604
1605 /**
1606  * Core handler for path confirmations.
1607  *
1608  * @param cls closure
1609  * @param message message
1610  * @param peer peer identity this notification is about
1611  *
1612  * @return GNUNET_OK to keep the connection open,
1613  *         GNUNET_SYSERR to close it (signal serious error)
1614  */
1615 int
1616 GMC_handle_confirm (void *cls, const struct GNUNET_PeerIdentity *peer,
1617                     const struct GNUNET_MessageHeader *message)
1618 {
1619   struct GNUNET_MESH_ConnectionACK *msg;
1620   struct MeshConnection *c;
1621   struct MeshPeerPath *p;
1622   struct MeshPeer *pi;
1623   enum MeshConnectionState oldstate;
1624   int fwd;
1625
1626   msg = (struct GNUNET_MESH_ConnectionACK *) message;
1627   log_message (message, peer, &msg->cid);
1628   c = connection_get (&msg->cid);
1629   if (NULL == c)
1630   {
1631     GNUNET_STATISTICS_update (stats, "# control on unknown connection",
1632                               1, GNUNET_NO);
1633     LOG (GNUNET_ERROR_TYPE_DEBUG, "  don't know the connection!\n");
1634     return GNUNET_OK;
1635   }
1636
1637   if (GNUNET_NO != c->destroy)
1638   {
1639     LOG (GNUNET_ERROR_TYPE_DEBUG, "  connection being destroyed\n");
1640     return GNUNET_OK;
1641   }
1642
1643   oldstate = c->state;
1644   LOG (GNUNET_ERROR_TYPE_DEBUG, "  via peer %s\n", GNUNET_i2s (peer));
1645   pi = GMP_get (peer);
1646   if (get_next_hop (c) == pi)
1647   {
1648     LOG (GNUNET_ERROR_TYPE_DEBUG, "  SYNACK\n");
1649     fwd = GNUNET_NO;
1650     if (MESH_CONNECTION_SENT == oldstate)
1651       connection_change_state (c, MESH_CONNECTION_ACK);
1652   }
1653   else if (get_prev_hop (c) == pi)
1654   {
1655     LOG (GNUNET_ERROR_TYPE_DEBUG, "  ACK\n");
1656     fwd = GNUNET_YES;
1657     connection_change_state (c, MESH_CONNECTION_READY);
1658   }
1659   else
1660   {
1661     GNUNET_break_op (0);
1662     return GNUNET_OK;
1663   }
1664
1665   connection_reset_timeout (c, fwd);
1666
1667   /* Add path to peers? */
1668   p = c->path;
1669   if (NULL != p)
1670   {
1671     GMP_add_path_to_all (p, GNUNET_YES);
1672   }
1673   else
1674   {
1675     GNUNET_break (0);
1676   }
1677
1678   /* Message for us as creator? */
1679   if (GMC_is_origin (c, GNUNET_YES))
1680   {
1681     if (GNUNET_NO != fwd)
1682     {
1683       GNUNET_break_op (0);
1684       return GNUNET_OK;
1685     }
1686     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Connection (SYN)ACK for us!\n");
1687
1688     /* If just created, cancel the short timeout and start a long one */
1689     if (MESH_CONNECTION_SENT == oldstate)
1690       connection_reset_timeout (c, GNUNET_YES);
1691
1692     /* Change connection state */
1693     connection_change_state (c, MESH_CONNECTION_READY);
1694     send_connection_ack (c, GNUNET_YES);
1695
1696     /* Change tunnel state, trigger KX */
1697     if (MESH_TUNNEL3_WAITING == GMT_get_cstate (c->t))
1698       GMT_change_cstate (c->t, MESH_TUNNEL3_READY);
1699
1700     return GNUNET_OK;
1701   }
1702
1703   /* Message for us as destination? */
1704   if (GMC_is_terminal (c, GNUNET_YES))
1705   {
1706     if (GNUNET_YES != fwd)
1707     {
1708       GNUNET_break_op (0);
1709       return GNUNET_OK;
1710     }
1711     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Connection ACK for us!\n");
1712
1713     /* If just created, cancel the short timeout and start a long one */
1714     if (MESH_CONNECTION_ACK == oldstate)
1715       connection_reset_timeout (c, GNUNET_NO);
1716
1717     /* Change tunnel state */
1718     if (MESH_TUNNEL3_WAITING == GMT_get_cstate (c->t))
1719       GMT_change_cstate (c->t, MESH_TUNNEL3_READY);
1720
1721     return GNUNET_OK;
1722   }
1723
1724   LOG (GNUNET_ERROR_TYPE_DEBUG, "  not for us, retransmitting...\n");
1725   GMC_send_prebuilt_message (message, c, fwd, GNUNET_YES, NULL, NULL);
1726   return GNUNET_OK;
1727 }
1728
1729
1730 /**
1731  * Core handler for notifications of broken paths
1732  *
1733  * @param cls Closure (unused).
1734  * @param id Peer identity of sending neighbor.
1735  * @param message Message.
1736  *
1737  * @return GNUNET_OK to keep the connection open,
1738  *         GNUNET_SYSERR to close it (signal serious error)
1739  */
1740 int
1741 GMC_handle_broken (void* cls,
1742                    const struct GNUNET_PeerIdentity* id,
1743                    const struct GNUNET_MessageHeader* message)
1744 {
1745   struct GNUNET_MESH_ConnectionBroken *msg;
1746   struct MeshConnection *c;
1747   int fwd;
1748
1749   msg = (struct GNUNET_MESH_ConnectionBroken *) message;
1750   log_message (message, id, &msg->cid);
1751   LOG (GNUNET_ERROR_TYPE_DEBUG, "  regarding %s\n",
1752               GNUNET_i2s (&msg->peer1));
1753   LOG (GNUNET_ERROR_TYPE_DEBUG, "  regarding %s\n",
1754               GNUNET_i2s (&msg->peer2));
1755   c = connection_get (&msg->cid);
1756   if (NULL == c)
1757   {
1758     GNUNET_break_op (0);
1759     return GNUNET_OK;
1760   }
1761
1762   fwd = is_fwd (c, id);
1763   if (GMC_is_terminal (c, fwd))
1764   {
1765     path_invalidate (c->path);
1766     if (0 < c->pending_messages)
1767       c->destroy = GNUNET_YES;
1768     else
1769       GMC_destroy (c);
1770   }
1771   else
1772   {
1773     GMC_send_prebuilt_message (message, c, fwd, GNUNET_YES, NULL, NULL);
1774     c->destroy = GNUNET_YES;
1775     connection_cancel_queues (c, !fwd);
1776   }
1777
1778   return GNUNET_OK;
1779
1780 }
1781
1782
1783 /**
1784  * Core handler for tunnel destruction
1785  *
1786  * @param cls Closure (unused).
1787  * @param peer Peer identity of sending neighbor.
1788  * @param message Message.
1789  *
1790  * @return GNUNET_OK to keep the connection open,
1791  *         GNUNET_SYSERR to close it (signal serious error)
1792  */
1793 int
1794 GMC_handle_destroy (void *cls, const struct GNUNET_PeerIdentity *peer,
1795                     const struct GNUNET_MessageHeader *message)
1796 {
1797   struct GNUNET_MESH_ConnectionDestroy *msg;
1798   struct MeshConnection *c;
1799   int fwd;
1800
1801   msg = (struct GNUNET_MESH_ConnectionDestroy *) message;
1802   log_message (message, peer, &msg->cid);
1803   c = connection_get (&msg->cid);
1804   if (NULL == c)
1805   {
1806     /* Probably already got the message from another path,
1807      * destroyed the tunnel and retransmitted to children.
1808      * Safe to ignore.
1809      */
1810     GNUNET_STATISTICS_update (stats, "# control on unknown connection",
1811                               1, GNUNET_NO);
1812     LOG (GNUNET_ERROR_TYPE_DEBUG, "  connection unknown: already destroyed?\n");
1813     return GNUNET_OK;
1814   }
1815   fwd = is_fwd (c, peer);
1816   if (GNUNET_SYSERR == fwd)
1817   {
1818     GNUNET_break_op (0); /* FIXME */
1819     return GNUNET_OK;
1820   }
1821   if (GNUNET_NO == GMC_is_terminal (c, fwd))
1822     GMC_send_prebuilt_message (message, c, fwd, GNUNET_YES, NULL, NULL);
1823   else if (0 == c->pending_messages)
1824   {
1825     LOG (GNUNET_ERROR_TYPE_DEBUG, "!  directly destroying connection!\n");
1826     GMC_destroy (c);
1827     return GNUNET_OK;
1828   }
1829   c->destroy = GNUNET_YES;
1830   c->state = MESH_CONNECTION_DESTROYED;
1831   if (NULL != c->t)
1832   {
1833     GMT_remove_connection (c->t, c);
1834     c->t = NULL;
1835   }
1836
1837   return GNUNET_OK;
1838 }
1839
1840 /**
1841  * Generic handler for mesh network encrypted traffic.
1842  *
1843  * @param peer Peer identity this notification is about.
1844  * @param msg Encrypted message.
1845  *
1846  * @return GNUNET_OK to keep the connection open,
1847  *         GNUNET_SYSERR to close it (signal serious error)
1848  */
1849 static int
1850 handle_mesh_encrypted (const struct GNUNET_PeerIdentity *peer,
1851                        const struct GNUNET_MESH_Encrypted *msg)
1852 {
1853   struct MeshConnection *c;
1854   struct MeshPeer *neighbor;
1855   struct MeshFlowControl *fc;
1856   GNUNET_PEER_Id peer_id;
1857   uint32_t pid;
1858   uint32_t ttl;
1859   size_t size;
1860   int fwd;
1861
1862   log_message (&msg->header, peer, &msg->cid);
1863
1864   /* Check size */
1865   size = ntohs (msg->header.size);
1866   if (size <
1867       sizeof (struct GNUNET_MESH_Encrypted) +
1868       sizeof (struct GNUNET_MessageHeader))
1869   {
1870     GNUNET_break_op (0);
1871     return GNUNET_OK;
1872   }
1873
1874   /* Check connection */
1875   c = connection_get (&msg->cid);
1876   if (NULL == c)
1877   {
1878     GNUNET_STATISTICS_update (stats, "# unknown connection", 1, GNUNET_NO);
1879     LOG (GNUNET_ERROR_TYPE_DEBUG, "WARNING enc on unknown connection %s\n",
1880          GNUNET_h2s (&msg->cid));
1881     return GNUNET_OK;
1882   }
1883
1884   LOG (GNUNET_ERROR_TYPE_DEBUG, "  connection %s\n", GMC_2s (c));
1885
1886   /* Check if origin is as expected */
1887   neighbor = get_prev_hop (c);
1888   peer_id = GNUNET_PEER_search (peer);
1889   if (peer_id == GMP_get_short_id (neighbor))
1890   {
1891     fwd = GNUNET_YES;
1892   }
1893   else
1894   {
1895     neighbor = get_next_hop (c);
1896     if (peer_id == GMP_get_short_id (neighbor))
1897     {
1898       fwd = GNUNET_NO;
1899     }
1900     else
1901     {
1902       /* Unexpected peer sending traffic on a connection. */
1903       GNUNET_break_op (0);
1904       return GNUNET_OK;
1905     }
1906   }
1907
1908   /* Check PID */
1909   fc = fwd ? &c->bck_fc : &c->fwd_fc;
1910   pid = ntohl (msg->pid);
1911   if (GM_is_pid_bigger (pid, fc->last_ack_sent))
1912   {
1913     GNUNET_STATISTICS_update (stats, "# unsolicited message", 1, GNUNET_NO);
1914     LOG (GNUNET_ERROR_TYPE_DEBUG,
1915                 "WARNING Received PID %u, (prev %u), ACK %u\n",
1916                 pid, fc->last_pid_recv, fc->last_ack_sent);
1917     return GNUNET_OK;
1918   }
1919   if (GNUNET_NO == GM_is_pid_bigger (pid, fc->last_pid_recv))
1920   {
1921     GNUNET_STATISTICS_update (stats, "# duplicate PID", 1, GNUNET_NO);
1922     LOG (GNUNET_ERROR_TYPE_DEBUG,
1923                 " Pid %u not expected (%u+), dropping!\n",
1924                 pid, fc->last_pid_recv + 1);
1925     return GNUNET_OK;
1926   }
1927   if (MESH_CONNECTION_SENT == c->state || MESH_CONNECTION_ACK == c->state)
1928     connection_change_state (c, MESH_CONNECTION_READY);
1929   connection_reset_timeout (c, fwd);
1930   fc->last_pid_recv = pid;
1931
1932   /* Is this message for us? */
1933   if (GMC_is_terminal (c, fwd))
1934   {
1935     LOG (GNUNET_ERROR_TYPE_DEBUG, "  message for us!\n");
1936     GNUNET_STATISTICS_update (stats, "# messages received", 1, GNUNET_NO);
1937
1938     if (NULL == c->t)
1939     {
1940       GNUNET_break (GNUNET_NO != c->destroy);
1941       return GNUNET_OK;
1942     }
1943     fc->last_pid_recv = pid;
1944     GMT_handle_encrypted (c->t, msg);
1945     GMC_send_ack (c, fwd, GNUNET_NO);
1946     return GNUNET_OK;
1947   }
1948
1949   /* Message not for us: forward to next hop */
1950   LOG (GNUNET_ERROR_TYPE_DEBUG, "  not for us, retransmitting...\n");
1951   ttl = ntohl (msg->ttl);
1952   LOG (GNUNET_ERROR_TYPE_DEBUG, "   ttl: %u\n", ttl);
1953   if (ttl == 0)
1954   {
1955     GNUNET_STATISTICS_update (stats, "# TTL drops", 1, GNUNET_NO);
1956     LOG (GNUNET_ERROR_TYPE_WARNING, " TTL is 0, DROPPING!\n");
1957     GMC_send_ack (c, fwd, GNUNET_NO);
1958     return GNUNET_OK;
1959   }
1960
1961   GNUNET_STATISTICS_update (stats, "# messages forwarded", 1, GNUNET_NO);
1962   GMC_send_prebuilt_message (&msg->header, c, fwd, GNUNET_NO, NULL, NULL);
1963
1964   return GNUNET_OK;
1965 }
1966
1967 /**
1968  * Generic handler for mesh network encrypted traffic.
1969  *
1970  * @param peer Peer identity this notification is about.
1971  * @param msg Encrypted message.
1972  *
1973  * @return GNUNET_OK to keep the connection open,
1974  *         GNUNET_SYSERR to close it (signal serious error)
1975  */
1976 static int
1977 handle_mesh_kx (const struct GNUNET_PeerIdentity *peer,
1978                 const struct GNUNET_MESH_KX *msg)
1979 {
1980   struct MeshConnection *c;
1981   struct MeshPeer *neighbor;
1982   GNUNET_PEER_Id peer_id;
1983   size_t size;
1984   int fwd;
1985
1986   log_message (&msg->header, peer, &msg->cid);
1987
1988   /* Check size */
1989   size = ntohs (msg->header.size);
1990   if (size <
1991       sizeof (struct GNUNET_MESH_Encrypted) +
1992       sizeof (struct GNUNET_MessageHeader))
1993   {
1994     GNUNET_break_op (0);
1995     return GNUNET_OK;
1996   }
1997
1998   /* Check connection */
1999   c = connection_get (&msg->cid);
2000   if (NULL == c)
2001   {
2002     GNUNET_STATISTICS_update (stats, "# unknown connection", 1, GNUNET_NO);
2003     LOG (GNUNET_ERROR_TYPE_DEBUG, "WARNING kx on unknown connection %s\n",
2004          GNUNET_h2s (&msg->cid));
2005     return GNUNET_OK;
2006   }
2007   LOG (GNUNET_ERROR_TYPE_DEBUG, " on connection %s\n", GMC_2s (c));
2008
2009   /* Check if origin is as expected */
2010   neighbor = get_prev_hop (c);
2011   peer_id = GNUNET_PEER_search (peer);
2012   if (peer_id == GMP_get_short_id (neighbor))
2013   {
2014     fwd = GNUNET_YES;
2015   }
2016   else
2017   {
2018     neighbor = get_next_hop (c);
2019     if (peer_id == GMP_get_short_id (neighbor))
2020     {
2021       fwd = GNUNET_NO;
2022     }
2023     else
2024     {
2025       /* Unexpected peer sending traffic on a connection. */
2026       GNUNET_break_op (0);
2027       return GNUNET_OK;
2028     }
2029   }
2030
2031   /* Count as connection confirmation. */
2032   if (MESH_CONNECTION_SENT == c->state || MESH_CONNECTION_ACK == c->state)
2033   {
2034     connection_change_state (c, MESH_CONNECTION_READY);
2035     if (NULL != c->t)
2036     {
2037       if (MESH_TUNNEL3_WAITING == GMT_get_cstate (c->t))
2038         GMT_change_cstate (c->t, MESH_TUNNEL3_READY);
2039     }
2040   }
2041   connection_reset_timeout (c, fwd);
2042
2043   /* Is this message for us? */
2044   if (GMC_is_terminal (c, fwd))
2045   {
2046     LOG (GNUNET_ERROR_TYPE_DEBUG, "  message for us!\n");
2047     GNUNET_STATISTICS_update (stats, "# messages received", 1, GNUNET_NO);
2048     if (NULL == c->t)
2049     {
2050       GNUNET_break (0);
2051       return GNUNET_OK;
2052     }
2053     GMT_handle_kx (c->t, &msg[1].header);
2054     return GNUNET_OK;
2055   }
2056
2057   /* Message not for us: forward to next hop */
2058   LOG (GNUNET_ERROR_TYPE_DEBUG, "  not for us, retransmitting...\n");
2059   GNUNET_STATISTICS_update (stats, "# messages forwarded", 1, GNUNET_NO);
2060   GMC_send_prebuilt_message (&msg->header, c, fwd, GNUNET_NO, NULL, NULL);
2061
2062   return GNUNET_OK;
2063 }
2064
2065
2066 /**
2067  * Core handler for encrypted mesh network traffic (channel mgmt, data).
2068  *
2069  * @param cls Closure (unused).
2070  * @param message Message received.
2071  * @param peer Peer who sent the message.
2072  *
2073  * @return GNUNET_OK to keep the connection open,
2074  *         GNUNET_SYSERR to close it (signal serious error)
2075  */
2076 int
2077 GMC_handle_encrypted (void *cls, const struct GNUNET_PeerIdentity *peer,
2078                       const struct GNUNET_MessageHeader *message)
2079 {
2080   return handle_mesh_encrypted (peer,
2081                                 (struct GNUNET_MESH_Encrypted *)message);
2082 }
2083
2084
2085 /**
2086  * Core handler for key exchange traffic (ephemeral key, ping, pong).
2087  *
2088  * @param cls Closure (unused).
2089  * @param message Message received.
2090  * @param peer Peer who sent the message.
2091  *
2092  * @return GNUNET_OK to keep the connection open,
2093  *         GNUNET_SYSERR to close it (signal serious error)
2094  */
2095 int
2096 GMC_handle_kx (void *cls, const struct GNUNET_PeerIdentity *peer,
2097                const struct GNUNET_MessageHeader *message)
2098 {
2099   return handle_mesh_kx (peer,
2100                          (struct GNUNET_MESH_KX *) message);
2101 }
2102
2103
2104 /**
2105  * Core handler for mesh network traffic point-to-point acks.
2106  *
2107  * @param cls closure
2108  * @param message message
2109  * @param peer peer identity this notification is about
2110  *
2111  * @return GNUNET_OK to keep the connection open,
2112  *         GNUNET_SYSERR to close it (signal serious error)
2113  */
2114 int
2115 GMC_handle_ack (void *cls, const struct GNUNET_PeerIdentity *peer,
2116                 const struct GNUNET_MessageHeader *message)
2117 {
2118   struct GNUNET_MESH_ACK *msg;
2119   struct MeshConnection *c;
2120   struct MeshFlowControl *fc;
2121   GNUNET_PEER_Id id;
2122   uint32_t ack;
2123   int fwd;
2124
2125   msg = (struct GNUNET_MESH_ACK *) message;
2126   log_message (message, peer, &msg->cid);
2127   c = connection_get (&msg->cid);
2128   if (NULL == c)
2129   {
2130     GNUNET_STATISTICS_update (stats, "# ack on unknown connection", 1,
2131                               GNUNET_NO);
2132     return GNUNET_OK;
2133   }
2134
2135   /* Is this a forward or backward ACK? */
2136   id = GNUNET_PEER_search (peer);
2137   if (GMP_get_short_id (get_next_hop (c)) == id)
2138   {
2139     LOG (GNUNET_ERROR_TYPE_DEBUG, "  FWD ACK\n");
2140     fc = &c->fwd_fc;
2141     fwd = GNUNET_YES;
2142   }
2143   else if (GMP_get_short_id (get_prev_hop (c)) == id)
2144   {
2145     LOG (GNUNET_ERROR_TYPE_DEBUG, "  BCK ACK\n");
2146     fc = &c->bck_fc;
2147     fwd = GNUNET_NO;
2148   }
2149   else
2150   {
2151     GNUNET_break_op (0);
2152     return GNUNET_OK;
2153   }
2154
2155   ack = ntohl (msg->ack);
2156   LOG (GNUNET_ERROR_TYPE_DEBUG, "  ACK %u (was %u)\n",
2157               ack, fc->last_ack_recv);
2158   if (GM_is_pid_bigger (ack, fc->last_ack_recv))
2159     fc->last_ack_recv = ack;
2160
2161   /* Cancel polling if the ACK is big enough. */
2162   if (GNUNET_SCHEDULER_NO_TASK != fc->poll_task &&
2163       GM_is_pid_bigger (fc->last_ack_recv, fc->last_pid_sent))
2164   {
2165     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Cancel poll\n");
2166     GNUNET_SCHEDULER_cancel (fc->poll_task);
2167     fc->poll_task = GNUNET_SCHEDULER_NO_TASK;
2168     fc->poll_time = GNUNET_TIME_UNIT_SECONDS;
2169   }
2170
2171   connection_unlock_queue (c, fwd);
2172
2173   return GNUNET_OK;
2174 }
2175
2176
2177 /**
2178  * Core handler for mesh network traffic point-to-point ack polls.
2179  *
2180  * @param cls closure
2181  * @param message message
2182  * @param peer peer identity this notification is about
2183  *
2184  * @return GNUNET_OK to keep the connection open,
2185  *         GNUNET_SYSERR to close it (signal serious error)
2186  */
2187 int
2188 GMC_handle_poll (void *cls, const struct GNUNET_PeerIdentity *peer,
2189                  const struct GNUNET_MessageHeader *message)
2190 {
2191   struct GNUNET_MESH_Poll *msg;
2192   struct MeshConnection *c;
2193   struct MeshFlowControl *fc;
2194   GNUNET_PEER_Id id;
2195   uint32_t pid;
2196   int fwd;
2197
2198   msg = (struct GNUNET_MESH_Poll *) message;
2199   log_message (message, peer, &msg->cid);
2200   c = connection_get (&msg->cid);
2201   if (NULL == c)
2202   {
2203     GNUNET_STATISTICS_update (stats, "# poll on unknown connection", 1,
2204                               GNUNET_NO);
2205     LOG (GNUNET_ERROR_TYPE_DEBUG,
2206          "WARNING POLL message on unknown connection %s!\n",
2207          GNUNET_h2s (&msg->cid));
2208     return GNUNET_OK;
2209   }
2210
2211   /* Is this a forward or backward ACK?
2212    * Note: a poll should never be needed in a loopback case,
2213    * since there is no possiblility of packet loss there, so
2214    * this way of discerining FWD/BCK should not be a problem.
2215    */
2216   id = GNUNET_PEER_search (peer);
2217   if (GMP_get_short_id (get_next_hop (c)) == id)
2218   {
2219     LOG (GNUNET_ERROR_TYPE_DEBUG, "  FWD FC\n");
2220     fc = &c->fwd_fc;
2221   }
2222   else if (GMP_get_short_id (get_prev_hop (c)) == id)
2223   {
2224     LOG (GNUNET_ERROR_TYPE_DEBUG, "  BCK FC\n");
2225     fc = &c->bck_fc;
2226   }
2227   else
2228   {
2229     GNUNET_break_op (0);
2230     return GNUNET_OK;
2231   }
2232
2233   pid = ntohl (msg->pid);
2234   LOG (GNUNET_ERROR_TYPE_DEBUG, "  PID %u, OLD %u\n", pid, fc->last_pid_recv);
2235   fc->last_pid_recv = pid;
2236   fwd = fc == &c->bck_fc;
2237   GMC_send_ack (c, fwd, GNUNET_YES);
2238
2239   return GNUNET_OK;
2240 }
2241
2242
2243 /**
2244  * Core handler for mesh keepalives.
2245  *
2246  * @param cls closure
2247  * @param message message
2248  * @param peer peer identity this notification is about
2249  * @return GNUNET_OK to keep the connection open,
2250  *         GNUNET_SYSERR to close it (signal serious error)
2251  *
2252  * TODO: Check who we got this from, to validate route.
2253  */
2254 int
2255 GMC_handle_keepalive (void *cls, const struct GNUNET_PeerIdentity *peer,
2256                       const struct GNUNET_MessageHeader *message)
2257 {
2258   struct GNUNET_MESH_ConnectionKeepAlive *msg;
2259   struct MeshConnection *c;
2260   struct MeshPeer *neighbor;
2261   GNUNET_PEER_Id peer_id;
2262   int fwd;
2263
2264   msg = (struct GNUNET_MESH_ConnectionKeepAlive *) message;
2265   log_message (message, peer, &msg->cid);
2266   c = connection_get (&msg->cid);
2267   if (NULL == c)
2268   {
2269     GNUNET_STATISTICS_update (stats, "# keepalive on unknown connection", 1,
2270                               GNUNET_NO);
2271     return GNUNET_OK;
2272   }
2273
2274   /* Check if origin is as expected TODO refactor and reuse */
2275   peer_id = GNUNET_PEER_search (peer);
2276   neighbor = get_prev_hop (c);
2277   if (peer_id == GMP_get_short_id (neighbor))
2278   {
2279     fwd = GNUNET_YES;
2280   }
2281   else
2282   {
2283     neighbor = get_next_hop (c);
2284     if (peer_id == GMP_get_short_id (neighbor))
2285     {
2286       fwd = GNUNET_NO;
2287     }
2288     else
2289     {
2290       GNUNET_break_op (0);
2291       return GNUNET_OK;
2292     }
2293   }
2294
2295   connection_change_state (c, MESH_CONNECTION_READY);
2296   connection_reset_timeout (c, fwd);
2297
2298   if (GMC_is_terminal (c, fwd))
2299     return GNUNET_OK;
2300
2301   GNUNET_STATISTICS_update (stats, "# keepalives forwarded", 1, GNUNET_NO);
2302   GMC_send_prebuilt_message (message, c, fwd, GNUNET_YES, NULL, NULL);
2303
2304   return GNUNET_OK;
2305 }
2306
2307
2308 /**
2309  * Send an ACK on the appropriate connection/channel, depending on
2310  * the direction and the position of the peer.
2311  *
2312  * @param c Which connection to send the hop-by-hop ACK.
2313  * @param fwd Is this a fwd ACK? (will go dest->root).
2314  * @param force Send the ACK even if suboptimal (e.g. requested by POLL).
2315  */
2316 void
2317 GMC_send_ack (struct MeshConnection *c, int fwd, int force)
2318 {
2319   unsigned int buffer;
2320
2321   LOG (GNUNET_ERROR_TYPE_DEBUG,
2322        "GMC send %s ACK on %s\n",
2323        GM_f2s (fwd), GMC_2s (c));
2324
2325   if (NULL == c)
2326   {
2327     GNUNET_break (0);
2328     return;
2329   }
2330
2331   if (GNUNET_NO != c->destroy)
2332   {
2333     LOG (GNUNET_ERROR_TYPE_DEBUG, "  being destroyed, why bother...\n");
2334     return;
2335   }
2336
2337   /* Get available buffer space */
2338   if (GMC_is_terminal (c, fwd))
2339   {
2340     LOG (GNUNET_ERROR_TYPE_DEBUG, "  getting from all channels\n");
2341     buffer = GMT_get_channels_buffer (c->t);
2342   }
2343   else
2344   {
2345     LOG (GNUNET_ERROR_TYPE_DEBUG, "  getting from one connection\n");
2346     buffer = GMC_get_buffer (c, fwd);
2347   }
2348   LOG (GNUNET_ERROR_TYPE_DEBUG, "  buffer available: %u\n", buffer);
2349   if (0 == buffer && GNUNET_NO == force)
2350     return;
2351
2352   /* Send available buffer space */
2353   if (GMC_is_origin (c, fwd))
2354   {
2355     GNUNET_assert (NULL != c->t);
2356     LOG (GNUNET_ERROR_TYPE_DEBUG, "  sending on channels...\n");
2357     GMT_unchoke_channels (c->t);
2358   }
2359   else
2360   {
2361     LOG (GNUNET_ERROR_TYPE_DEBUG, "  sending on connection\n");
2362     send_ack (c, buffer, fwd, force);
2363   }
2364 }
2365
2366
2367 /**
2368  * Initialize the connections subsystem
2369  *
2370  * @param c Configuration handle.
2371  */
2372 void
2373 GMC_init (const struct GNUNET_CONFIGURATION_Handle *c)
2374 {
2375   LOG (GNUNET_ERROR_TYPE_DEBUG, "init\n");
2376   if (GNUNET_OK !=
2377       GNUNET_CONFIGURATION_get_value_number (c, "MESH", "MAX_MSGS_QUEUE",
2378                                              &max_msgs_queue))
2379   {
2380     GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
2381                                "MESH", "MAX_MSGS_QUEUE", "MISSING");
2382     GNUNET_SCHEDULER_shutdown ();
2383     return;
2384   }
2385
2386   if (GNUNET_OK !=
2387       GNUNET_CONFIGURATION_get_value_number (c, "MESH", "MAX_CONNECTIONS",
2388                                              &max_connections))
2389   {
2390     GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
2391                                "MESH", "MAX_CONNECTIONS", "MISSING");
2392     GNUNET_SCHEDULER_shutdown ();
2393     return;
2394   }
2395
2396   if (GNUNET_OK !=
2397       GNUNET_CONFIGURATION_get_value_time (c, "MESH", "REFRESH_CONNECTION_TIME",
2398                                            &refresh_connection_time))
2399   {
2400     GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
2401                                "MESH", "REFRESH_CONNECTION_TIME", "MISSING");
2402     GNUNET_SCHEDULER_shutdown ();
2403     return;
2404   }
2405   create_connection_time = GNUNET_TIME_UNIT_SECONDS;
2406   connections = GNUNET_CONTAINER_multihashmap_create (1024, GNUNET_YES);
2407 }
2408
2409
2410 /**
2411  * Destroy each connection on shutdown.
2412  *
2413  * @param cls Closure (unused).
2414  * @param key Current key code (CID, unused).
2415  * @param value Value in the hash map (connection)
2416  *
2417  * @return #GNUNET_YES, because we should continue to iterate,
2418  */
2419 static int
2420 shutdown_iterator (void *cls,
2421                    const struct GNUNET_HashCode *key,
2422                    void *value)
2423 {
2424   struct MeshConnection *c = value;
2425
2426   GMC_destroy (c);
2427   return GNUNET_YES;
2428 }
2429
2430
2431 /**
2432  * Shut down the connections subsystem.
2433  */
2434 void
2435 GMC_shutdown (void)
2436 {
2437   GNUNET_CONTAINER_multihashmap_iterate (connections, &shutdown_iterator, NULL);
2438   GNUNET_CONTAINER_multihashmap_destroy (connections);
2439   connections = NULL;
2440 }
2441
2442
2443 struct MeshConnection *
2444 GMC_new (const struct GNUNET_HashCode *cid,
2445          struct MeshTunnel3 *t,
2446          struct MeshPeerPath *p,
2447          unsigned int own_pos)
2448 {
2449   struct MeshConnection *c;
2450
2451   c = GNUNET_new (struct MeshConnection);
2452   c->id = *cid;
2453   GNUNET_assert (GNUNET_OK ==
2454                  GNUNET_CONTAINER_multihashmap_put (connections,
2455                                                     &c->id, c,
2456                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
2457   fc_init (&c->fwd_fc);
2458   fc_init (&c->bck_fc);
2459   c->fwd_fc.c = c;
2460   c->bck_fc.c = c;
2461
2462   c->t = t;
2463   GNUNET_assert (own_pos <= p->length - 1);
2464   c->own_pos = own_pos;
2465   c->path = p;
2466
2467   if (GNUNET_OK != register_neighbors (c))
2468   {
2469     if (0 == own_pos)
2470     {
2471       path_invalidate (c->path);
2472       c->t = NULL;
2473       c->path = NULL;
2474     }
2475     GMC_destroy (c);
2476     return NULL;
2477   }
2478
2479   return c;
2480 }
2481
2482
2483 void
2484 GMC_destroy (struct MeshConnection *c)
2485 {
2486   if (NULL == c)
2487   {
2488     GNUNET_break (0);
2489     return;
2490   }
2491
2492   if (2 == c->destroy) /* cancel queues -> GMP_queue_cancel -> q_destroy -> */
2493     return;            /* -> message_sent -> GMC_destroy. Don't loop. */
2494   c->destroy = 2;
2495
2496   LOG (GNUNET_ERROR_TYPE_DEBUG, "destroying connection %s\n", GMC_2s (c));
2497   LOG (GNUNET_ERROR_TYPE_DEBUG, " fc's f: %p, b: %p\n",
2498        &c->fwd_fc, &c->bck_fc);
2499   LOG (GNUNET_ERROR_TYPE_DEBUG, " fc tasks f: %u, b: %u\n",
2500        c->fwd_fc.poll_task, c->bck_fc.poll_task);
2501
2502   /* Cancel all traffic */
2503   if (NULL != c->path)
2504   {
2505     connection_cancel_queues (c, GNUNET_YES);
2506     connection_cancel_queues (c, GNUNET_NO);
2507     unregister_neighbors (c);
2508   }
2509
2510   LOG (GNUNET_ERROR_TYPE_DEBUG, " fc tasks f: %u, b: %u\n",
2511        c->fwd_fc.poll_task, c->bck_fc.poll_task);
2512
2513   /* Cancel maintainance task (keepalive/timeout) */
2514   if (NULL != c->fwd_fc.poll_msg)
2515   {
2516     GMC_cancel (c->fwd_fc.poll_msg);
2517     LOG (GNUNET_ERROR_TYPE_DEBUG, " *** POLL msg FWD canceled\n");
2518   }
2519   if (NULL != c->bck_fc.poll_msg)
2520   {
2521     GMC_cancel (c->bck_fc.poll_msg);
2522     LOG (GNUNET_ERROR_TYPE_DEBUG, " *** POLL msg BCK canceled\n");
2523   }
2524
2525   /* Delete from tunnel */
2526   if (NULL != c->t)
2527     GMT_remove_connection (c->t, c);
2528
2529   if (GNUNET_NO == GMC_is_origin (c, GNUNET_YES) && NULL != c->path)
2530     path_destroy (c->path);
2531   if (GNUNET_SCHEDULER_NO_TASK != c->fwd_maintenance_task)
2532     GNUNET_SCHEDULER_cancel (c->fwd_maintenance_task);
2533   if (GNUNET_SCHEDULER_NO_TASK != c->bck_maintenance_task)
2534     GNUNET_SCHEDULER_cancel (c->bck_maintenance_task);
2535   if (GNUNET_SCHEDULER_NO_TASK != c->fwd_fc.poll_task)
2536   {
2537     GNUNET_SCHEDULER_cancel (c->fwd_fc.poll_task);
2538     LOG (GNUNET_ERROR_TYPE_DEBUG, " *** POLL FWD canceled\n");
2539   }
2540   if (GNUNET_SCHEDULER_NO_TASK != c->bck_fc.poll_task)
2541   {
2542     GNUNET_SCHEDULER_cancel (c->bck_fc.poll_task);
2543     LOG (GNUNET_ERROR_TYPE_DEBUG, " *** POLL BCK canceled\n");
2544   }
2545
2546   GNUNET_break (GNUNET_YES ==
2547                 GNUNET_CONTAINER_multihashmap_remove (connections, &c->id, c));
2548
2549   GNUNET_STATISTICS_update (stats, "# connections", -1, GNUNET_NO);
2550   GNUNET_free (c);
2551 }
2552
2553 /**
2554  * Get the connection ID.
2555  *
2556  * @param c Connection to get the ID from.
2557  *
2558  * @return ID of the connection.
2559  */
2560 const struct GNUNET_HashCode *
2561 GMC_get_id (const struct MeshConnection *c)
2562 {
2563   return &c->id;
2564 }
2565
2566
2567 /**
2568  * Get the connection path.
2569  *
2570  * @param c Connection to get the path from.
2571  *
2572  * @return path used by the connection.
2573  */
2574 const struct MeshPeerPath *
2575 GMC_get_path (const struct MeshConnection *c)
2576 {
2577   if (GNUNET_NO == c->destroy)
2578     return c->path;
2579   return NULL;
2580 }
2581
2582
2583 /**
2584  * Get the connection state.
2585  *
2586  * @param c Connection to get the state from.
2587  *
2588  * @return state of the connection.
2589  */
2590 enum MeshConnectionState
2591 GMC_get_state (const struct MeshConnection *c)
2592 {
2593   return c->state;
2594 }
2595
2596 /**
2597  * Get the connection tunnel.
2598  *
2599  * @param c Connection to get the tunnel from.
2600  *
2601  * @return tunnel of the connection.
2602  */
2603 struct MeshTunnel3 *
2604 GMC_get_tunnel (const struct MeshConnection *c)
2605 {
2606   return c->t;
2607 }
2608
2609
2610 /**
2611  * Get free buffer space in a connection.
2612  *
2613  * @param c Connection.
2614  * @param fwd Is query about FWD traffic?
2615  *
2616  * @return Free buffer space [0 - max_msgs_queue/max_connections]
2617  */
2618 unsigned int
2619 GMC_get_buffer (struct MeshConnection *c, int fwd)
2620 {
2621   struct MeshFlowControl *fc;
2622
2623   fc = fwd ? &c->fwd_fc : &c->bck_fc;
2624
2625   return (fc->queue_max - fc->queue_n);
2626 }
2627
2628 /**
2629  * Get how many messages have we allowed to send to us from a direction.
2630  *
2631  * @param c Connection.
2632  * @param fwd Are we asking about traffic from FWD (BCK messages)?
2633  *
2634  * @return last_ack_sent - last_pid_recv
2635  */
2636 unsigned int
2637 GMC_get_allowed (struct MeshConnection *c, int fwd)
2638 {
2639   struct MeshFlowControl *fc;
2640
2641   fc = fwd ? &c->fwd_fc : &c->bck_fc;
2642   if (GM_is_pid_bigger(fc->last_pid_recv, fc->last_ack_sent))
2643   {
2644     return 0;
2645   }
2646   return (fc->last_ack_sent - fc->last_pid_recv);
2647 }
2648
2649 /**
2650  * Get messages queued in a connection.
2651  *
2652  * @param c Connection.
2653  * @param fwd Is query about FWD traffic?
2654  *
2655  * @return Number of messages queued.
2656  */
2657 unsigned int
2658 GMC_get_qn (struct MeshConnection *c, int fwd)
2659 {
2660   struct MeshFlowControl *fc;
2661
2662   fc = fwd ? &c->fwd_fc : &c->bck_fc;
2663
2664   return fc->queue_n;
2665 }
2666
2667
2668 /**
2669  * Allow the connection to advertise a buffer of the given size.
2670  *
2671  * The connection will send an @c fwd ACK message (so: in direction !fwd)
2672  * allowing up to last_pid_recv + buffer.
2673  *
2674  * @param c Connection.
2675  * @param buffer How many more messages the connection can accept.
2676  * @param fwd Is this about FWD traffic? (The ack will go dest->root).
2677  */
2678 void
2679 GMC_allow (struct MeshConnection *c, unsigned int buffer, int fwd)
2680 {
2681   LOG (GNUNET_ERROR_TYPE_DEBUG, "  allowing %s %u messages %s\n",
2682        GMC_2s (c), buffer, GM_f2s (fwd));
2683   send_ack (c, buffer, fwd, GNUNET_NO);
2684 }
2685
2686
2687 /**
2688  * Notify other peers on a connection of a broken link. Mark connections
2689  * to destroy after all traffic has been sent.
2690  *
2691  * @param c Connection on which there has been a disconnection.
2692  * @param peer Peer that disconnected.
2693  */
2694 void
2695 GMC_notify_broken (struct MeshConnection *c,
2696                    struct MeshPeer *peer)
2697 {
2698   int fwd;
2699
2700   LOG (GNUNET_ERROR_TYPE_DEBUG,
2701        " notify broken on %s due to %s disconnect\n",
2702        GMC_2s (c), GMP_2s (peer));
2703
2704   fwd = peer == get_prev_hop (c);
2705
2706   if (GNUNET_YES == GMC_is_terminal (c, fwd))
2707   {
2708     /* Local shutdown, no one to notify about this. */
2709     GMC_destroy (c);
2710     return;
2711   }
2712   if (GNUNET_NO == c->destroy)
2713     send_broken (c, &my_full_id, GMP_get_id (peer), fwd);
2714
2715   /* Connection will have at least one pending message
2716    * (the one we just scheduled), so no point in checking whether to
2717    * destroy immediately. */
2718   c->destroy = GNUNET_YES;
2719   c->state = MESH_CONNECTION_DESTROYED;
2720
2721   /**
2722    * Cancel all queues, if no message is left, connection will be destroyed.
2723    */
2724   connection_cancel_queues (c, !fwd);
2725
2726   return;
2727 }
2728
2729
2730 /**
2731  * Is this peer the first one on the connection?
2732  *
2733  * @param c Connection.
2734  * @param fwd Is this about fwd traffic?
2735  *
2736  * @return #GNUNET_YES if origin, #GNUNET_NO if relay/terminal.
2737  */
2738 int
2739 GMC_is_origin (struct MeshConnection *c, int fwd)
2740 {
2741   if (!fwd && c->path->length - 1 == c->own_pos )
2742     return GNUNET_YES;
2743   if (fwd && 0 == c->own_pos)
2744     return GNUNET_YES;
2745   return GNUNET_NO;
2746 }
2747
2748
2749 /**
2750  * Is this peer the last one on the connection?
2751  *
2752  * @param c Connection.
2753  * @param fwd Is this about fwd traffic?
2754  *            Note that the ROOT is the terminal for BCK traffic!
2755  *
2756  * @return #GNUNET_YES if terminal, #GNUNET_NO if relay/origin.
2757  */
2758 int
2759 GMC_is_terminal (struct MeshConnection *c, int fwd)
2760 {
2761   return GMC_is_origin (c, !fwd);
2762 }
2763
2764
2765 /**
2766  * See if we are allowed to send by the next hop in the given direction.
2767  *
2768  * @param c Connection.
2769  * @param fwd Is this about fwd traffic?
2770  *
2771  * @return #GNUNET_YES in case it's OK to send.
2772  */
2773 int
2774 GMC_is_sendable (struct MeshConnection *c, int fwd)
2775 {
2776   struct MeshFlowControl *fc;
2777
2778   if (NULL == c)
2779   {
2780     GNUNET_break (0);
2781     return GNUNET_YES;
2782   }
2783   fc = fwd ? &c->fwd_fc : &c->bck_fc;
2784   if (GM_is_pid_bigger (fc->last_ack_recv, fc->last_pid_sent))
2785     return GNUNET_YES;
2786   return GNUNET_NO;
2787 }
2788
2789 /**
2790  * Sends an already built message on a connection, properly registering
2791  * all used resources.
2792  *
2793  * @param message Message to send. Function makes a copy of it.
2794  *                If message is not hop-by-hop, decrements TTL of copy.
2795  * @param c Connection on which this message is transmitted.
2796  * @param fwd Is this a fwd message?
2797  * @param force Force the connection to accept the message (buffer overfill).
2798  * @param cont Continuation called once message is sent. Can be NULL.
2799  * @param cont_cls Closure for @c cont.
2800  *
2801  * @return Handle to cancel the message before it's sent.
2802  *         NULL on error or if @c cont is NULL.
2803  *         Invalid on @c cont call.
2804  */
2805 struct MeshConnectionQueue *
2806 GMC_send_prebuilt_message (const struct GNUNET_MessageHeader *message,
2807                            struct MeshConnection *c, int fwd, int force,
2808                            GMC_sent cont, void *cont_cls)
2809 {
2810   struct MeshFlowControl *fc;
2811   struct MeshConnectionQueue *q;
2812   void *data;
2813   size_t size;
2814   uint16_t type;
2815   int droppable;
2816
2817   size = ntohs (message->size);
2818   data = GNUNET_malloc (size);
2819   memcpy (data, message, size);
2820   type = ntohs (message->type);
2821   LOG (GNUNET_ERROR_TYPE_INFO, "-> %s on connection %s (%u bytes)\n",
2822        GM_m2s (type), GMC_2s (c), size);
2823
2824   fc = fwd ? &c->fwd_fc : &c->bck_fc;
2825   droppable = GNUNET_NO == force;
2826   switch (type)
2827   {
2828     struct GNUNET_MESH_Encrypted *emsg;
2829     struct GNUNET_MESH_KX        *kmsg;
2830     struct GNUNET_MESH_ACK       *amsg;
2831     struct GNUNET_MESH_Poll      *pmsg;
2832     struct GNUNET_MESH_ConnectionDestroy *dmsg;
2833     struct GNUNET_MESH_ConnectionBroken  *bmsg;
2834     uint32_t ttl;
2835
2836     case GNUNET_MESSAGE_TYPE_MESH_ENCRYPTED:
2837       emsg = (struct GNUNET_MESH_Encrypted *) data;
2838       ttl = ntohl (emsg->ttl);
2839       if (0 == ttl)
2840       {
2841         GNUNET_break_op (0);
2842         GNUNET_free (data);
2843         return NULL;
2844       }
2845       emsg->cid = c->id;
2846       emsg->ttl = htonl (ttl - 1);
2847       emsg->pid = htonl (fc->next_pid++);
2848       LOG (GNUNET_ERROR_TYPE_DEBUG, "  Q_N+ %p %u\n", fc, fc->queue_n);
2849       if (GNUNET_YES == droppable)
2850       {
2851         fc->queue_n++;
2852         LOG (GNUNET_ERROR_TYPE_DEBUG, "pid %u\n", ntohl (emsg->pid));
2853         LOG (GNUNET_ERROR_TYPE_DEBUG, "last pid sent %u\n", fc->last_pid_sent);
2854         LOG (GNUNET_ERROR_TYPE_DEBUG, "     ack recv %u\n", fc->last_ack_recv);
2855       }
2856       else
2857       {
2858         LOG (GNUNET_ERROR_TYPE_DEBUG, "  not droppable, Q_N stays the same\n");
2859       }
2860       if (GM_is_pid_bigger (fc->last_pid_sent + 1, fc->last_ack_recv))
2861       {
2862         GMC_start_poll (c, fwd);
2863       }
2864       break;
2865
2866     case GNUNET_MESSAGE_TYPE_MESH_KX:
2867       kmsg = (struct GNUNET_MESH_KX *) data;
2868       kmsg->cid = c->id;
2869       break;
2870
2871     case GNUNET_MESSAGE_TYPE_MESH_ACK:
2872       amsg = (struct GNUNET_MESH_ACK *) data;
2873       amsg->cid = c->id;
2874       LOG (GNUNET_ERROR_TYPE_DEBUG, " ack %u\n", ntohl (amsg->ack));
2875       droppable = GNUNET_NO;
2876       break;
2877
2878     case GNUNET_MESSAGE_TYPE_MESH_POLL:
2879       pmsg = (struct GNUNET_MESH_Poll *) data;
2880       pmsg->cid = c->id;
2881       LOG (GNUNET_ERROR_TYPE_DEBUG, " poll %u\n", ntohl (pmsg->pid));
2882       droppable = GNUNET_NO;
2883       break;
2884
2885     case GNUNET_MESSAGE_TYPE_MESH_CONNECTION_DESTROY:
2886       dmsg = (struct GNUNET_MESH_ConnectionDestroy *) data;
2887       dmsg->cid = c->id;
2888       dmsg->reserved = 0;
2889       break;
2890
2891     case GNUNET_MESSAGE_TYPE_MESH_CONNECTION_BROKEN:
2892       bmsg = (struct GNUNET_MESH_ConnectionBroken *) data;
2893       bmsg->cid = c->id;
2894       bmsg->reserved = 0;
2895       break;
2896
2897     case GNUNET_MESSAGE_TYPE_MESH_CONNECTION_CREATE:
2898     case GNUNET_MESSAGE_TYPE_MESH_CONNECTION_ACK:
2899     case GNUNET_MESSAGE_TYPE_MESH_KEEPALIVE:
2900       break;
2901
2902     default:
2903       GNUNET_break (0);
2904       GNUNET_free (data);
2905       return NULL;
2906   }
2907
2908   if (fc->queue_n > fc->queue_max && droppable)
2909   {
2910     GNUNET_STATISTICS_update (stats, "# messages dropped (buffer full)",
2911                               1, GNUNET_NO);
2912     GNUNET_break (0);
2913     LOG (GNUNET_ERROR_TYPE_DEBUG,
2914                 "queue full: %u/%u\n",
2915                 fc->queue_n, fc->queue_max);
2916     if (GNUNET_MESSAGE_TYPE_MESH_ENCRYPTED == type)
2917     {
2918       fc->queue_n--;
2919       fc->next_pid--;
2920     }
2921     GNUNET_free (data);
2922     return NULL; /* Drop this message */
2923   }
2924
2925   LOG (GNUNET_ERROR_TYPE_DEBUG, "  C_P+ %p %u\n", c, c->pending_messages);
2926   c->pending_messages++;
2927
2928   q = GNUNET_new (struct MeshConnectionQueue);
2929   q->forced = !droppable;
2930   q->q = GMP_queue_add (get_hop (c, fwd), data, type, size, c, fwd,
2931                         &message_sent, q);
2932   if (NULL == q->q)
2933   {
2934     LOG (GNUNET_ERROR_TYPE_DEBUG, "WARNING dropping msg on %s\n", GMC_2s (c));
2935     GNUNET_free (data);
2936     GNUNET_free (q);
2937     return NULL;
2938   }
2939   q->cont = cont;
2940   q->cont_cls = cont_cls;
2941   return q;
2942 }
2943
2944
2945 /**
2946  * Cancel a previously sent message while it's in the queue.
2947  *
2948  * ONLY can be called before the continuation given to the send function
2949  * is called. Once the continuation is called, the message is no longer in the
2950  * queue.
2951  *
2952  * @param q Handle to the queue.
2953  */
2954 void
2955 GMC_cancel (struct MeshConnectionQueue *q)
2956 {
2957   LOG (GNUNET_ERROR_TYPE_DEBUG, "!  GMC cancel message\n");
2958
2959   /* queue destroy calls message_sent, which calls q->cont and frees q */
2960   GMP_queue_destroy (q->q, GNUNET_YES);
2961 }
2962
2963
2964 /**
2965  * Sends a CREATE CONNECTION message for a path to a peer.
2966  * Changes the connection and tunnel states if necessary.
2967  *
2968  * @param connection Connection to create.
2969  */
2970 void
2971 GMC_send_create (struct MeshConnection *connection)
2972 {
2973   enum MeshTunnel3CState state;
2974   size_t size;
2975
2976   size = sizeof (struct GNUNET_MESH_ConnectionCreate);
2977   size += connection->path->length * sizeof (struct GNUNET_PeerIdentity);
2978
2979   LOG (GNUNET_ERROR_TYPE_INFO, "=> %s on connection %s  (%u bytes)\n",
2980        GM_m2s (GNUNET_MESSAGE_TYPE_MESH_CONNECTION_CREATE),
2981        GMC_2s (connection), size);
2982   LOG (GNUNET_ERROR_TYPE_DEBUG, "  C_P+ %p %u (create)\n",
2983        connection, connection->pending_messages);
2984   connection->pending_messages++;
2985
2986   connection->maintenance_q =
2987     GMP_queue_add (get_next_hop (connection), NULL,
2988                    GNUNET_MESSAGE_TYPE_MESH_CONNECTION_CREATE,
2989                    size, connection, GNUNET_YES, &message_sent, NULL);
2990
2991   state = GMT_get_cstate (connection->t);
2992   if (MESH_TUNNEL3_SEARCHING == state || MESH_TUNNEL3_NEW == state)
2993     GMT_change_cstate (connection->t, MESH_TUNNEL3_WAITING);
2994   if (MESH_CONNECTION_NEW == connection->state)
2995     connection_change_state (connection, MESH_CONNECTION_SENT);
2996 }
2997
2998
2999 /**
3000  * Send a message to all peers in this connection that the connection
3001  * is no longer valid.
3002  *
3003  * If some peer should not receive the message, it should be zero'ed out
3004  * before calling this function.
3005  *
3006  * @param c The connection whose peers to notify.
3007  */
3008 void
3009 GMC_send_destroy (struct MeshConnection *c)
3010 {
3011   struct GNUNET_MESH_ConnectionDestroy msg;
3012
3013   if (GNUNET_YES == c->destroy)
3014     return;
3015
3016   msg.header.size = htons (sizeof (msg));
3017   msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_CONNECTION_DESTROY);;
3018   msg.cid = c->id;
3019   LOG (GNUNET_ERROR_TYPE_DEBUG,
3020               "  sending connection destroy for connection %s\n",
3021               GMC_2s (c));
3022
3023   if (GNUNET_NO == GMC_is_terminal (c, GNUNET_YES))
3024     GMC_send_prebuilt_message (&msg.header, c,
3025                                GNUNET_YES, GNUNET_YES, NULL, NULL);
3026   if (GNUNET_NO == GMC_is_terminal (c, GNUNET_NO))
3027     GMC_send_prebuilt_message (&msg.header, c,
3028                                GNUNET_NO, GNUNET_YES, NULL, NULL);
3029   c->destroy = GNUNET_YES;
3030   c->state = MESH_CONNECTION_DESTROYED;
3031 }
3032
3033
3034 /**
3035  * @brief Start a polling timer for the connection.
3036  *
3037  * When a neighbor does not accept more traffic on the connection it could be
3038  * caused by a simple congestion or by a lost ACK. Polling enables to check
3039  * for the lastest ACK status for a connection.
3040  *
3041  * @param c Connection.
3042  * @param fwd Should we poll in the FWD direction?
3043  */
3044 void
3045 GMC_start_poll (struct MeshConnection *c, int fwd)
3046 {
3047   struct MeshFlowControl *fc;
3048
3049   fc = fwd ? &c->fwd_fc : &c->bck_fc;
3050   LOG (GNUNET_ERROR_TYPE_DEBUG, " *** POLL %s requested\n",
3051        GM_f2s (fwd));
3052   if (GNUNET_SCHEDULER_NO_TASK != fc->poll_task || NULL != fc->poll_msg)
3053   {
3054     LOG (GNUNET_ERROR_TYPE_DEBUG, " ***   not needed (%u, %p)\n",
3055          fc->poll_task, fc->poll_msg);
3056     return;
3057   }
3058   LOG (GNUNET_ERROR_TYPE_DEBUG, " *** POLL started on request\n");
3059   fc->poll_task = GNUNET_SCHEDULER_add_delayed (fc->poll_time,
3060                                                 &connection_poll,
3061                                                 fc);
3062 }
3063
3064
3065 /**
3066  * @brief Stop polling a connection for ACKs.
3067  *
3068  * Once we have enough ACKs for future traffic, polls are no longer necessary.
3069  *
3070  * @param c Connection.
3071  * @param fwd Should we stop the poll in the FWD direction?
3072  */
3073 void
3074 GMC_stop_poll (struct MeshConnection *c, int fwd)
3075 {
3076   struct MeshFlowControl *fc;
3077
3078   fc = fwd ? &c->fwd_fc : &c->bck_fc;
3079   if (GNUNET_SCHEDULER_NO_TASK != fc->poll_task)
3080   {
3081     GNUNET_SCHEDULER_cancel (fc->poll_task);
3082     fc->poll_task = GNUNET_SCHEDULER_NO_TASK;
3083   }
3084 }
3085
3086 /**
3087  * Get a (static) string for a connection.
3088  *
3089  * @param c Connection.
3090  */
3091 const char *
3092 GMC_2s (const struct MeshConnection *c)
3093 {
3094   if (NULL == c)
3095     return "NULL";
3096
3097   if (NULL != c->t)
3098   {
3099     static char buf[128];
3100
3101     sprintf (buf, "%s (->%s)", GNUNET_h2s (&c->id), GMT_2s (c->t));
3102     return buf;
3103   }
3104   return GNUNET_h2s (&c->id);
3105 }