- use MeshHash
[oweals/gnunet.git] / src / mesh / gnunet-service-mesh_connection.c
1 /*
2      This file is part of GNUnet.
3      (C) 2001-2013 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file mesh/gnunet-service-mesh_connection.c
23  * @brief GNUnet MESH service connection handling
24  * @author Bartlomiej Polot
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29
30 #include "gnunet_statistics_service.h"
31
32 #include "mesh_path.h"
33 #include "mesh_protocol.h"
34 #include "mesh.h"
35 #include "gnunet-service-mesh_connection.h"
36 #include "gnunet-service-mesh_peer.h"
37 #include "gnunet-service-mesh_tunnel.h"
38
39
40 #define LOG(level, ...) GNUNET_log_from (level,"mesh-con",__VA_ARGS__)
41
42 #define MESH_MAX_POLL_TIME      GNUNET_TIME_relative_multiply (\
43                                   GNUNET_TIME_UNIT_MINUTES,\
44                                   10)
45 #define AVG_MSGS                32
46
47
48 /******************************************************************************/
49 /********************************   STRUCTS  **********************************/
50 /******************************************************************************/
51
52 /**
53  * Struct to encapsulate all the Flow Control information to a peer to which
54  * we are directly connected (on a core level).
55  */
56 struct MeshFlowControl
57 {
58   /**
59    * Connection this controls.
60    */
61   struct MeshConnection *c;
62
63   /**
64    * How many messages are in the queue on this connection.
65    */
66   unsigned int queue_n;
67
68   /**
69    * How many messages do we accept in the queue.
70    */
71   unsigned int queue_max;
72
73   /**
74    * Next ID to use.
75    */
76   uint32_t next_pid;
77
78   /**
79    * ID of the last packet sent towards the peer.
80    */
81   uint32_t last_pid_sent;
82
83   /**
84    * ID of the last packet received from the peer.
85    */
86   uint32_t last_pid_recv;
87
88   /**
89    * Last ACK sent to the peer (peer can't send more than this PID).
90    */
91   uint32_t last_ack_sent;
92
93   /**
94    * Last ACK sent towards the origin (for traffic towards leaf node).
95    */
96   uint32_t last_ack_recv;
97
98   /**
99    * Task to poll the peer in case of a lost ACK causes stall.
100    */
101   GNUNET_SCHEDULER_TaskIdentifier poll_task;
102
103   /**
104    * How frequently to poll for ACKs.
105    */
106   struct GNUNET_TIME_Relative poll_time;
107
108   /**
109    * Queued poll message, to cancel if not necessary anymore (got ACK).
110    */
111   struct MeshConnectionQueue *poll_msg;
112
113   /**
114    * Queued poll message, to cancel if not necessary anymore (got ACK).
115    */
116   struct MeshConnectionQueue *ack_msg;
117 };
118
119 /**
120  * Keep a record of the last messages sent on this connection.
121  */
122 struct MeshConnectionPerformance
123 {
124   /**
125    * Circular buffer for storing measurements.
126    */
127   double usecsperbyte[AVG_MSGS];
128
129   /**
130    * Running average of @c usecsperbyte.
131    */
132   double avg;
133
134   /**
135    * How many values of @c usecsperbyte are valid.
136    */
137   uint16_t size;
138
139   /**
140    * Index of the next "free" position in @c usecsperbyte.
141    */
142   uint16_t idx;
143 };
144
145
146 /**
147  * Struct containing all information regarding a connection to a peer.
148  */
149 struct MeshConnection
150 {
151   /**
152    * Tunnel this connection is part of.
153    */
154   struct MeshTunnel3 *t;
155
156   /**
157    * Flow control information for traffic fwd.
158    */
159   struct MeshFlowControl fwd_fc;
160
161   /**
162    * Flow control information for traffic bck.
163    */
164   struct MeshFlowControl bck_fc;
165
166   /**
167    * Measure connection performance on the endpoint.
168    */
169   struct MeshConnectionPerformance *perf;
170
171   /**
172    * ID of the connection.
173    */
174   struct GNUNET_MeshHash id;
175
176   /**
177    * State of the connection.
178    */
179   enum MeshConnectionState state;
180
181   /**
182    * Path being used for the tunnel. At the origin of the connection
183    * it's a pointer to the destination's path pool, otherwise just a copy.
184    */
185   struct MeshPeerPath *path;
186
187   /**
188    * Position of the local peer in the path.
189    */
190   unsigned int own_pos;
191
192   /**
193    * Task to keep the used paths alive at the owner,
194    * time tunnel out on all the other peers.
195    */
196   GNUNET_SCHEDULER_TaskIdentifier fwd_maintenance_task;
197
198   /**
199    * Task to keep the used paths alive at the destination,
200    * time tunnel out on all the other peers.
201    */
202   GNUNET_SCHEDULER_TaskIdentifier bck_maintenance_task;
203
204   /**
205    * Queue handle for maintainance traffic. One handle for FWD and BCK since
206    * one peer never needs to maintain both directions (no loopback connections).
207    */
208   struct MeshPeerQueue *maintenance_q;
209
210   /**
211    * Counter to do exponential backoff when creating a connection (max 64).
212    */
213   unsigned short create_retry;
214
215   /**
216    * Pending message count.
217    */
218   int pending_messages;
219
220   /**
221    * Destroy flag: if true, destroy on last message.
222    */
223   int destroy;
224 };
225
226 /**
227  * Handle for messages queued but not yet sent.
228  */
229 struct MeshConnectionQueue
230 {
231   /**
232    * Peer queue handle, to cancel if necessary.
233    */
234   struct MeshPeerQueue *q;
235
236   /**
237    * Was this a forced message? (Do not account for it)
238    */
239   int forced;
240
241   /**
242    * Continuation to call once sent.
243    */
244   GMC_sent cont;
245
246   /**
247    * Closure for @c cont.
248    */
249   void *cont_cls;
250 };
251
252 /******************************************************************************/
253 /*******************************   GLOBALS  ***********************************/
254 /******************************************************************************/
255
256 /**
257  * Global handle to the statistics service.
258  */
259 extern struct GNUNET_STATISTICS_Handle *stats;
260
261 /**
262  * Local peer own ID (memory efficient handle).
263  */
264 extern GNUNET_PEER_Id myid;
265
266 /**
267  * Local peer own ID (full value).
268  */
269 extern struct GNUNET_PeerIdentity my_full_id;
270
271 /**
272  * Connections known, indexed by cid (MeshConnection).
273  */
274 static struct GNUNET_CONTAINER_MultiHashMap *connections;
275
276 /**
277  * How many connections are we willing to maintain.
278  * Local connections are always allowed, even if there are more connections than max.
279  */
280 static unsigned long long max_connections;
281
282 /**
283  * How many messages *in total* are we willing to queue, divide by number of
284  * connections to get connection queue size.
285  */
286 static unsigned long long max_msgs_queue;
287
288 /**
289  * How often to send path keepalives. Paths timeout after 4 missed.
290  */
291 static struct GNUNET_TIME_Relative refresh_connection_time;
292
293 /**
294  * How often to send path create / ACKs.
295  */
296 static struct GNUNET_TIME_Relative create_connection_time;
297
298
299 /******************************************************************************/
300 /********************************   STATIC  ***********************************/
301 /******************************************************************************/
302
303 #if 0 // avoid compiler warning for unused static function
304 static void
305 fc_debug (struct MeshFlowControl *fc)
306 {
307   LOG (GNUNET_ERROR_TYPE_DEBUG, "    IN: %u/%u\n",
308               fc->last_pid_recv, fc->last_ack_sent);
309   LOG (GNUNET_ERROR_TYPE_DEBUG, "    OUT: %u/%u\n",
310               fc->last_pid_sent, fc->last_ack_recv);
311   LOG (GNUNET_ERROR_TYPE_DEBUG, "    QUEUE: %u/%u\n",
312               fc->queue_n, fc->queue_max);
313 }
314
315 static void
316 connection_debug (struct MeshConnection *c)
317 {
318   if (NULL == c)
319   {
320     LOG (GNUNET_ERROR_TYPE_DEBUG, "*** DEBUG NULL CONNECTION ***\n");
321     return;
322   }
323   LOG (GNUNET_ERROR_TYPE_DEBUG, "Connection %s:%X\n",
324               peer2s (c->t->peer), GMC_2s (c));
325   LOG (GNUNET_ERROR_TYPE_DEBUG, "  state: %u, pending msgs: %u\n",
326               c->state, c->pending_messages);
327   LOG (GNUNET_ERROR_TYPE_DEBUG, "  FWD FC\n");
328   fc_debug (&c->fwd_fc);
329   LOG (GNUNET_ERROR_TYPE_DEBUG, "  BCK FC\n");
330   fc_debug (&c->bck_fc);
331 }
332 #endif
333
334
335 /**
336  * Schedule next keepalive task, taking in consideration
337  * the connection state and number of retries.
338  *
339  * @param c Connection for which to schedule the next keepalive.
340  * @param fwd Direction for the next keepalive.
341  */
342 static void
343 schedule_next_keepalive (struct MeshConnection *c, int fwd);
344
345
346 /**
347  * Resets the connection timeout task, some other message has done the
348  * task's job.
349  * - For the first peer on the direction this means to send
350  *   a keepalive or a path confirmation message (either create or ACK).
351  * - For all other peers, this means to destroy the connection,
352  *   due to lack of activity.
353  * Starts the timeout if no timeout was running (connection just created).
354  *
355  * @param c Connection whose timeout to reset.
356  * @param fwd Is this forward?
357  */
358 static void
359 connection_reset_timeout (struct MeshConnection *c, int fwd);
360
361
362 /**
363  * Get string description for tunnel state.
364  *
365  * @param s Tunnel state.
366  *
367  * @return String representation.
368  */
369 static const char *
370 GMC_state2s (enum MeshConnectionState s)
371 {
372   switch (s)
373   {
374     case MESH_CONNECTION_NEW:
375       return "MESH_CONNECTION_NEW";
376     case MESH_CONNECTION_SENT:
377       return "MESH_CONNECTION_SENT";
378     case MESH_CONNECTION_ACK:
379       return "MESH_CONNECTION_ACK";
380     case MESH_CONNECTION_READY:
381       return "MESH_CONNECTION_READY";
382     case MESH_CONNECTION_DESTROYED:
383       return "MESH_CONNECTION_DESTROYED";
384     default:
385       return "MESH_CONNECTION_STATE_ERROR";
386   }
387 }
388
389
390 /**
391  * Initialize a Flow Control structure to the initial state.
392  *
393  * @param fc Flow Control structure to initialize.
394  */
395 static void
396 fc_init (struct MeshFlowControl *fc)
397 {
398   fc->next_pid = 0;
399   fc->last_pid_sent = (uint32_t) -1; /* Next (expected) = 0 */
400   fc->last_pid_recv = (uint32_t) -1;
401   fc->last_ack_sent = (uint32_t) 0;
402   fc->last_ack_recv = (uint32_t) 0;
403   fc->poll_task = GNUNET_SCHEDULER_NO_TASK;
404   fc->poll_time = GNUNET_TIME_UNIT_SECONDS;
405   fc->queue_n = 0;
406   fc->queue_max = (max_msgs_queue / max_connections) + 1;
407 }
408
409
410 /**
411  * Find a connection.
412  *
413  * @param cid Connection ID.
414  */
415 static struct MeshConnection *
416 connection_get (const struct GNUNET_MeshHash *cid)
417 {
418   struct GNUNET_HashCode hash;
419   struct GNUNET_MeshHash *aux;
420
421   memcpy (&hash, cid, sizeof (cid));
422   aux = (struct GNUNET_MeshHash *) &hash;
423   memset (&aux[1], 0, sizeof (hash) - sizeof (*cid));
424
425   return GNUNET_CONTAINER_multihashmap_get (connections, &hash);
426 }
427
428
429 static void
430 connection_change_state (struct MeshConnection* c,
431                          enum MeshConnectionState state)
432 {
433   LOG (GNUNET_ERROR_TYPE_DEBUG,
434               "Connection %s state was %s\n",
435               GMC_2s (c), GMC_state2s (c->state));
436   if (MESH_CONNECTION_DESTROYED == c->state)
437   {
438     LOG (GNUNET_ERROR_TYPE_DEBUG, "state not changing anymore\n");
439     return;
440   }
441   LOG (GNUNET_ERROR_TYPE_DEBUG,
442               "Connection %s state is now %s\n",
443               GMC_2s (c), GMC_state2s (state));
444   c->state = state;
445   if (MESH_CONNECTION_READY == state)
446     c->create_retry = 1;
447 }
448
449
450 /**
451  * Callback called when a queued ACK message is sent.
452  *
453  * @param cls Closure (FC).
454  * @param c Connection this message was on.
455  * @param q Queue handler this call invalidates.
456  * @param type Type of message sent.
457  * @param fwd Was this a FWD going message?
458  * @param size Size of the message.
459  */
460 static void
461 ack_sent (void *cls,
462           struct MeshConnection *c,
463           struct MeshConnectionQueue *q,
464           uint16_t type, int fwd, size_t size)
465 {
466   struct MeshFlowControl *fc = cls;
467
468   fc->ack_msg = NULL;
469 }
470
471
472 /**
473  * Send an ACK on the connection, informing the predecessor about
474  * the available buffer space. Should not be called in case the peer
475  * is origin (no predecessor) in the @c fwd direction.
476  *
477  * Note that for fwd ack, the FWD mean forward *traffic* (root->dest),
478  * the ACK itself goes "back" (dest->root).
479  *
480  * @param c Connection on which to send the ACK.
481  * @param buffer How much space free to advertise?
482  * @param fwd Is this FWD ACK? (Going dest -> root)
483  * @param force Don't optimize out.
484  */
485 static void
486 send_ack (struct MeshConnection *c, unsigned int buffer, int fwd, int force)
487 {
488   struct MeshFlowControl *next_fc;
489   struct MeshFlowControl *prev_fc;
490   struct GNUNET_MESH_ACK msg;
491   uint32_t ack;
492   int delta;
493
494   /* If origin, there is no connection to send ACKs. Wrong function! */
495   if (GMC_is_origin (c, fwd))
496   {
497     LOG (GNUNET_ERROR_TYPE_DEBUG, "connection %s is origin in %s\n",
498          GMC_2s (c), GM_f2s (fwd));
499     GNUNET_break (0);
500     return;
501   }
502
503   next_fc = fwd ? &c->fwd_fc : &c->bck_fc;
504   prev_fc = fwd ? &c->bck_fc : &c->fwd_fc;
505
506   LOG (GNUNET_ERROR_TYPE_DEBUG, "connection send %s ack on %s\n",
507        GM_f2s (fwd), GMC_2s (c));
508
509   /* Check if we need to transmit the ACK. */
510   delta = prev_fc->last_ack_sent - prev_fc->last_pid_recv;
511   if (3 < delta && buffer < delta && GNUNET_NO == force)
512   {
513     LOG (GNUNET_ERROR_TYPE_DEBUG, "Not sending ACK, buffer > 3\n");
514     LOG (GNUNET_ERROR_TYPE_DEBUG,
515          "  last pid recv: %u, last ack sent: %u\n",
516          prev_fc->last_pid_recv, prev_fc->last_ack_sent);
517     return;
518   }
519
520   /* Ok, ACK might be necessary, what PID to ACK? */
521   ack = prev_fc->last_pid_recv + buffer;
522   LOG (GNUNET_ERROR_TYPE_DEBUG, " ACK %u\n", ack);
523   LOG (GNUNET_ERROR_TYPE_DEBUG,
524        " last pid %u, last ack %u, qmax %u, q %u\n",
525        prev_fc->last_pid_recv, prev_fc->last_ack_sent,
526        next_fc->queue_max, next_fc->queue_n);
527   if (ack == prev_fc->last_ack_sent && GNUNET_NO == force)
528   {
529     LOG (GNUNET_ERROR_TYPE_DEBUG, "Not sending FWD ACK, not needed\n");
530     return;
531   }
532
533   /* Check if message is already in queue */
534   if (NULL != prev_fc->ack_msg)
535   {
536     if (GM_is_pid_bigger (ack, prev_fc->last_ack_sent))
537     {
538       LOG (GNUNET_ERROR_TYPE_DEBUG, " canceling old ACK\n");
539       GMC_cancel (prev_fc->ack_msg);
540       /* GMC_cancel triggers ack_sent(), which clears fc->ack_msg */
541     }
542     else
543     {
544       LOG (GNUNET_ERROR_TYPE_DEBUG, " same ACK already in queue\n");
545       return;
546     }
547   }
548
549   prev_fc->last_ack_sent = ack;
550
551   /* Build ACK message and send on connection */
552   msg.header.size = htons (sizeof (msg));
553   msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_ACK);
554   msg.ack = htonl (ack);
555   msg.cid = c->id;
556
557   prev_fc->ack_msg = GMC_send_prebuilt_message (&msg.header, c,
558                                                 !fwd, GNUNET_YES,
559                                                 &ack_sent, prev_fc);
560 }
561
562
563 /**
564  * Callback called when a queued message is sent.
565  *
566  * Calculates the average time and connection packet tracking.
567  *
568  * @param cls Closure (ConnectionQueue Handle).
569  * @param c Connection this message was on.
570  * @param type Type of message sent.
571  * @param fwd Was this a FWD going message?
572  * @param size Size of the message.
573  * @param wait Time spent waiting for core (only the time for THIS message)
574  */
575 static void
576 message_sent (void *cls,
577               struct MeshConnection *c, uint16_t type,
578               int fwd, size_t size,
579               struct GNUNET_TIME_Relative wait)
580 {
581   struct MeshConnectionPerformance *p;
582   struct MeshFlowControl *fc;
583   struct MeshConnectionQueue *q = cls;
584   double usecsperbyte;
585   int forced;
586
587   fc = fwd ? &c->fwd_fc : &c->bck_fc;
588   LOG (GNUNET_ERROR_TYPE_DEBUG,
589        "!  sent %s %s\n",
590        GM_f2s (fwd),
591        GM_m2s (type));
592   LOG (GNUNET_ERROR_TYPE_DEBUG, "!  C_P- %p %u\n", c, c->pending_messages);
593   if (NULL != q)
594   {
595     forced = q->forced;
596     if (NULL != q->cont)
597     {
598       LOG (GNUNET_ERROR_TYPE_DEBUG, "!  calling cont\n");
599       q->cont (q->cont_cls, c, q, type, fwd, size);
600     }
601     GNUNET_free (q);
602   }
603   else if (type == GNUNET_MESSAGE_TYPE_MESH_ENCRYPTED)
604   {
605     /* If NULL == q and ENCRYPTED == type, message must have been ch_mngmnt */
606     forced = GNUNET_YES;
607   }
608   else
609   {
610     forced = GNUNET_NO;
611   }
612   c->pending_messages--;
613   if (GNUNET_YES == c->destroy && 0 == c->pending_messages)
614   {
615     LOG (GNUNET_ERROR_TYPE_DEBUG, "!  destroying connection!\n");
616     GMC_destroy (c);
617     return;
618   }
619   /* Send ACK if needed, after accounting for sent ID in fc->queue_n */
620   switch (type)
621   {
622     case GNUNET_MESSAGE_TYPE_MESH_CONNECTION_CREATE:
623     case GNUNET_MESSAGE_TYPE_MESH_CONNECTION_ACK:
624       c->maintenance_q = NULL;
625       /* Don't trigger a keepalive for sent ACKs, only SYN and SYNACKs */
626       if (GNUNET_MESSAGE_TYPE_MESH_CONNECTION_CREATE == type || !fwd)
627         schedule_next_keepalive (c, fwd);
628       break;
629
630     case GNUNET_MESSAGE_TYPE_MESH_ENCRYPTED:
631       fc->last_pid_sent++;
632       LOG (GNUNET_ERROR_TYPE_DEBUG, "!  Q_N- %p %u\n", fc, fc->queue_n);
633       if (GNUNET_NO == forced)
634       {
635         fc->queue_n--;
636         LOG (GNUNET_ERROR_TYPE_DEBUG,
637             "!   accounting pid %u\n",
638             fc->last_pid_sent);
639       }
640       else
641       {
642         LOG (GNUNET_ERROR_TYPE_DEBUG,
643              "!   forced, Q_N not accounting pid %u\n",
644              fc->last_pid_sent);
645       }
646       GMC_send_ack (c, fwd, GNUNET_NO);
647       connection_reset_timeout (c, fwd);
648       break;
649
650     case GNUNET_MESSAGE_TYPE_MESH_POLL:
651       fc->poll_msg = NULL;
652       break;
653
654     case GNUNET_MESSAGE_TYPE_MESH_ACK:
655       fc->ack_msg = NULL;
656       break;
657
658     default:
659       break;
660   }
661   LOG (GNUNET_ERROR_TYPE_DEBUG, "!  message sent!\n");
662
663   if (NULL == c->perf)
664     return; /* Only endpoints are interested in timing. */
665
666   p = c->perf;
667   usecsperbyte = ((double) wait.rel_value_us) / size;
668   if (p->size == AVG_MSGS)
669   {
670     /* Array is full. Substract oldest value, add new one and store. */
671     p->avg -= (p->usecsperbyte[p->idx] / AVG_MSGS);
672     p->usecsperbyte[p->idx] = usecsperbyte;
673     p->avg += (p->usecsperbyte[p->idx] / AVG_MSGS);
674   }
675   else
676   {
677     /* Array not yet full. Add current value to avg and store. */
678     p->usecsperbyte[p->idx] = usecsperbyte;
679     p->avg *= p->size;
680     p->avg += p->usecsperbyte[p->idx];
681     p->size++;
682     p->avg /= p->size;
683   }
684   p->idx = (p->idx + 1) % AVG_MSGS;
685 }
686
687
688 /**
689  * Get the previous hop in a connection
690  *
691  * @param c Connection.
692  *
693  * @return Previous peer in the connection.
694  */
695 static struct MeshPeer *
696 get_prev_hop (const struct MeshConnection *c)
697 {
698   GNUNET_PEER_Id id;
699
700   LOG (GNUNET_ERROR_TYPE_DEBUG, " get prev hop %s [%u/%u]\n",
701        GMC_2s (c), c->own_pos, c->path->length);
702   if (0 == c->own_pos || c->path->length < 2)
703     id = c->path->peers[0];
704   else
705     id = c->path->peers[c->own_pos - 1];
706
707   LOG (GNUNET_ERROR_TYPE_DEBUG, "  ID: %s (%u)\n",
708        GNUNET_i2s (GNUNET_PEER_resolve2 (id)), id);
709
710   return GMP_get_short (id);
711 }
712
713
714 /**
715  * Get the next hop in a connection
716  *
717  * @param c Connection.
718  *
719  * @return Next peer in the connection.
720  */
721 static struct MeshPeer *
722 get_next_hop (const struct MeshConnection *c)
723 {
724   GNUNET_PEER_Id id;
725
726   LOG (GNUNET_ERROR_TYPE_DEBUG, " get next hop %s [%u/%u]\n",
727        GMC_2s (c), c->own_pos, c->path->length);
728   if ((c->path->length - 1) == c->own_pos || c->path->length < 2)
729     id = c->path->peers[c->path->length - 1];
730   else
731     id = c->path->peers[c->own_pos + 1];
732
733   LOG (GNUNET_ERROR_TYPE_DEBUG, "  ID: %s (%u)\n",
734        GNUNET_i2s (GNUNET_PEER_resolve2 (id)), id);
735
736   return GMP_get_short (id);
737 }
738
739
740 /**
741  * Get the hop in a connection.
742  *
743  * @param c Connection.
744  * @param fwd Next hop?
745  *
746  * @return Next peer in the connection.
747  */
748 static struct MeshPeer *
749 get_hop (struct MeshConnection *c, int fwd)
750 {
751   if (fwd)
752     return get_next_hop (c);
753   return get_prev_hop (c);
754 }
755
756
757 /**
758  * Is traffic coming from this sender 'FWD' traffic?
759  *
760  * @param c Connection to check.
761  * @param sender Peer identity of neighbor.
762  *
763  * @return #GNUNET_YES in case the sender is the 'prev' hop and therefore
764  *         the traffic is 'FWD'.
765  *         #GNUNET_NO for BCK.
766  *         #GNUNET_SYSERR for errors.
767  */
768 static int
769 is_fwd (const struct MeshConnection *c,
770         const struct GNUNET_PeerIdentity *sender)
771 {
772   GNUNET_PEER_Id id;
773
774   id = GNUNET_PEER_search (sender);
775   if (GMP_get_short_id (get_prev_hop (c)) == id)
776     return GNUNET_YES;
777
778   if (GMP_get_short_id (get_next_hop (c)) == id)
779     return GNUNET_NO;
780
781   GNUNET_break (0);
782   return GNUNET_SYSERR;
783 }
784
785
786 /**
787  * Sends a CONNECTION ACK message in reponse to a received CONNECTION_CREATE
788  * or a first CONNECTION_ACK directed to us.
789  *
790  * @param connection Connection to confirm.
791  * @param fwd Should we send it FWD? (root->dest)
792  *            (First (~SYNACK) goes BCK, second (~ACK) goes FWD)
793  */
794 static void
795 send_connection_ack (struct MeshConnection *connection, int fwd)
796 {
797   struct MeshTunnel3 *t;
798
799   t = connection->t;
800   LOG (GNUNET_ERROR_TYPE_INFO, "=> {%18s ACK} on connection %s\n",
801        GM_f2s (!fwd), GMC_2s (connection));
802   GMP_queue_add (get_hop (connection, fwd), NULL,
803                  GNUNET_MESSAGE_TYPE_MESH_CONNECTION_ACK,
804                  sizeof (struct GNUNET_MESH_ConnectionACK),
805                  connection, fwd, &message_sent, NULL);
806   connection->pending_messages++;
807   if (MESH_TUNNEL3_NEW == GMT_get_cstate (t))
808     GMT_change_cstate (t, MESH_TUNNEL3_WAITING);
809   if (MESH_CONNECTION_READY != connection->state)
810     connection_change_state (connection, MESH_CONNECTION_SENT);
811 }
812
813
814 /**
815  * Send a notification that a connection is broken.
816  *
817  * @param c Connection that is broken.
818  * @param id1 Peer that has disconnected.
819  * @param id2 Peer that has disconnected.
820  * @param fwd Direction towards which to send it.
821  */
822 static void
823 send_broken (struct MeshConnection *c,
824              const struct GNUNET_PeerIdentity *id1,
825              const struct GNUNET_PeerIdentity *id2,
826              int fwd)
827 {
828   struct GNUNET_MESH_ConnectionBroken msg;
829
830   msg.header.size = htons (sizeof (struct GNUNET_MESH_ConnectionBroken));
831   msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_CONNECTION_BROKEN);
832   msg.cid = c->id;
833   msg.peer1 = *id1;
834   msg.peer2 = *id2;
835   GMC_send_prebuilt_message (&msg.header, c, fwd, GNUNET_YES, NULL, NULL);
836 }
837
838
839 /**
840  * Send a notification that a connection is broken, when a connection
841  * isn't even created.
842  *
843  * @param connection_id Connection ID.
844  * @param id1 Peer that has disconnected.
845  * @param id2 Peer that has disconnected.
846  * @param peer Peer to notify (neighbor who sent the connection).
847  */
848 static void
849 send_broken2 (struct GNUNET_HashCode *connection_id,
850              const struct GNUNET_PeerIdentity *id1,
851              const struct GNUNET_PeerIdentity *id2,
852              GNUNET_PEER_Id peer_id)
853 {
854   struct GNUNET_MESH_ConnectionBroken *msg;
855   struct MeshPeer *neighbor;
856
857   LOG (GNUNET_ERROR_TYPE_INFO,
858        "Send BROKEN on unknown connection %s\n", GNUNET_h2s (connection_id));
859
860   msg = GNUNET_new (struct GNUNET_MESH_ConnectionBroken);
861   msg->header.size = htons (sizeof (struct GNUNET_MESH_ConnectionBroken));
862   msg->header.type = htons (GNUNET_MESSAGE_TYPE_MESH_CONNECTION_BROKEN);
863   msg->cid = *connection_id;
864   msg->peer1 = *id1;
865   msg->peer2 = *id2;
866   neighbor = GMP_get_short (peer_id);
867   GMP_queue_add (neighbor, msg,
868                  GNUNET_MESSAGE_TYPE_MESH_ENCRYPTED,
869                  sizeof (struct GNUNET_MESH_ConnectionBroken),
870                  NULL, GNUNET_SYSERR, /* connection, fwd */
871                  NULL, NULL); /* continuation */
872 }
873
874
875 /**
876  * Send keepalive packets for a connection.
877  *
878  * @param c Connection to keep alive..
879  * @param fwd Is this a FWD keepalive? (owner -> dest).
880  */
881 static void
882 send_connection_keepalive (struct MeshConnection *c, int fwd)
883 {
884   struct GNUNET_MessageHeader msg;
885
886   LOG (GNUNET_ERROR_TYPE_INFO,
887        "keepalive %s for connection %s\n",
888        GM_f2s (fwd), GMC_2s (c));
889   GNUNET_STATISTICS_update (stats, "# keepalives sent", 1, GNUNET_NO);
890
891   GNUNET_assert (NULL != c->t);
892   msg.size = htons (sizeof (msg));
893   msg.type = htons (GNUNET_MESSAGE_TYPE_MESH_KEEPALIVE);
894
895   GNUNET_assert (NULL ==
896                  GMT_send_prebuilt_message (&msg, c->t, c,
897                                             GNUNET_NO, NULL, NULL));
898 }
899
900
901 /**
902  * Send CONNECTION_{CREATE/ACK} packets for a connection.
903  *
904  * @param c Connection for which to send the message.
905  * @param fwd If #GNUNET_YES, send CREATE, otherwise send ACK.
906  */
907 static void
908 connection_recreate (struct MeshConnection *c, int fwd)
909 {
910   LOG (GNUNET_ERROR_TYPE_DEBUG, "sending connection recreate\n");
911   if (fwd)
912     GMC_send_create (c);
913   else
914     send_connection_ack (c, GNUNET_NO);
915 }
916
917
918 /**
919  * Generic connection timer management.
920  * Depending on the role of the peer in the connection will send the
921  * appropriate message (build or keepalive)
922  *
923  * @param c Conncetion to maintain.
924  * @param fwd Is FWD?
925  */
926 static void
927 connection_maintain (struct MeshConnection *c, int fwd)
928 {
929   if (GNUNET_NO != c->destroy)
930     return;
931
932   if (MESH_TUNNEL3_SEARCHING == GMT_get_cstate (c->t))
933   {
934     /* TODO DHT GET with RO_BART */
935     return;
936   }
937   switch (c->state)
938   {
939     case MESH_CONNECTION_NEW:
940       GNUNET_break (0);
941       /* fall-through */
942     case MESH_CONNECTION_SENT:
943       connection_recreate (c, fwd);
944       break;
945     case MESH_CONNECTION_READY:
946       send_connection_keepalive (c, fwd);
947       break;
948     default:
949       break;
950   }
951 }
952
953
954
955 /**
956  * Keep the connection alive.
957  *
958  * @param c Connection to keep alive.
959  * @param fwd Direction.
960  * @param shutdown Are we shutting down? (Don't send traffic)
961  *                 Non-zero value for true, not necessarily GNUNET_YES.
962  */
963 static void
964 connection_keepalive (struct MeshConnection *c, int fwd, int shutdown)
965 {
966   LOG (GNUNET_ERROR_TYPE_DEBUG, "%s keepalive for %s\n",
967        GM_f2s (fwd), GMC_2s (c));
968
969   if (fwd)
970     c->fwd_maintenance_task = GNUNET_SCHEDULER_NO_TASK;
971   else
972     c->bck_maintenance_task = GNUNET_SCHEDULER_NO_TASK;
973
974   if (GNUNET_NO != shutdown)
975     return;
976
977   connection_maintain (c, fwd);
978
979   /* Next execution will be scheduled by message_sent */
980 }
981
982
983 /**
984  * Keep the connection alive in the FWD direction.
985  *
986  * @param cls Closure (connection to keepalive).
987  * @param tc TaskContext.
988  */
989 static void
990 connection_fwd_keepalive (void *cls,
991                           const struct GNUNET_SCHEDULER_TaskContext *tc)
992 {
993   connection_keepalive ((struct MeshConnection *) cls,
994                         GNUNET_YES,
995                         tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN);
996 }
997
998
999 /**
1000  * Keep the connection alive in the BCK direction.
1001  *
1002  * @param cls Closure (connection to keepalive).
1003  * @param tc TaskContext.
1004  */
1005 static void
1006 connection_bck_keepalive (void *cls,
1007                           const struct GNUNET_SCHEDULER_TaskContext *tc)
1008 {
1009   connection_keepalive ((struct MeshConnection *) cls,
1010                         GNUNET_NO,
1011                         tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN);
1012 }
1013
1014
1015 /**
1016  * Schedule next keepalive task, taking in consideration
1017  * the connection state and number of retries.
1018  *
1019  * If the peer is not the origin, do nothing.
1020  *
1021  * @param c Connection for which to schedule the next keepalive.
1022  * @param fwd Direction for the next keepalive.
1023  */
1024 static void
1025 schedule_next_keepalive (struct MeshConnection *c, int fwd)
1026 {
1027   struct GNUNET_TIME_Relative delay;
1028   GNUNET_SCHEDULER_TaskIdentifier *task_id;
1029   GNUNET_SCHEDULER_Task keepalive_task;
1030
1031   if (GNUNET_NO == GMC_is_origin (c, fwd))
1032     return;
1033
1034   /* Calculate delay to use, depending on the state of the connection */
1035   if (MESH_CONNECTION_READY == c->state)
1036   {
1037     delay = refresh_connection_time;
1038   }
1039   else
1040   {
1041     if (1 > c->create_retry)
1042       c->create_retry = 1;
1043     delay = GNUNET_TIME_relative_multiply (create_connection_time,
1044                                            c->create_retry);
1045     if (c->create_retry < 64)
1046       c->create_retry *= 2;
1047   }
1048
1049   /* Select direction-dependent parameters */
1050   if (GNUNET_YES == fwd)
1051   {
1052     task_id = &c->fwd_maintenance_task;
1053     keepalive_task = &connection_fwd_keepalive;
1054   }
1055   else
1056   {
1057     task_id = &c->bck_maintenance_task;
1058     keepalive_task = &connection_bck_keepalive;
1059   }
1060
1061   /* Check that no one scheduled it before us */
1062   if (GNUNET_SCHEDULER_NO_TASK != *task_id)
1063   {
1064     /* No need for a _break. It can happen for instance when sending a SYNACK
1065      * for a duplicate SYN: the first SYNACK scheduled the task. */
1066     GNUNET_SCHEDULER_cancel (*task_id);
1067   }
1068
1069   /* Schedule the task */
1070   *task_id = GNUNET_SCHEDULER_add_delayed (delay, keepalive_task, c);
1071   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "next keepalive in %s\n",
1072               GNUNET_STRINGS_relative_time_to_string (delay, GNUNET_YES));
1073 }
1074
1075
1076 /**
1077  * @brief Re-initiate traffic on this connection if necessary.
1078  *
1079  * Check if there is traffic queued towards this peer
1080  * and the core transmit handle is NULL (traffic was stalled).
1081  * If so, call core tmt rdy.
1082  *
1083  * @param c Connection on which initiate traffic.
1084  * @param fwd Is this about fwd traffic?
1085  */
1086 static void
1087 connection_unlock_queue (struct MeshConnection *c, int fwd)
1088 {
1089   struct MeshPeer *peer;
1090
1091   LOG (GNUNET_ERROR_TYPE_DEBUG,
1092               "connection_unlock_queue %s on %s\n",
1093               GM_f2s (fwd), GMC_2s (c));
1094
1095   if (GMC_is_terminal (c, fwd))
1096   {
1097     LOG (GNUNET_ERROR_TYPE_DEBUG, " is terminal!\n");
1098     return;
1099   }
1100
1101   peer = get_hop (c, fwd);
1102   GMP_queue_unlock (peer, c);
1103 }
1104
1105
1106 /**
1107  * Cancel all transmissions that belong to a certain connection.
1108  *
1109  * If the connection is scheduled for destruction and no more messages are left,
1110  * the connection will be destroyed by the continuation call.
1111  *
1112  * @param c Connection which to cancel. Might be destroyed during this call.
1113  * @param fwd Cancel fwd traffic?
1114  */
1115 static void
1116 connection_cancel_queues (struct MeshConnection *c, int fwd)
1117 {
1118   struct MeshFlowControl *fc;
1119   struct MeshPeer *peer;
1120
1121   LOG (GNUNET_ERROR_TYPE_DEBUG,
1122        " *** Cancel %s queues for connection %s\n",
1123        GM_f2s (fwd), GMC_2s (c));
1124   if (NULL == c)
1125   {
1126     GNUNET_break (0);
1127     return;
1128   }
1129
1130   fc = fwd ? &c->fwd_fc : &c->bck_fc;
1131   if (GNUNET_SCHEDULER_NO_TASK != fc->poll_task)
1132   {
1133     GNUNET_SCHEDULER_cancel (fc->poll_task);
1134     fc->poll_task = GNUNET_SCHEDULER_NO_TASK;
1135     LOG (GNUNET_ERROR_TYPE_DEBUG, " *** Cancel POLL in ccq for fc %p\n", fc);
1136   }
1137   peer = get_hop (c, fwd);
1138   GMP_queue_cancel (peer, c);
1139 }
1140
1141
1142 /**
1143  * Function called if a connection has been stalled for a while,
1144  * possibly due to a missed ACK. Poll the neighbor about its ACK status.
1145  *
1146  * @param cls Closure (poll ctx).
1147  * @param tc TaskContext.
1148  */
1149 static void
1150 connection_poll (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
1151
1152
1153 /**
1154  * Callback called when a queued POLL message is sent.
1155  *
1156  * @param cls Closure (FC).
1157  * @param c Connection this message was on.
1158  * @param q Queue handler this call invalidates.
1159  * @param type Type of message sent.
1160  * @param fwd Was this a FWD going message?
1161  * @param size Size of the message.
1162  */
1163 static void
1164 poll_sent (void *cls,
1165            struct MeshConnection *c,
1166            struct MeshConnectionQueue *q,
1167            uint16_t type, int fwd, size_t size)
1168 {
1169   struct MeshFlowControl *fc = cls;
1170
1171   if (2 == c->destroy)
1172   {
1173     LOG (GNUNET_ERROR_TYPE_DEBUG, " *** POLL canceled on shutdown\n");
1174     return;
1175   }
1176   LOG (GNUNET_ERROR_TYPE_DEBUG,
1177        " *** POLL sent for , scheduling new one!\n");
1178   fc->poll_msg = NULL;
1179   fc->poll_time = GNUNET_TIME_STD_BACKOFF (fc->poll_time);
1180   fc->poll_task = GNUNET_SCHEDULER_add_delayed (fc->poll_time,
1181                                                 &connection_poll, fc);
1182   LOG (GNUNET_ERROR_TYPE_DEBUG, " task %u\n", fc->poll_task);
1183
1184 }
1185
1186 /**
1187  * Function called if a connection has been stalled for a while,
1188  * possibly due to a missed ACK. Poll the neighbor about its ACK status.
1189  *
1190  * @param cls Closure (poll ctx).
1191  * @param tc TaskContext.
1192  */
1193 static void
1194 connection_poll (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1195 {
1196   struct MeshFlowControl *fc = cls;
1197   struct GNUNET_MESH_Poll msg;
1198   struct MeshConnection *c;
1199
1200   fc->poll_task = GNUNET_SCHEDULER_NO_TASK;
1201   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1202   {
1203     return;
1204   }
1205
1206   c = fc->c;
1207   LOG (GNUNET_ERROR_TYPE_DEBUG, " *** Polling!\n");
1208   LOG (GNUNET_ERROR_TYPE_DEBUG, " *** connection [%s]\n", GMC_2s (c));
1209   LOG (GNUNET_ERROR_TYPE_DEBUG, " ***   %s\n",
1210        fc == &c->fwd_fc ? "FWD" : "BCK");
1211
1212   msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_POLL);
1213   msg.header.size = htons (sizeof (msg));
1214   msg.pid = htonl (fc->last_pid_sent);
1215   LOG (GNUNET_ERROR_TYPE_DEBUG, " *** last pid sent: %u!\n", fc->last_pid_sent);
1216   fc->poll_msg = GMC_send_prebuilt_message (&msg.header, c,
1217                                             fc == &c->fwd_fc, GNUNET_YES,
1218                                             &poll_sent, fc);
1219 }
1220
1221
1222 /**
1223  * Timeout function due to lack of keepalive/traffic from the owner.
1224  * Destroys connection if called.
1225  *
1226  * @param cls Closure (connection to destroy).
1227  * @param tc TaskContext.
1228  */
1229 static void
1230 connection_fwd_timeout (void *cls,
1231                         const struct GNUNET_SCHEDULER_TaskContext *tc)
1232 {
1233   struct MeshConnection *c = cls;
1234
1235   c->fwd_maintenance_task = GNUNET_SCHEDULER_NO_TASK;
1236   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1237     return;
1238
1239   LOG (GNUNET_ERROR_TYPE_DEBUG, "Connection %s FWD timed out. Destroying.\n",
1240        GMC_2s (c));
1241   if (GMC_is_origin (c, GNUNET_YES)) /* If local, leave. */
1242   {
1243     GNUNET_break (0);
1244     return;
1245   }
1246
1247   GMC_destroy (c);
1248 }
1249
1250
1251 /**
1252  * Timeout function due to lack of keepalive/traffic from the destination.
1253  * Destroys connection if called.
1254  *
1255  * @param cls Closure (connection to destroy).
1256  * @param tc TaskContext
1257  */
1258 static void
1259 connection_bck_timeout (void *cls,
1260                         const struct GNUNET_SCHEDULER_TaskContext *tc)
1261 {
1262   struct MeshConnection *c = cls;
1263
1264   c->bck_maintenance_task = GNUNET_SCHEDULER_NO_TASK;
1265   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1266     return;
1267
1268   LOG (GNUNET_ERROR_TYPE_DEBUG, "Connection %s BCK timed out. Destroying.\n",
1269        GMC_2s (c));
1270
1271   if (GMC_is_origin (c, GNUNET_NO)) /* If local, leave. */
1272   {
1273     GNUNET_break (0);
1274     return;
1275   }
1276
1277   GMC_destroy (c);
1278 }
1279
1280
1281 /**
1282  * Resets the connection timeout task, some other message has done the
1283  * task's job.
1284  * - For the first peer on the direction this means to send
1285  *   a keepalive or a path confirmation message (either create or ACK).
1286  * - For all other peers, this means to destroy the connection,
1287  *   due to lack of activity.
1288  * Starts the timeout if no timeout was running (connection just created).
1289  *
1290  * @param c Connection whose timeout to reset.
1291  * @param fwd Is this forward?
1292  *
1293  * TODO use heap to improve efficiency of scheduler.
1294  */
1295 static void
1296 connection_reset_timeout (struct MeshConnection *c, int fwd)
1297 {
1298   LOG (GNUNET_ERROR_TYPE_DEBUG, "Connection %s reset timeout\n", GM_f2s (fwd));
1299
1300   if (GMC_is_origin (c, fwd)) /* Startpoint */
1301   {
1302     schedule_next_keepalive (c, fwd);
1303   }
1304   else /* Relay, endpoint. */
1305   {
1306     struct GNUNET_TIME_Relative delay;
1307     GNUNET_SCHEDULER_TaskIdentifier *ti;
1308     GNUNET_SCHEDULER_Task f;
1309
1310     ti = fwd ? &c->fwd_maintenance_task : &c->bck_maintenance_task;
1311
1312     if (GNUNET_SCHEDULER_NO_TASK != *ti)
1313       GNUNET_SCHEDULER_cancel (*ti);
1314     delay = GNUNET_TIME_relative_multiply (refresh_connection_time, 4);
1315     f = fwd ? &connection_fwd_timeout : &connection_bck_timeout;
1316     *ti = GNUNET_SCHEDULER_add_delayed (delay, f, c);
1317   }
1318 }
1319
1320
1321 /**
1322  * Add the connection to the list of both neighbors.
1323  *
1324  * @param c Connection.
1325  *
1326  * @return #GNUNET_OK if everything went fine
1327  *         #GNUNET_SYSERR if the was an error and @c c is malformed.
1328  */
1329 static int
1330 register_neighbors (struct MeshConnection *c)
1331 {
1332   struct MeshPeer *next_peer;
1333   struct MeshPeer *prev_peer;
1334
1335   next_peer = get_next_hop (c);
1336   prev_peer = get_prev_hop (c);
1337
1338   LOG (GNUNET_ERROR_TYPE_DEBUG, "register neighbors for connection %s\n",
1339        GMC_2s (c));
1340   path_debug (c->path);
1341   LOG (GNUNET_ERROR_TYPE_DEBUG, "own pos %u\n", c->own_pos);
1342   LOG (GNUNET_ERROR_TYPE_DEBUG, "putting connection %s to next peer %p\n",
1343        GMC_2s (c), next_peer);
1344   LOG (GNUNET_ERROR_TYPE_DEBUG, "next peer %p %s\n", next_peer, GMP_2s (next_peer));
1345   LOG (GNUNET_ERROR_TYPE_DEBUG, "putting connection %s to prev peer %p\n",
1346        GMC_2s (c), prev_peer);
1347   LOG (GNUNET_ERROR_TYPE_DEBUG, "prev peer %p %s\n", prev_peer, GMP_2s (prev_peer));
1348
1349   if (GNUNET_NO == GMP_is_neighbor (next_peer)
1350       || GNUNET_NO == GMP_is_neighbor (prev_peer))
1351   {
1352     if (GMC_is_origin (c, GNUNET_YES))
1353       GNUNET_STATISTICS_update (stats, "# local bad paths", 1, GNUNET_NO);
1354     GNUNET_STATISTICS_update (stats, "# bad paths", 1, GNUNET_NO);
1355
1356     LOG (GNUNET_ERROR_TYPE_DEBUG, "  register neighbors failed\n");
1357     LOG (GNUNET_ERROR_TYPE_DEBUG, "  prev: %s, neighbor?: %d\n",
1358          GMP_2s (prev_peer), GMP_is_neighbor (prev_peer));
1359     LOG (GNUNET_ERROR_TYPE_DEBUG, "  next: %s, neighbor?: %d\n",
1360          GMP_2s (next_peer), GMP_is_neighbor (next_peer));
1361     return GNUNET_SYSERR;
1362   }
1363
1364   GMP_add_connection (next_peer, c);
1365   GMP_add_connection (prev_peer, c);
1366
1367   return GNUNET_OK;
1368 }
1369
1370
1371 /**
1372  * Remove the connection from the list of both neighbors.
1373  *
1374  * @param c Connection.
1375  */
1376 static void
1377 unregister_neighbors (struct MeshConnection *c)
1378 {
1379   struct MeshPeer *peer;
1380
1381   peer = get_next_hop (c);
1382   if (GNUNET_OK != GMP_remove_connection (peer, c))
1383   {
1384     GNUNET_assert (MESH_CONNECTION_NEW == c->state
1385                   || MESH_CONNECTION_DESTROYED == c->state);
1386     LOG (GNUNET_ERROR_TYPE_DEBUG, "  cstate: %u\n", c->state);
1387     if (NULL != c->t) GMT_debug (c->t);
1388   }
1389
1390   peer = get_prev_hop (c);
1391   if (GNUNET_OK != GMP_remove_connection (peer, c))
1392   {
1393     GNUNET_assert (MESH_CONNECTION_NEW == c->state
1394                   || MESH_CONNECTION_DESTROYED == c->state);
1395     LOG (GNUNET_ERROR_TYPE_DEBUG, "  cstate: %u\n", c->state);
1396     if (NULL != c->t) GMT_debug (c->t);
1397   }
1398 }
1399
1400
1401 /**
1402  * Bind the connection to the peer and the tunnel to that peer.
1403  *
1404  * If the peer has no tunnel, create one. Update tunnel and connection
1405  * data structres to reflect new status.
1406  *
1407  * @param c Connection.
1408  * @param peer Peer.
1409  */
1410 static void
1411 add_to_peer (struct MeshConnection *c, struct MeshPeer *peer)
1412 {
1413   GMP_add_tunnel (peer);
1414   c->t = GMP_get_tunnel (peer);
1415   GMT_add_connection (c->t, c);
1416 }
1417
1418
1419 /**
1420  * Builds a path from a PeerIdentity array.
1421  *
1422  * @param peers PeerIdentity array.
1423  * @param size Size of the @c peers array.
1424  * @param own_pos Output parameter: own position in the path.
1425  *
1426  * @return Fixed and shortened path.
1427  */
1428 static struct MeshPeerPath *
1429 build_path_from_peer_ids (struct GNUNET_PeerIdentity *peers,
1430                           unsigned int size,
1431                           unsigned int *own_pos)
1432 {
1433   struct MeshPeerPath *path;
1434   GNUNET_PEER_Id shortid;
1435   unsigned int i;
1436   unsigned int j;
1437   unsigned int offset;
1438
1439   /* Create path */
1440   LOG (GNUNET_ERROR_TYPE_DEBUG, "  Creating path...\n");
1441   path = path_new (size);
1442   *own_pos = 0;
1443   offset = 0;
1444   for (i = 0; i < size; i++)
1445   {
1446     LOG (GNUNET_ERROR_TYPE_DEBUG, "  - %u: taking %s\n",
1447          i, GNUNET_i2s (&peers[i]));
1448     shortid = GNUNET_PEER_intern (&peers[i]);
1449
1450     /* Check for loops / duplicates */
1451     for (j = 0; j < i - offset; j++)
1452     {
1453       if (path->peers[j] == shortid)
1454       {
1455         LOG (GNUNET_ERROR_TYPE_DEBUG, "    already exists at pos %u\n", j);
1456         offset += i - j;
1457         LOG (GNUNET_ERROR_TYPE_DEBUG, "    offset now\n", offset);
1458         GNUNET_PEER_change_rc (shortid, -1);
1459       }
1460     }
1461     LOG (GNUNET_ERROR_TYPE_DEBUG, "    storing at %u\n", i - offset);
1462     path->peers[i - offset] = shortid;
1463     if (path->peers[i] == myid)
1464       *own_pos = i;
1465   }
1466   path->length -= offset;
1467
1468   if (path->peers[*own_pos] != myid)
1469   {
1470     /* create path: self not found in path through self */
1471     GNUNET_break_op (0);
1472     path_destroy (path);
1473     return NULL;
1474   }
1475
1476   return path;
1477 }
1478
1479
1480 /**
1481  * Log receipt of message on stderr (INFO level).
1482  *
1483  * @param message Message received.
1484  * @param peer Peer who sent the message.
1485  * @param hash Connection ID.
1486  */
1487 static void
1488 log_message (const struct GNUNET_MessageHeader *message,
1489              const struct GNUNET_PeerIdentity *peer,
1490              const struct GNUNET_HashCode *hash)
1491 {
1492   LOG (GNUNET_ERROR_TYPE_INFO, "<- %s on connection %s from %s\n",
1493        GM_m2s (ntohs (message->type)), GNUNET_h2s (hash), GNUNET_i2s (peer));
1494 }
1495
1496 /******************************************************************************/
1497 /********************************    API    ***********************************/
1498 /******************************************************************************/
1499
1500 /**
1501  * Core handler for connection creation.
1502  *
1503  * @param cls Closure (unused).
1504  * @param peer Sender (neighbor).
1505  * @param message Message.
1506  *
1507  * @return GNUNET_OK to keep the connection open,
1508  *         GNUNET_SYSERR to close it (signal serious error)
1509  */
1510 int
1511 GMC_handle_create (void *cls, const struct GNUNET_PeerIdentity *peer,
1512                    const struct GNUNET_MessageHeader *message)
1513 {
1514   struct GNUNET_MESH_ConnectionCreate *msg;
1515   struct GNUNET_PeerIdentity *id;
1516   struct GNUNET_HashCode *cid;
1517   struct MeshPeerPath *path;
1518   struct MeshPeer *dest_peer;
1519   struct MeshPeer *orig_peer;
1520   struct MeshConnection *c;
1521   unsigned int own_pos;
1522   uint16_t size;
1523
1524   /* Check size */
1525   size = ntohs (message->size);
1526   if (size < sizeof (struct GNUNET_MESH_ConnectionCreate))
1527   {
1528     GNUNET_break_op (0);
1529     return GNUNET_OK;
1530   }
1531
1532   /* Calculate hops */
1533   size -= sizeof (struct GNUNET_MESH_ConnectionCreate);
1534   if (size % sizeof (struct GNUNET_PeerIdentity))
1535   {
1536     GNUNET_break_op (0);
1537     return GNUNET_OK;
1538   }
1539   size /= sizeof (struct GNUNET_PeerIdentity);
1540   if (1 > size)
1541   {
1542     GNUNET_break_op (0);
1543     return GNUNET_OK;
1544   }
1545   LOG (GNUNET_ERROR_TYPE_DEBUG, "    path has %u hops.\n", size);
1546
1547   /* Get parameters */
1548   msg = (struct GNUNET_MESH_ConnectionCreate *) message;
1549   cid = &msg->cid;
1550   log_message (message, peer, cid);
1551   id = (struct GNUNET_PeerIdentity *) &msg[1];
1552   LOG (GNUNET_ERROR_TYPE_DEBUG, "    origin: %s\n", GNUNET_i2s (id));
1553
1554   /* Create connection */
1555   c = connection_get (cid);
1556   if (NULL == c)
1557   {
1558     path = build_path_from_peer_ids ((struct GNUNET_PeerIdentity *) &msg[1],
1559                                      size, &own_pos);
1560     if (NULL == path)
1561       return GNUNET_OK;
1562     if (0 == own_pos)
1563     {
1564       GNUNET_break_op (0);
1565       path_destroy (path);
1566       return GNUNET_OK;
1567     }
1568     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Own position: %u\n", own_pos);
1569     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Creating connection\n");
1570     c = GMC_new (cid, NULL, path_duplicate (path), own_pos);
1571     if (NULL == c)
1572     {
1573       if (path->length - 1 == own_pos)
1574       {
1575         /* If we are destination, why did the creation fail? */
1576         GNUNET_break (0);
1577         return GNUNET_OK;
1578       }
1579       send_broken2 (cid, &my_full_id,
1580                     GNUNET_PEER_resolve2 (path->peers[own_pos + 1]),
1581                     path->peers[own_pos - 1]);
1582       path_destroy (path);
1583       return GNUNET_OK;
1584     }
1585     GMP_add_path_to_all (path, GNUNET_NO);
1586     connection_reset_timeout (c, GNUNET_YES);
1587   }
1588   else
1589   {
1590     path = path_duplicate (c->path);
1591   }
1592   if (MESH_CONNECTION_NEW == c->state)
1593     connection_change_state (c, MESH_CONNECTION_SENT);
1594
1595   /* Remember peers */
1596   dest_peer = GMP_get (&id[size - 1]);
1597   orig_peer = GMP_get (&id[0]);
1598
1599   /* Is it a connection to us? */
1600   if (c->own_pos == size - 1)
1601   {
1602     LOG (GNUNET_ERROR_TYPE_DEBUG, "  It's for us!\n");
1603     GMP_add_path_to_origin (orig_peer, path_duplicate (path), GNUNET_YES);
1604
1605     add_to_peer (c, orig_peer);
1606     if (MESH_TUNNEL3_NEW == GMT_get_cstate (c->t))
1607       GMT_change_cstate (c->t,  MESH_TUNNEL3_WAITING);
1608
1609     send_connection_ack (c, GNUNET_NO);
1610     if (MESH_CONNECTION_SENT == c->state)
1611       connection_change_state (c, MESH_CONNECTION_ACK);
1612   }
1613   else
1614   {
1615     /* It's for somebody else! Retransmit. */
1616     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Retransmitting.\n");
1617     GMP_add_path (dest_peer, path_duplicate (path), GNUNET_NO);
1618     GMP_add_path_to_origin (orig_peer, path_duplicate (path), GNUNET_NO);
1619     GMC_send_prebuilt_message (message, c, GNUNET_YES, GNUNET_YES,
1620                                NULL, NULL);
1621   }
1622   path_destroy (path);
1623   return GNUNET_OK;
1624 }
1625
1626
1627 /**
1628  * Core handler for path confirmations.
1629  *
1630  * @param cls closure
1631  * @param message message
1632  * @param peer peer identity this notification is about
1633  *
1634  * @return GNUNET_OK to keep the connection open,
1635  *         GNUNET_SYSERR to close it (signal serious error)
1636  */
1637 int
1638 GMC_handle_confirm (void *cls, const struct GNUNET_PeerIdentity *peer,
1639                     const struct GNUNET_MessageHeader *message)
1640 {
1641   struct GNUNET_MESH_ConnectionACK *msg;
1642   struct MeshConnection *c;
1643   struct MeshPeerPath *p;
1644   struct MeshPeer *pi;
1645   enum MeshConnectionState oldstate;
1646   int fwd;
1647
1648   msg = (struct GNUNET_MESH_ConnectionACK *) message;
1649   log_message (message, peer, &msg->cid);
1650   c = connection_get (&msg->cid);
1651   if (NULL == c)
1652   {
1653     GNUNET_STATISTICS_update (stats, "# control on unknown connection",
1654                               1, GNUNET_NO);
1655     LOG (GNUNET_ERROR_TYPE_DEBUG, "  don't know the connection!\n");
1656     return GNUNET_OK;
1657   }
1658
1659   if (GNUNET_NO != c->destroy)
1660   {
1661     LOG (GNUNET_ERROR_TYPE_DEBUG, "  connection being destroyed\n");
1662     return GNUNET_OK;
1663   }
1664
1665   oldstate = c->state;
1666   LOG (GNUNET_ERROR_TYPE_DEBUG, "  via peer %s\n", GNUNET_i2s (peer));
1667   pi = GMP_get (peer);
1668   if (get_next_hop (c) == pi)
1669   {
1670     LOG (GNUNET_ERROR_TYPE_DEBUG, "  SYNACK\n");
1671     fwd = GNUNET_NO;
1672     if (MESH_CONNECTION_SENT == oldstate)
1673       connection_change_state (c, MESH_CONNECTION_ACK);
1674   }
1675   else if (get_prev_hop (c) == pi)
1676   {
1677     LOG (GNUNET_ERROR_TYPE_DEBUG, "  ACK\n");
1678     fwd = GNUNET_YES;
1679     connection_change_state (c, MESH_CONNECTION_READY);
1680   }
1681   else
1682   {
1683     GNUNET_break_op (0);
1684     return GNUNET_OK;
1685   }
1686
1687   connection_reset_timeout (c, fwd);
1688
1689   /* Add path to peers? */
1690   p = c->path;
1691   if (NULL != p)
1692   {
1693     GMP_add_path_to_all (p, GNUNET_YES);
1694   }
1695   else
1696   {
1697     GNUNET_break (0);
1698   }
1699
1700   /* Message for us as creator? */
1701   if (GMC_is_origin (c, GNUNET_YES))
1702   {
1703     if (GNUNET_NO != fwd)
1704     {
1705       GNUNET_break_op (0);
1706       return GNUNET_OK;
1707     }
1708     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Connection (SYN)ACK for us!\n");
1709
1710     /* If just created, cancel the short timeout and start a long one */
1711     if (MESH_CONNECTION_SENT == oldstate)
1712       connection_reset_timeout (c, GNUNET_YES);
1713
1714     /* Change connection state */
1715     connection_change_state (c, MESH_CONNECTION_READY);
1716     send_connection_ack (c, GNUNET_YES);
1717
1718     /* Change tunnel state, trigger KX */
1719     if (MESH_TUNNEL3_WAITING == GMT_get_cstate (c->t))
1720       GMT_change_cstate (c->t, MESH_TUNNEL3_READY);
1721
1722     return GNUNET_OK;
1723   }
1724
1725   /* Message for us as destination? */
1726   if (GMC_is_terminal (c, GNUNET_YES))
1727   {
1728     if (GNUNET_YES != fwd)
1729     {
1730       GNUNET_break_op (0);
1731       return GNUNET_OK;
1732     }
1733     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Connection ACK for us!\n");
1734
1735     /* If just created, cancel the short timeout and start a long one */
1736     if (MESH_CONNECTION_ACK == oldstate)
1737       connection_reset_timeout (c, GNUNET_NO);
1738
1739     /* Change tunnel state */
1740     if (MESH_TUNNEL3_WAITING == GMT_get_cstate (c->t))
1741       GMT_change_cstate (c->t, MESH_TUNNEL3_READY);
1742
1743     return GNUNET_OK;
1744   }
1745
1746   LOG (GNUNET_ERROR_TYPE_DEBUG, "  not for us, retransmitting...\n");
1747   GMC_send_prebuilt_message (message, c, fwd, GNUNET_YES, NULL, NULL);
1748   return GNUNET_OK;
1749 }
1750
1751
1752 /**
1753  * Core handler for notifications of broken paths
1754  *
1755  * @param cls Closure (unused).
1756  * @param id Peer identity of sending neighbor.
1757  * @param message Message.
1758  *
1759  * @return GNUNET_OK to keep the connection open,
1760  *         GNUNET_SYSERR to close it (signal serious error)
1761  */
1762 int
1763 GMC_handle_broken (void* cls,
1764                    const struct GNUNET_PeerIdentity* id,
1765                    const struct GNUNET_MessageHeader* message)
1766 {
1767   struct GNUNET_MESH_ConnectionBroken *msg;
1768   struct MeshConnection *c;
1769   int fwd;
1770
1771   msg = (struct GNUNET_MESH_ConnectionBroken *) message;
1772   log_message (message, id, &msg->cid);
1773   LOG (GNUNET_ERROR_TYPE_DEBUG, "  regarding %s\n",
1774               GNUNET_i2s (&msg->peer1));
1775   LOG (GNUNET_ERROR_TYPE_DEBUG, "  regarding %s\n",
1776               GNUNET_i2s (&msg->peer2));
1777   c = connection_get (&msg->cid);
1778   if (NULL == c)
1779   {
1780     GNUNET_break_op (0);
1781     return GNUNET_OK;
1782   }
1783
1784   fwd = is_fwd (c, id);
1785   if (GMC_is_terminal (c, fwd))
1786   {
1787     path_invalidate (c->path);
1788     if (0 < c->pending_messages)
1789       c->destroy = GNUNET_YES;
1790     else
1791       GMC_destroy (c);
1792   }
1793   else
1794   {
1795     GMC_send_prebuilt_message (message, c, fwd, GNUNET_YES, NULL, NULL);
1796     c->destroy = GNUNET_YES;
1797     connection_cancel_queues (c, !fwd);
1798   }
1799
1800   return GNUNET_OK;
1801
1802 }
1803
1804
1805 /**
1806  * Core handler for tunnel destruction
1807  *
1808  * @param cls Closure (unused).
1809  * @param peer Peer identity of sending neighbor.
1810  * @param message Message.
1811  *
1812  * @return GNUNET_OK to keep the connection open,
1813  *         GNUNET_SYSERR to close it (signal serious error)
1814  */
1815 int
1816 GMC_handle_destroy (void *cls, const struct GNUNET_PeerIdentity *peer,
1817                     const struct GNUNET_MessageHeader *message)
1818 {
1819   struct GNUNET_MESH_ConnectionDestroy *msg;
1820   struct MeshConnection *c;
1821   int fwd;
1822
1823   msg = (struct GNUNET_MESH_ConnectionDestroy *) message;
1824   log_message (message, peer, &msg->cid);
1825   c = connection_get (&msg->cid);
1826   if (NULL == c)
1827   {
1828     /* Probably already got the message from another path,
1829      * destroyed the tunnel and retransmitted to children.
1830      * Safe to ignore.
1831      */
1832     GNUNET_STATISTICS_update (stats, "# control on unknown connection",
1833                               1, GNUNET_NO);
1834     LOG (GNUNET_ERROR_TYPE_DEBUG, "  connection unknown: already destroyed?\n");
1835     return GNUNET_OK;
1836   }
1837   fwd = is_fwd (c, peer);
1838   if (GNUNET_SYSERR == fwd)
1839   {
1840     GNUNET_break_op (0); /* FIXME */
1841     return GNUNET_OK;
1842   }
1843   if (GNUNET_NO == GMC_is_terminal (c, fwd))
1844     GMC_send_prebuilt_message (message, c, fwd, GNUNET_YES, NULL, NULL);
1845   else if (0 == c->pending_messages)
1846   {
1847     LOG (GNUNET_ERROR_TYPE_DEBUG, "!  directly destroying connection!\n");
1848     GMC_destroy (c);
1849     return GNUNET_OK;
1850   }
1851   c->destroy = GNUNET_YES;
1852   c->state = MESH_CONNECTION_DESTROYED;
1853   if (NULL != c->t)
1854   {
1855     GMT_remove_connection (c->t, c);
1856     c->t = NULL;
1857   }
1858
1859   return GNUNET_OK;
1860 }
1861
1862 /**
1863  * Generic handler for mesh network encrypted traffic.
1864  *
1865  * @param peer Peer identity this notification is about.
1866  * @param msg Encrypted message.
1867  *
1868  * @return GNUNET_OK to keep the connection open,
1869  *         GNUNET_SYSERR to close it (signal serious error)
1870  */
1871 static int
1872 handle_mesh_encrypted (const struct GNUNET_PeerIdentity *peer,
1873                        const struct GNUNET_MESH_Encrypted *msg)
1874 {
1875   struct MeshConnection *c;
1876   struct MeshPeer *neighbor;
1877   struct MeshFlowControl *fc;
1878   GNUNET_PEER_Id peer_id;
1879   uint32_t pid;
1880   uint32_t ttl;
1881   size_t size;
1882   int fwd;
1883
1884   log_message (&msg->header, peer, &msg->cid);
1885
1886   /* Check size */
1887   size = ntohs (msg->header.size);
1888   if (size <
1889       sizeof (struct GNUNET_MESH_Encrypted) +
1890       sizeof (struct GNUNET_MessageHeader))
1891   {
1892     GNUNET_break_op (0);
1893     return GNUNET_OK;
1894   }
1895
1896   /* Check connection */
1897   c = connection_get (&msg->cid);
1898   if (NULL == c)
1899   {
1900     GNUNET_STATISTICS_update (stats, "# unknown connection", 1, GNUNET_NO);
1901     LOG (GNUNET_ERROR_TYPE_DEBUG, "WARNING enc on unknown connection %s\n",
1902          GNUNET_h2s (&msg->cid));
1903     return GNUNET_OK;
1904   }
1905
1906   LOG (GNUNET_ERROR_TYPE_DEBUG, "  connection %s\n", GMC_2s (c));
1907
1908   /* Check if origin is as expected */
1909   neighbor = get_prev_hop (c);
1910   peer_id = GNUNET_PEER_search (peer);
1911   if (peer_id == GMP_get_short_id (neighbor))
1912   {
1913     fwd = GNUNET_YES;
1914   }
1915   else
1916   {
1917     neighbor = get_next_hop (c);
1918     if (peer_id == GMP_get_short_id (neighbor))
1919     {
1920       fwd = GNUNET_NO;
1921     }
1922     else
1923     {
1924       /* Unexpected peer sending traffic on a connection. */
1925       GNUNET_break_op (0);
1926       return GNUNET_OK;
1927     }
1928   }
1929
1930   /* Check PID */
1931   fc = fwd ? &c->bck_fc : &c->fwd_fc;
1932   pid = ntohl (msg->pid);
1933   if (GM_is_pid_bigger (pid, fc->last_ack_sent))
1934   {
1935     GNUNET_STATISTICS_update (stats, "# unsolicited message", 1, GNUNET_NO);
1936     LOG (GNUNET_ERROR_TYPE_DEBUG,
1937                 "WARNING Received PID %u, (prev %u), ACK %u\n",
1938                 pid, fc->last_pid_recv, fc->last_ack_sent);
1939     return GNUNET_OK;
1940   }
1941   if (GNUNET_NO == GM_is_pid_bigger (pid, fc->last_pid_recv))
1942   {
1943     GNUNET_STATISTICS_update (stats, "# duplicate PID", 1, GNUNET_NO);
1944     LOG (GNUNET_ERROR_TYPE_DEBUG,
1945                 " Pid %u not expected (%u+), dropping!\n",
1946                 pid, fc->last_pid_recv + 1);
1947     return GNUNET_OK;
1948   }
1949   if (MESH_CONNECTION_SENT == c->state || MESH_CONNECTION_ACK == c->state)
1950     connection_change_state (c, MESH_CONNECTION_READY);
1951   connection_reset_timeout (c, fwd);
1952   fc->last_pid_recv = pid;
1953
1954   /* Is this message for us? */
1955   if (GMC_is_terminal (c, fwd))
1956   {
1957     LOG (GNUNET_ERROR_TYPE_DEBUG, "  message for us!\n");
1958     GNUNET_STATISTICS_update (stats, "# messages received", 1, GNUNET_NO);
1959
1960     if (NULL == c->t)
1961     {
1962       GNUNET_break (GNUNET_NO != c->destroy);
1963       return GNUNET_OK;
1964     }
1965     fc->last_pid_recv = pid;
1966     GMT_handle_encrypted (c->t, msg);
1967     GMC_send_ack (c, fwd, GNUNET_NO);
1968     return GNUNET_OK;
1969   }
1970
1971   /* Message not for us: forward to next hop */
1972   LOG (GNUNET_ERROR_TYPE_DEBUG, "  not for us, retransmitting...\n");
1973   ttl = ntohl (msg->ttl);
1974   LOG (GNUNET_ERROR_TYPE_DEBUG, "   ttl: %u\n", ttl);
1975   if (ttl == 0)
1976   {
1977     GNUNET_STATISTICS_update (stats, "# TTL drops", 1, GNUNET_NO);
1978     LOG (GNUNET_ERROR_TYPE_WARNING, " TTL is 0, DROPPING!\n");
1979     GMC_send_ack (c, fwd, GNUNET_NO);
1980     return GNUNET_OK;
1981   }
1982
1983   GNUNET_STATISTICS_update (stats, "# messages forwarded", 1, GNUNET_NO);
1984   GMC_send_prebuilt_message (&msg->header, c, fwd, GNUNET_NO, NULL, NULL);
1985
1986   return GNUNET_OK;
1987 }
1988
1989 /**
1990  * Generic handler for mesh network encrypted traffic.
1991  *
1992  * @param peer Peer identity this notification is about.
1993  * @param msg Encrypted message.
1994  *
1995  * @return GNUNET_OK to keep the connection open,
1996  *         GNUNET_SYSERR to close it (signal serious error)
1997  */
1998 static int
1999 handle_mesh_kx (const struct GNUNET_PeerIdentity *peer,
2000                 const struct GNUNET_MESH_KX *msg)
2001 {
2002   struct MeshConnection *c;
2003   struct MeshPeer *neighbor;
2004   GNUNET_PEER_Id peer_id;
2005   size_t size;
2006   int fwd;
2007
2008   log_message (&msg->header, peer, &msg->cid);
2009
2010   /* Check size */
2011   size = ntohs (msg->header.size);
2012   if (size <
2013       sizeof (struct GNUNET_MESH_KX) +
2014       sizeof (struct GNUNET_MessageHeader))
2015   {
2016     GNUNET_break_op (0);
2017     return GNUNET_OK;
2018   }
2019
2020   /* Check connection */
2021   c = connection_get (&msg->cid);
2022   if (NULL == c)
2023   {
2024     GNUNET_STATISTICS_update (stats, "# unknown connection", 1, GNUNET_NO);
2025     LOG (GNUNET_ERROR_TYPE_DEBUG, "kx on unknown connection %s\n",
2026          GNUNET_h2s (&msg->cid));
2027     return GNUNET_OK;
2028   }
2029   LOG (GNUNET_ERROR_TYPE_DEBUG, " on connection %s\n", GMC_2s (c));
2030
2031   /* Check if origin is as expected */
2032   neighbor = get_prev_hop (c);
2033   peer_id = GNUNET_PEER_search (peer);
2034   if (peer_id == GMP_get_short_id (neighbor))
2035   {
2036     fwd = GNUNET_YES;
2037   }
2038   else
2039   {
2040     neighbor = get_next_hop (c);
2041     if (peer_id == GMP_get_short_id (neighbor))
2042     {
2043       fwd = GNUNET_NO;
2044     }
2045     else
2046     {
2047       /* Unexpected peer sending traffic on a connection. */
2048       GNUNET_break_op (0);
2049       return GNUNET_OK;
2050     }
2051   }
2052
2053   /* Count as connection confirmation. */
2054   if (MESH_CONNECTION_SENT == c->state || MESH_CONNECTION_ACK == c->state)
2055   {
2056     connection_change_state (c, MESH_CONNECTION_READY);
2057     if (NULL != c->t)
2058     {
2059       if (MESH_TUNNEL3_WAITING == GMT_get_cstate (c->t))
2060         GMT_change_cstate (c->t, MESH_TUNNEL3_READY);
2061     }
2062   }
2063   connection_reset_timeout (c, fwd);
2064
2065   /* Is this message for us? */
2066   if (GMC_is_terminal (c, fwd))
2067   {
2068     LOG (GNUNET_ERROR_TYPE_DEBUG, "  message for us!\n");
2069     GNUNET_STATISTICS_update (stats, "# messages received", 1, GNUNET_NO);
2070     if (NULL == c->t)
2071     {
2072       GNUNET_break (0);
2073       return GNUNET_OK;
2074     }
2075     GMT_handle_kx (c->t, &msg[1].header);
2076     return GNUNET_OK;
2077   }
2078
2079   /* Message not for us: forward to next hop */
2080   LOG (GNUNET_ERROR_TYPE_DEBUG, "  not for us, retransmitting...\n");
2081   GNUNET_STATISTICS_update (stats, "# messages forwarded", 1, GNUNET_NO);
2082   GMC_send_prebuilt_message (&msg->header, c, fwd, GNUNET_NO, NULL, NULL);
2083
2084   return GNUNET_OK;
2085 }
2086
2087
2088 /**
2089  * Core handler for encrypted mesh network traffic (channel mgmt, data).
2090  *
2091  * @param cls Closure (unused).
2092  * @param message Message received.
2093  * @param peer Peer who sent the message.
2094  *
2095  * @return GNUNET_OK to keep the connection open,
2096  *         GNUNET_SYSERR to close it (signal serious error)
2097  */
2098 int
2099 GMC_handle_encrypted (void *cls, const struct GNUNET_PeerIdentity *peer,
2100                       const struct GNUNET_MessageHeader *message)
2101 {
2102   return handle_mesh_encrypted (peer,
2103                                 (struct GNUNET_MESH_Encrypted *)message);
2104 }
2105
2106
2107 /**
2108  * Core handler for key exchange traffic (ephemeral key, ping, pong).
2109  *
2110  * @param cls Closure (unused).
2111  * @param message Message received.
2112  * @param peer Peer who sent the message.
2113  *
2114  * @return GNUNET_OK to keep the connection open,
2115  *         GNUNET_SYSERR to close it (signal serious error)
2116  */
2117 int
2118 GMC_handle_kx (void *cls, const struct GNUNET_PeerIdentity *peer,
2119                const struct GNUNET_MessageHeader *message)
2120 {
2121   return handle_mesh_kx (peer,
2122                          (struct GNUNET_MESH_KX *) message);
2123 }
2124
2125
2126 /**
2127  * Core handler for mesh network traffic point-to-point acks.
2128  *
2129  * @param cls closure
2130  * @param message message
2131  * @param peer peer identity this notification is about
2132  *
2133  * @return GNUNET_OK to keep the connection open,
2134  *         GNUNET_SYSERR to close it (signal serious error)
2135  */
2136 int
2137 GMC_handle_ack (void *cls, const struct GNUNET_PeerIdentity *peer,
2138                 const struct GNUNET_MessageHeader *message)
2139 {
2140   struct GNUNET_MESH_ACK *msg;
2141   struct MeshConnection *c;
2142   struct MeshFlowControl *fc;
2143   GNUNET_PEER_Id id;
2144   uint32_t ack;
2145   int fwd;
2146
2147   msg = (struct GNUNET_MESH_ACK *) message;
2148   log_message (message, peer, &msg->cid);
2149   c = connection_get (&msg->cid);
2150   if (NULL == c)
2151   {
2152     GNUNET_STATISTICS_update (stats, "# ack on unknown connection", 1,
2153                               GNUNET_NO);
2154     return GNUNET_OK;
2155   }
2156
2157   /* Is this a forward or backward ACK? */
2158   id = GNUNET_PEER_search (peer);
2159   if (GMP_get_short_id (get_next_hop (c)) == id)
2160   {
2161     LOG (GNUNET_ERROR_TYPE_DEBUG, "  FWD ACK\n");
2162     fc = &c->fwd_fc;
2163     fwd = GNUNET_YES;
2164   }
2165   else if (GMP_get_short_id (get_prev_hop (c)) == id)
2166   {
2167     LOG (GNUNET_ERROR_TYPE_DEBUG, "  BCK ACK\n");
2168     fc = &c->bck_fc;
2169     fwd = GNUNET_NO;
2170   }
2171   else
2172   {
2173     GNUNET_break_op (0);
2174     return GNUNET_OK;
2175   }
2176
2177   ack = ntohl (msg->ack);
2178   LOG (GNUNET_ERROR_TYPE_DEBUG, "  ACK %u (was %u)\n",
2179               ack, fc->last_ack_recv);
2180   if (GM_is_pid_bigger (ack, fc->last_ack_recv))
2181     fc->last_ack_recv = ack;
2182
2183   /* Cancel polling if the ACK is big enough. */
2184   if (GNUNET_SCHEDULER_NO_TASK != fc->poll_task &&
2185       GM_is_pid_bigger (fc->last_ack_recv, fc->last_pid_sent))
2186   {
2187     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Cancel poll\n");
2188     GNUNET_SCHEDULER_cancel (fc->poll_task);
2189     fc->poll_task = GNUNET_SCHEDULER_NO_TASK;
2190     fc->poll_time = GNUNET_TIME_UNIT_SECONDS;
2191   }
2192
2193   connection_unlock_queue (c, fwd);
2194
2195   return GNUNET_OK;
2196 }
2197
2198
2199 /**
2200  * Core handler for mesh network traffic point-to-point ack polls.
2201  *
2202  * @param cls closure
2203  * @param message message
2204  * @param peer peer identity this notification is about
2205  *
2206  * @return GNUNET_OK to keep the connection open,
2207  *         GNUNET_SYSERR to close it (signal serious error)
2208  */
2209 int
2210 GMC_handle_poll (void *cls, const struct GNUNET_PeerIdentity *peer,
2211                  const struct GNUNET_MessageHeader *message)
2212 {
2213   struct GNUNET_MESH_Poll *msg;
2214   struct MeshConnection *c;
2215   struct MeshFlowControl *fc;
2216   GNUNET_PEER_Id id;
2217   uint32_t pid;
2218   int fwd;
2219
2220   msg = (struct GNUNET_MESH_Poll *) message;
2221   log_message (message, peer, &msg->cid);
2222   c = connection_get (&msg->cid);
2223   if (NULL == c)
2224   {
2225     GNUNET_STATISTICS_update (stats, "# poll on unknown connection", 1,
2226                               GNUNET_NO);
2227     LOG (GNUNET_ERROR_TYPE_DEBUG,
2228          "WARNING POLL message on unknown connection %s!\n",
2229          GNUNET_h2s (&msg->cid));
2230     return GNUNET_OK;
2231   }
2232
2233   /* Is this a forward or backward ACK?
2234    * Note: a poll should never be needed in a loopback case,
2235    * since there is no possiblility of packet loss there, so
2236    * this way of discerining FWD/BCK should not be a problem.
2237    */
2238   id = GNUNET_PEER_search (peer);
2239   if (GMP_get_short_id (get_next_hop (c)) == id)
2240   {
2241     LOG (GNUNET_ERROR_TYPE_DEBUG, "  FWD FC\n");
2242     fc = &c->fwd_fc;
2243   }
2244   else if (GMP_get_short_id (get_prev_hop (c)) == id)
2245   {
2246     LOG (GNUNET_ERROR_TYPE_DEBUG, "  BCK FC\n");
2247     fc = &c->bck_fc;
2248   }
2249   else
2250   {
2251     GNUNET_break_op (0);
2252     return GNUNET_OK;
2253   }
2254
2255   pid = ntohl (msg->pid);
2256   LOG (GNUNET_ERROR_TYPE_DEBUG, "  PID %u, OLD %u\n", pid, fc->last_pid_recv);
2257   fc->last_pid_recv = pid;
2258   fwd = fc == &c->bck_fc;
2259   GMC_send_ack (c, fwd, GNUNET_YES);
2260
2261   return GNUNET_OK;
2262 }
2263
2264
2265 /**
2266  * Send an ACK on the appropriate connection/channel, depending on
2267  * the direction and the position of the peer.
2268  *
2269  * @param c Which connection to send the hop-by-hop ACK.
2270  * @param fwd Is this a fwd ACK? (will go dest->root).
2271  * @param force Send the ACK even if suboptimal (e.g. requested by POLL).
2272  */
2273 void
2274 GMC_send_ack (struct MeshConnection *c, int fwd, int force)
2275 {
2276   unsigned int buffer;
2277
2278   LOG (GNUNET_ERROR_TYPE_DEBUG,
2279        "GMC send %s ACK on %s\n",
2280        GM_f2s (fwd), GMC_2s (c));
2281
2282   if (NULL == c)
2283   {
2284     GNUNET_break (0);
2285     return;
2286   }
2287
2288   if (GNUNET_NO != c->destroy)
2289   {
2290     LOG (GNUNET_ERROR_TYPE_DEBUG, "  being destroyed, why bother...\n");
2291     return;
2292   }
2293
2294   /* Get available buffer space */
2295   if (GMC_is_terminal (c, fwd))
2296   {
2297     LOG (GNUNET_ERROR_TYPE_DEBUG, "  getting from all channels\n");
2298     buffer = GMT_get_channels_buffer (c->t);
2299   }
2300   else
2301   {
2302     LOG (GNUNET_ERROR_TYPE_DEBUG, "  getting from one connection\n");
2303     buffer = GMC_get_buffer (c, fwd);
2304   }
2305   LOG (GNUNET_ERROR_TYPE_DEBUG, "  buffer available: %u\n", buffer);
2306   if (0 == buffer && GNUNET_NO == force)
2307     return;
2308
2309   /* Send available buffer space */
2310   if (GMC_is_origin (c, fwd))
2311   {
2312     GNUNET_assert (NULL != c->t);
2313     LOG (GNUNET_ERROR_TYPE_DEBUG, "  sending on channels...\n");
2314     GMT_unchoke_channels (c->t);
2315   }
2316   else
2317   {
2318     LOG (GNUNET_ERROR_TYPE_DEBUG, "  sending on connection\n");
2319     send_ack (c, buffer, fwd, force);
2320   }
2321 }
2322
2323
2324 /**
2325  * Initialize the connections subsystem
2326  *
2327  * @param c Configuration handle.
2328  */
2329 void
2330 GMC_init (const struct GNUNET_CONFIGURATION_Handle *c)
2331 {
2332   LOG (GNUNET_ERROR_TYPE_DEBUG, "init\n");
2333   if (GNUNET_OK !=
2334       GNUNET_CONFIGURATION_get_value_number (c, "MESH", "MAX_MSGS_QUEUE",
2335                                              &max_msgs_queue))
2336   {
2337     GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
2338                                "MESH", "MAX_MSGS_QUEUE", "MISSING");
2339     GNUNET_SCHEDULER_shutdown ();
2340     return;
2341   }
2342
2343   if (GNUNET_OK !=
2344       GNUNET_CONFIGURATION_get_value_number (c, "MESH", "MAX_CONNECTIONS",
2345                                              &max_connections))
2346   {
2347     GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
2348                                "MESH", "MAX_CONNECTIONS", "MISSING");
2349     GNUNET_SCHEDULER_shutdown ();
2350     return;
2351   }
2352
2353   if (GNUNET_OK !=
2354       GNUNET_CONFIGURATION_get_value_time (c, "MESH", "REFRESH_CONNECTION_TIME",
2355                                            &refresh_connection_time))
2356   {
2357     GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
2358                                "MESH", "REFRESH_CONNECTION_TIME", "MISSING");
2359     GNUNET_SCHEDULER_shutdown ();
2360     return;
2361   }
2362   create_connection_time = GNUNET_TIME_UNIT_SECONDS;
2363   connections = GNUNET_CONTAINER_multihashmap_create (1024, GNUNET_YES);
2364 }
2365
2366
2367 /**
2368  * Destroy each connection on shutdown.
2369  *
2370  * @param cls Closure (unused).
2371  * @param key Current key code (CID, unused).
2372  * @param value Value in the hash map (connection)
2373  *
2374  * @return #GNUNET_YES, because we should continue to iterate,
2375  */
2376 static int
2377 shutdown_iterator (void *cls,
2378                    const struct GNUNET_HashCode *key,
2379                    void *value)
2380 {
2381   struct MeshConnection *c = value;
2382
2383   GMC_destroy (c);
2384   return GNUNET_YES;
2385 }
2386
2387
2388 /**
2389  * Shut down the connections subsystem.
2390  */
2391 void
2392 GMC_shutdown (void)
2393 {
2394   GNUNET_CONTAINER_multihashmap_iterate (connections, &shutdown_iterator, NULL);
2395   GNUNET_CONTAINER_multihashmap_destroy (connections);
2396   connections = NULL;
2397 }
2398
2399
2400 struct MeshConnection *
2401 GMC_new (const struct GNUNET_HashCode *cid,
2402          struct MeshTunnel3 *t,
2403          struct MeshPeerPath *p,
2404          unsigned int own_pos)
2405 {
2406   struct MeshConnection *c;
2407
2408   c = GNUNET_new (struct MeshConnection);
2409   c->id = *cid;
2410   GNUNET_assert (GNUNET_OK ==
2411                  GNUNET_CONTAINER_multihashmap_put (connections,
2412                                                     &c->id, c,
2413                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
2414   fc_init (&c->fwd_fc);
2415   fc_init (&c->bck_fc);
2416   c->fwd_fc.c = c;
2417   c->bck_fc.c = c;
2418
2419   c->t = t;
2420   GNUNET_assert (own_pos <= p->length - 1);
2421   c->own_pos = own_pos;
2422   c->path = p;
2423
2424   if (GNUNET_OK != register_neighbors (c))
2425   {
2426     if (0 == own_pos)
2427     {
2428       path_invalidate (c->path);
2429       c->t = NULL;
2430       c->path = NULL;
2431     }
2432     GMC_destroy (c);
2433     return NULL;
2434   }
2435
2436   return c;
2437 }
2438
2439
2440 void
2441 GMC_destroy (struct MeshConnection *c)
2442 {
2443   if (NULL == c)
2444   {
2445     GNUNET_break (0);
2446     return;
2447   }
2448
2449   if (2 == c->destroy) /* cancel queues -> GMP_queue_cancel -> q_destroy -> */
2450     return;            /* -> message_sent -> GMC_destroy. Don't loop. */
2451   c->destroy = 2;
2452
2453   LOG (GNUNET_ERROR_TYPE_DEBUG, "destroying connection %s\n", GMC_2s (c));
2454   LOG (GNUNET_ERROR_TYPE_DEBUG, " fc's f: %p, b: %p\n",
2455        &c->fwd_fc, &c->bck_fc);
2456   LOG (GNUNET_ERROR_TYPE_DEBUG, " fc tasks f: %u, b: %u\n",
2457        c->fwd_fc.poll_task, c->bck_fc.poll_task);
2458
2459   /* Cancel all traffic */
2460   if (NULL != c->path)
2461   {
2462     connection_cancel_queues (c, GNUNET_YES);
2463     connection_cancel_queues (c, GNUNET_NO);
2464     unregister_neighbors (c);
2465   }
2466
2467   LOG (GNUNET_ERROR_TYPE_DEBUG, " fc tasks f: %u, b: %u\n",
2468        c->fwd_fc.poll_task, c->bck_fc.poll_task);
2469
2470   /* Cancel maintainance task (keepalive/timeout) */
2471   if (NULL != c->fwd_fc.poll_msg)
2472   {
2473     GMC_cancel (c->fwd_fc.poll_msg);
2474     LOG (GNUNET_ERROR_TYPE_DEBUG, " *** POLL msg FWD canceled\n");
2475   }
2476   if (NULL != c->bck_fc.poll_msg)
2477   {
2478     GMC_cancel (c->bck_fc.poll_msg);
2479     LOG (GNUNET_ERROR_TYPE_DEBUG, " *** POLL msg BCK canceled\n");
2480   }
2481
2482   /* Delete from tunnel */
2483   if (NULL != c->t)
2484     GMT_remove_connection (c->t, c);
2485
2486   if (GNUNET_NO == GMC_is_origin (c, GNUNET_YES) && NULL != c->path)
2487     path_destroy (c->path);
2488   if (GNUNET_SCHEDULER_NO_TASK != c->fwd_maintenance_task)
2489     GNUNET_SCHEDULER_cancel (c->fwd_maintenance_task);
2490   if (GNUNET_SCHEDULER_NO_TASK != c->bck_maintenance_task)
2491     GNUNET_SCHEDULER_cancel (c->bck_maintenance_task);
2492   if (GNUNET_SCHEDULER_NO_TASK != c->fwd_fc.poll_task)
2493   {
2494     GNUNET_SCHEDULER_cancel (c->fwd_fc.poll_task);
2495     LOG (GNUNET_ERROR_TYPE_DEBUG, " *** POLL FWD canceled\n");
2496   }
2497   if (GNUNET_SCHEDULER_NO_TASK != c->bck_fc.poll_task)
2498   {
2499     GNUNET_SCHEDULER_cancel (c->bck_fc.poll_task);
2500     LOG (GNUNET_ERROR_TYPE_DEBUG, " *** POLL BCK canceled\n");
2501   }
2502
2503   GNUNET_break (GNUNET_YES ==
2504                 GNUNET_CONTAINER_multihashmap_remove (connections, &c->id, c));
2505
2506   GNUNET_STATISTICS_update (stats, "# connections", -1, GNUNET_NO);
2507   GNUNET_free (c);
2508 }
2509
2510 /**
2511  * Get the connection ID.
2512  *
2513  * @param c Connection to get the ID from.
2514  *
2515  * @return ID of the connection.
2516  */
2517 const struct GNUNET_MeshHash *
2518 GMC_get_id (const struct MeshConnection *c)
2519 {
2520   return &c->id;
2521 }
2522
2523
2524 /**
2525  * Get the connection path.
2526  *
2527  * @param c Connection to get the path from.
2528  *
2529  * @return path used by the connection.
2530  */
2531 const struct MeshPeerPath *
2532 GMC_get_path (const struct MeshConnection *c)
2533 {
2534   if (GNUNET_NO == c->destroy)
2535     return c->path;
2536   return NULL;
2537 }
2538
2539
2540 /**
2541  * Get the connection state.
2542  *
2543  * @param c Connection to get the state from.
2544  *
2545  * @return state of the connection.
2546  */
2547 enum MeshConnectionState
2548 GMC_get_state (const struct MeshConnection *c)
2549 {
2550   return c->state;
2551 }
2552
2553 /**
2554  * Get the connection tunnel.
2555  *
2556  * @param c Connection to get the tunnel from.
2557  *
2558  * @return tunnel of the connection.
2559  */
2560 struct MeshTunnel3 *
2561 GMC_get_tunnel (const struct MeshConnection *c)
2562 {
2563   return c->t;
2564 }
2565
2566
2567 /**
2568  * Get free buffer space in a connection.
2569  *
2570  * @param c Connection.
2571  * @param fwd Is query about FWD traffic?
2572  *
2573  * @return Free buffer space [0 - max_msgs_queue/max_connections]
2574  */
2575 unsigned int
2576 GMC_get_buffer (struct MeshConnection *c, int fwd)
2577 {
2578   struct MeshFlowControl *fc;
2579
2580   fc = fwd ? &c->fwd_fc : &c->bck_fc;
2581
2582   return (fc->queue_max - fc->queue_n);
2583 }
2584
2585 /**
2586  * Get how many messages have we allowed to send to us from a direction.
2587  *
2588  * @param c Connection.
2589  * @param fwd Are we asking about traffic from FWD (BCK messages)?
2590  *
2591  * @return last_ack_sent - last_pid_recv
2592  */
2593 unsigned int
2594 GMC_get_allowed (struct MeshConnection *c, int fwd)
2595 {
2596   struct MeshFlowControl *fc;
2597
2598   fc = fwd ? &c->fwd_fc : &c->bck_fc;
2599   if (GM_is_pid_bigger(fc->last_pid_recv, fc->last_ack_sent))
2600   {
2601     return 0;
2602   }
2603   return (fc->last_ack_sent - fc->last_pid_recv);
2604 }
2605
2606 /**
2607  * Get messages queued in a connection.
2608  *
2609  * @param c Connection.
2610  * @param fwd Is query about FWD traffic?
2611  *
2612  * @return Number of messages queued.
2613  */
2614 unsigned int
2615 GMC_get_qn (struct MeshConnection *c, int fwd)
2616 {
2617   struct MeshFlowControl *fc;
2618
2619   fc = fwd ? &c->fwd_fc : &c->bck_fc;
2620
2621   return fc->queue_n;
2622 }
2623
2624
2625 /**
2626  * Allow the connection to advertise a buffer of the given size.
2627  *
2628  * The connection will send an @c fwd ACK message (so: in direction !fwd)
2629  * allowing up to last_pid_recv + buffer.
2630  *
2631  * @param c Connection.
2632  * @param buffer How many more messages the connection can accept.
2633  * @param fwd Is this about FWD traffic? (The ack will go dest->root).
2634  */
2635 void
2636 GMC_allow (struct MeshConnection *c, unsigned int buffer, int fwd)
2637 {
2638   LOG (GNUNET_ERROR_TYPE_DEBUG, "  allowing %s %u messages %s\n",
2639        GMC_2s (c), buffer, GM_f2s (fwd));
2640   send_ack (c, buffer, fwd, GNUNET_NO);
2641 }
2642
2643
2644 /**
2645  * Notify other peers on a connection of a broken link. Mark connections
2646  * to destroy after all traffic has been sent.
2647  *
2648  * @param c Connection on which there has been a disconnection.
2649  * @param peer Peer that disconnected.
2650  */
2651 void
2652 GMC_notify_broken (struct MeshConnection *c,
2653                    struct MeshPeer *peer)
2654 {
2655   int fwd;
2656
2657   LOG (GNUNET_ERROR_TYPE_DEBUG,
2658        " notify broken on %s due to %s disconnect\n",
2659        GMC_2s (c), GMP_2s (peer));
2660
2661   fwd = peer == get_prev_hop (c);
2662
2663   if (GNUNET_YES == GMC_is_terminal (c, fwd))
2664   {
2665     /* Local shutdown, no one to notify about this. */
2666     GMC_destroy (c);
2667     return;
2668   }
2669   if (GNUNET_NO == c->destroy)
2670     send_broken (c, &my_full_id, GMP_get_id (peer), fwd);
2671
2672   /* Connection will have at least one pending message
2673    * (the one we just scheduled), so no point in checking whether to
2674    * destroy immediately. */
2675   c->destroy = GNUNET_YES;
2676   c->state = MESH_CONNECTION_DESTROYED;
2677
2678   /**
2679    * Cancel all queues, if no message is left, connection will be destroyed.
2680    */
2681   connection_cancel_queues (c, !fwd);
2682
2683   return;
2684 }
2685
2686
2687 /**
2688  * Is this peer the first one on the connection?
2689  *
2690  * @param c Connection.
2691  * @param fwd Is this about fwd traffic?
2692  *
2693  * @return #GNUNET_YES if origin, #GNUNET_NO if relay/terminal.
2694  */
2695 int
2696 GMC_is_origin (struct MeshConnection *c, int fwd)
2697 {
2698   if (!fwd && c->path->length - 1 == c->own_pos )
2699     return GNUNET_YES;
2700   if (fwd && 0 == c->own_pos)
2701     return GNUNET_YES;
2702   return GNUNET_NO;
2703 }
2704
2705
2706 /**
2707  * Is this peer the last one on the connection?
2708  *
2709  * @param c Connection.
2710  * @param fwd Is this about fwd traffic?
2711  *            Note that the ROOT is the terminal for BCK traffic!
2712  *
2713  * @return #GNUNET_YES if terminal, #GNUNET_NO if relay/origin.
2714  */
2715 int
2716 GMC_is_terminal (struct MeshConnection *c, int fwd)
2717 {
2718   return GMC_is_origin (c, !fwd);
2719 }
2720
2721
2722 /**
2723  * See if we are allowed to send by the next hop in the given direction.
2724  *
2725  * @param c Connection.
2726  * @param fwd Is this about fwd traffic?
2727  *
2728  * @return #GNUNET_YES in case it's OK to send.
2729  */
2730 int
2731 GMC_is_sendable (struct MeshConnection *c, int fwd)
2732 {
2733   struct MeshFlowControl *fc;
2734
2735   if (NULL == c)
2736   {
2737     GNUNET_break (0);
2738     return GNUNET_YES;
2739   }
2740   fc = fwd ? &c->fwd_fc : &c->bck_fc;
2741   if (GM_is_pid_bigger (fc->last_ack_recv, fc->last_pid_sent))
2742     return GNUNET_YES;
2743   return GNUNET_NO;
2744 }
2745
2746 /**
2747  * Sends an already built message on a connection, properly registering
2748  * all used resources.
2749  *
2750  * @param message Message to send. Function makes a copy of it.
2751  *                If message is not hop-by-hop, decrements TTL of copy.
2752  * @param c Connection on which this message is transmitted.
2753  * @param fwd Is this a fwd message?
2754  * @param force Force the connection to accept the message (buffer overfill).
2755  * @param cont Continuation called once message is sent. Can be NULL.
2756  * @param cont_cls Closure for @c cont.
2757  *
2758  * @return Handle to cancel the message before it's sent.
2759  *         NULL on error or if @c cont is NULL.
2760  *         Invalid on @c cont call.
2761  */
2762 struct MeshConnectionQueue *
2763 GMC_send_prebuilt_message (const struct GNUNET_MessageHeader *message,
2764                            struct MeshConnection *c, int fwd, int force,
2765                            GMC_sent cont, void *cont_cls)
2766 {
2767   struct MeshFlowControl *fc;
2768   struct MeshConnectionQueue *q;
2769   void *data;
2770   size_t size;
2771   uint16_t type;
2772   int droppable;
2773
2774   size = ntohs (message->size);
2775   data = GNUNET_malloc (size);
2776   memcpy (data, message, size);
2777   type = ntohs (message->type);
2778   LOG (GNUNET_ERROR_TYPE_INFO, "-> %s on connection %s (%u bytes)\n",
2779        GM_m2s (type), GMC_2s (c), size);
2780
2781   fc = fwd ? &c->fwd_fc : &c->bck_fc;
2782   droppable = GNUNET_NO == force;
2783   switch (type)
2784   {
2785     struct GNUNET_MESH_Encrypted *emsg;
2786     struct GNUNET_MESH_KX        *kmsg;
2787     struct GNUNET_MESH_ACK       *amsg;
2788     struct GNUNET_MESH_Poll      *pmsg;
2789     struct GNUNET_MESH_ConnectionDestroy *dmsg;
2790     struct GNUNET_MESH_ConnectionBroken  *bmsg;
2791     uint32_t ttl;
2792
2793     case GNUNET_MESSAGE_TYPE_MESH_ENCRYPTED:
2794       emsg = (struct GNUNET_MESH_Encrypted *) data;
2795       ttl = ntohl (emsg->ttl);
2796       if (0 == ttl)
2797       {
2798         GNUNET_break_op (0);
2799         GNUNET_free (data);
2800         return NULL;
2801       }
2802       emsg->cid = c->id;
2803       emsg->ttl = htonl (ttl - 1);
2804       emsg->pid = htonl (fc->next_pid++);
2805       LOG (GNUNET_ERROR_TYPE_DEBUG, "  Q_N+ %p %u\n", fc, fc->queue_n);
2806       if (GNUNET_YES == droppable)
2807       {
2808         fc->queue_n++;
2809         LOG (GNUNET_ERROR_TYPE_DEBUG, "pid %u\n", ntohl (emsg->pid));
2810         LOG (GNUNET_ERROR_TYPE_DEBUG, "last pid sent %u\n", fc->last_pid_sent);
2811         LOG (GNUNET_ERROR_TYPE_DEBUG, "     ack recv %u\n", fc->last_ack_recv);
2812       }
2813       else
2814       {
2815         LOG (GNUNET_ERROR_TYPE_DEBUG, "  not droppable, Q_N stays the same\n");
2816       }
2817       if (GM_is_pid_bigger (fc->last_pid_sent + 1, fc->last_ack_recv))
2818       {
2819         GMC_start_poll (c, fwd);
2820       }
2821       break;
2822
2823     case GNUNET_MESSAGE_TYPE_MESH_KX:
2824       kmsg = (struct GNUNET_MESH_KX *) data;
2825       kmsg->cid = c->id;
2826       break;
2827
2828     case GNUNET_MESSAGE_TYPE_MESH_ACK:
2829       amsg = (struct GNUNET_MESH_ACK *) data;
2830       amsg->cid = c->id;
2831       LOG (GNUNET_ERROR_TYPE_DEBUG, " ack %u\n", ntohl (amsg->ack));
2832       droppable = GNUNET_NO;
2833       break;
2834
2835     case GNUNET_MESSAGE_TYPE_MESH_POLL:
2836       pmsg = (struct GNUNET_MESH_Poll *) data;
2837       pmsg->cid = c->id;
2838       LOG (GNUNET_ERROR_TYPE_DEBUG, " poll %u\n", ntohl (pmsg->pid));
2839       droppable = GNUNET_NO;
2840       break;
2841
2842     case GNUNET_MESSAGE_TYPE_MESH_CONNECTION_DESTROY:
2843       dmsg = (struct GNUNET_MESH_ConnectionDestroy *) data;
2844       dmsg->cid = c->id;
2845       break;
2846
2847     case GNUNET_MESSAGE_TYPE_MESH_CONNECTION_BROKEN:
2848       bmsg = (struct GNUNET_MESH_ConnectionBroken *) data;
2849       bmsg->cid = c->id;
2850       break;
2851
2852     case GNUNET_MESSAGE_TYPE_MESH_KEEPALIVE:
2853       GNUNET_break (0);
2854       /* falltrough */
2855     case GNUNET_MESSAGE_TYPE_MESH_CONNECTION_CREATE:
2856     case GNUNET_MESSAGE_TYPE_MESH_CONNECTION_ACK:
2857       break;
2858
2859     default:
2860       GNUNET_break (0);
2861       GNUNET_free (data);
2862       return NULL;
2863   }
2864
2865   if (fc->queue_n > fc->queue_max && droppable)
2866   {
2867     GNUNET_STATISTICS_update (stats, "# messages dropped (buffer full)",
2868                               1, GNUNET_NO);
2869     GNUNET_break (0);
2870     LOG (GNUNET_ERROR_TYPE_DEBUG,
2871                 "queue full: %u/%u\n",
2872                 fc->queue_n, fc->queue_max);
2873     if (GNUNET_MESSAGE_TYPE_MESH_ENCRYPTED == type)
2874     {
2875       fc->queue_n--;
2876       fc->next_pid--;
2877     }
2878     GNUNET_free (data);
2879     return NULL; /* Drop this message */
2880   }
2881
2882   LOG (GNUNET_ERROR_TYPE_DEBUG, "  C_P+ %p %u\n", c, c->pending_messages);
2883   c->pending_messages++;
2884
2885   q = GNUNET_new (struct MeshConnectionQueue);
2886   q->forced = !droppable;
2887   q->q = GMP_queue_add (get_hop (c, fwd), data, type, size, c, fwd,
2888                         &message_sent, q);
2889   if (NULL == q->q)
2890   {
2891     LOG (GNUNET_ERROR_TYPE_DEBUG, "WARNING dropping msg on %s\n", GMC_2s (c));
2892     GNUNET_free (data);
2893     GNUNET_free (q);
2894     return NULL;
2895   }
2896   q->cont = cont;
2897   q->cont_cls = cont_cls;
2898   return q;
2899 }
2900
2901
2902 /**
2903  * Cancel a previously sent message while it's in the queue.
2904  *
2905  * ONLY can be called before the continuation given to the send function
2906  * is called. Once the continuation is called, the message is no longer in the
2907  * queue.
2908  *
2909  * @param q Handle to the queue.
2910  */
2911 void
2912 GMC_cancel (struct MeshConnectionQueue *q)
2913 {
2914   LOG (GNUNET_ERROR_TYPE_DEBUG, "!  GMC cancel message\n");
2915
2916   /* queue destroy calls message_sent, which calls q->cont and frees q */
2917   GMP_queue_destroy (q->q, GNUNET_YES);
2918 }
2919
2920
2921 /**
2922  * Sends a CREATE CONNECTION message for a path to a peer.
2923  * Changes the connection and tunnel states if necessary.
2924  *
2925  * @param connection Connection to create.
2926  */
2927 void
2928 GMC_send_create (struct MeshConnection *connection)
2929 {
2930   enum MeshTunnel3CState state;
2931   size_t size;
2932
2933   size = sizeof (struct GNUNET_MESH_ConnectionCreate);
2934   size += connection->path->length * sizeof (struct GNUNET_PeerIdentity);
2935
2936   LOG (GNUNET_ERROR_TYPE_INFO, "=> %s on connection %s  (%u bytes)\n",
2937        GM_m2s (GNUNET_MESSAGE_TYPE_MESH_CONNECTION_CREATE),
2938        GMC_2s (connection), size);
2939   LOG (GNUNET_ERROR_TYPE_DEBUG, "  C_P+ %p %u (create)\n",
2940        connection, connection->pending_messages);
2941   connection->pending_messages++;
2942
2943   connection->maintenance_q =
2944     GMP_queue_add (get_next_hop (connection), NULL,
2945                    GNUNET_MESSAGE_TYPE_MESH_CONNECTION_CREATE,
2946                    size, connection, GNUNET_YES, &message_sent, NULL);
2947
2948   state = GMT_get_cstate (connection->t);
2949   if (MESH_TUNNEL3_SEARCHING == state || MESH_TUNNEL3_NEW == state)
2950     GMT_change_cstate (connection->t, MESH_TUNNEL3_WAITING);
2951   if (MESH_CONNECTION_NEW == connection->state)
2952     connection_change_state (connection, MESH_CONNECTION_SENT);
2953 }
2954
2955
2956 /**
2957  * Send a message to all peers in this connection that the connection
2958  * is no longer valid.
2959  *
2960  * If some peer should not receive the message, it should be zero'ed out
2961  * before calling this function.
2962  *
2963  * @param c The connection whose peers to notify.
2964  */
2965 void
2966 GMC_send_destroy (struct MeshConnection *c)
2967 {
2968   struct GNUNET_MESH_ConnectionDestroy msg;
2969
2970   if (GNUNET_YES == c->destroy)
2971     return;
2972
2973   msg.header.size = htons (sizeof (msg));
2974   msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_CONNECTION_DESTROY);;
2975   msg.cid = c->id;
2976   LOG (GNUNET_ERROR_TYPE_DEBUG,
2977               "  sending connection destroy for connection %s\n",
2978               GMC_2s (c));
2979
2980   if (GNUNET_NO == GMC_is_terminal (c, GNUNET_YES))
2981     GMC_send_prebuilt_message (&msg.header, c,
2982                                GNUNET_YES, GNUNET_YES, NULL, NULL);
2983   if (GNUNET_NO == GMC_is_terminal (c, GNUNET_NO))
2984     GMC_send_prebuilt_message (&msg.header, c,
2985                                GNUNET_NO, GNUNET_YES, NULL, NULL);
2986   c->destroy = GNUNET_YES;
2987   c->state = MESH_CONNECTION_DESTROYED;
2988 }
2989
2990
2991 /**
2992  * @brief Start a polling timer for the connection.
2993  *
2994  * When a neighbor does not accept more traffic on the connection it could be
2995  * caused by a simple congestion or by a lost ACK. Polling enables to check
2996  * for the lastest ACK status for a connection.
2997  *
2998  * @param c Connection.
2999  * @param fwd Should we poll in the FWD direction?
3000  */
3001 void
3002 GMC_start_poll (struct MeshConnection *c, int fwd)
3003 {
3004   struct MeshFlowControl *fc;
3005
3006   fc = fwd ? &c->fwd_fc : &c->bck_fc;
3007   LOG (GNUNET_ERROR_TYPE_DEBUG, " *** POLL %s requested\n",
3008        GM_f2s (fwd));
3009   if (GNUNET_SCHEDULER_NO_TASK != fc->poll_task || NULL != fc->poll_msg)
3010   {
3011     LOG (GNUNET_ERROR_TYPE_DEBUG, " ***   not needed (%u, %p)\n",
3012          fc->poll_task, fc->poll_msg);
3013     return;
3014   }
3015   LOG (GNUNET_ERROR_TYPE_DEBUG, " *** POLL started on request\n");
3016   fc->poll_task = GNUNET_SCHEDULER_add_delayed (fc->poll_time,
3017                                                 &connection_poll,
3018                                                 fc);
3019 }
3020
3021
3022 /**
3023  * @brief Stop polling a connection for ACKs.
3024  *
3025  * Once we have enough ACKs for future traffic, polls are no longer necessary.
3026  *
3027  * @param c Connection.
3028  * @param fwd Should we stop the poll in the FWD direction?
3029  */
3030 void
3031 GMC_stop_poll (struct MeshConnection *c, int fwd)
3032 {
3033   struct MeshFlowControl *fc;
3034
3035   fc = fwd ? &c->fwd_fc : &c->bck_fc;
3036   if (GNUNET_SCHEDULER_NO_TASK != fc->poll_task)
3037   {
3038     GNUNET_SCHEDULER_cancel (fc->poll_task);
3039     fc->poll_task = GNUNET_SCHEDULER_NO_TASK;
3040   }
3041 }
3042
3043 /**
3044  * Get a (static) string for a connection.
3045  *
3046  * @param c Connection.
3047  */
3048 const char *
3049 GMC_2s (const struct MeshConnection *c)
3050 {
3051   if (NULL == c)
3052     return "NULL";
3053
3054   if (NULL != c->t)
3055   {
3056     static char buf[128];
3057
3058     sprintf (buf, "%s (->%s)", GNUNET_h2s (&c->id), GMT_2s (c->t));
3059     return buf;
3060   }
3061   return GNUNET_h2s (&c->id);
3062 }