- fix core_disconnect handler to avoid reusing the direct path being destroyed
[oweals/gnunet.git] / src / mesh / gnunet-service-mesh_connection.c
1 /*
2      This file is part of GNUnet.
3      (C) 2001-2013 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file mesh/gnunet-service-mesh_connection.c
23  * @brief GNUnet MESH service connection handling
24  * @author Bartlomiej Polot
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29
30 #include "gnunet_statistics_service.h"
31
32 #include "mesh_path.h"
33 #include "mesh_protocol.h"
34 #include "mesh.h"
35 #include "gnunet-service-mesh_connection.h"
36 #include "gnunet-service-mesh_peer.h"
37 #include "gnunet-service-mesh_tunnel.h"
38
39
40 #define LOG(level, ...) GNUNET_log_from (level,"mesh-con",__VA_ARGS__)
41
42 #define MESH_MAX_POLL_TIME      GNUNET_TIME_relative_multiply (\
43                                   GNUNET_TIME_UNIT_MINUTES,\
44                                   10)
45 #define AVG_MSGS                32
46
47
48 /******************************************************************************/
49 /********************************   STRUCTS  **********************************/
50 /******************************************************************************/
51
52 /**
53  * Struct to encapsulate all the Flow Control information to a peer to which
54  * we are directly connected (on a core level).
55  */
56 struct MeshFlowControl
57 {
58   /**
59    * Connection this controls.
60    */
61   struct MeshConnection *c;
62
63   /**
64    * How many messages are in the queue on this connection.
65    */
66   unsigned int queue_n;
67
68   /**
69    * How many messages do we accept in the queue.
70    */
71   unsigned int queue_max;
72
73   /**
74    * Next ID to use.
75    */
76   uint32_t next_pid;
77
78   /**
79    * ID of the last packet sent towards the peer.
80    */
81   uint32_t last_pid_sent;
82
83   /**
84    * ID of the last packet received from the peer.
85    */
86   uint32_t last_pid_recv;
87
88   /**
89    * Last ACK sent to the peer (peer can't send more than this PID).
90    */
91   uint32_t last_ack_sent;
92
93   /**
94    * Last ACK sent towards the origin (for traffic towards leaf node).
95    */
96   uint32_t last_ack_recv;
97
98   /**
99    * Task to poll the peer in case of a lost ACK causes stall.
100    */
101   GNUNET_SCHEDULER_TaskIdentifier poll_task;
102
103   /**
104    * How frequently to poll for ACKs.
105    */
106   struct GNUNET_TIME_Relative poll_time;
107
108   /**
109    * Queued poll message, to cancel if not necessary anymore (got ACK).
110    */
111   struct MeshConnectionQueue *poll_msg;
112
113   /**
114    * Queued poll message, to cancel if not necessary anymore (got ACK).
115    */
116   struct MeshConnectionQueue *ack_msg;
117 };
118
119 /**
120  * Keep a record of the last messages sent on this connection.
121  */
122 struct MeshConnectionPerformance
123 {
124   /**
125    * Circular buffer for storing measurements.
126    */
127   double usecsperbyte[AVG_MSGS];
128
129   /**
130    * Running average of @c usecsperbyte.
131    */
132   double avg;
133
134   /**
135    * How many values of @c usecsperbyte are valid.
136    */
137   uint16_t size;
138
139   /**
140    * Index of the next "free" position in @c usecsperbyte.
141    */
142   uint16_t idx;
143 };
144
145
146 /**
147  * Struct containing all information regarding a connection to a peer.
148  */
149 struct MeshConnection
150 {
151   /**
152    * Tunnel this connection is part of.
153    */
154   struct MeshTunnel3 *t;
155
156   /**
157    * Flow control information for traffic fwd.
158    */
159   struct MeshFlowControl fwd_fc;
160
161   /**
162    * Flow control information for traffic bck.
163    */
164   struct MeshFlowControl bck_fc;
165
166   /**
167    * Measure connection performance on the endpoint.
168    */
169   struct MeshConnectionPerformance *perf;
170
171   /**
172    * ID of the connection.
173    */
174   struct GNUNET_HashCode id;
175
176   /**
177    * State of the connection.
178    */
179   enum MeshConnectionState state;
180
181   /**
182    * Path being used for the tunnel. At the origin of the connection
183    * it's a pointer to the destination's path pool, otherwise just a copy.
184    */
185   struct MeshPeerPath *path;
186
187   /**
188    * Position of the local peer in the path.
189    */
190   unsigned int own_pos;
191
192   /**
193    * Task to keep the used paths alive at the owner,
194    * time tunnel out on all the other peers.
195    */
196   GNUNET_SCHEDULER_TaskIdentifier fwd_maintenance_task;
197
198   /**
199    * Task to keep the used paths alive at the destination,
200    * time tunnel out on all the other peers.
201    */
202   GNUNET_SCHEDULER_TaskIdentifier bck_maintenance_task;
203
204   /**
205    * Pending message count.
206    */
207   int pending_messages;
208
209   /**
210    * Destroy flag: if true, destroy on last message.
211    */
212   int destroy;
213 };
214
215 /**
216  * Handle for messages queued but not yet sent.
217  */
218 struct MeshConnectionQueue
219 {
220   /**
221    * Peer queue handle, to cancel if necessary.
222    */
223   struct MeshPeerQueue *q;
224
225   /**
226    * Was this a forced message? (Do not account for it)
227    */
228   int forced;
229
230   /**
231    * Continuation to call once sent.
232    */
233   GMC_sent cont;
234
235   /**
236    * Closure for @c cont.
237    */
238   void *cont_cls;
239 };
240
241 /******************************************************************************/
242 /*******************************   GLOBALS  ***********************************/
243 /******************************************************************************/
244
245 /**
246  * Global handle to the statistics service.
247  */
248 extern struct GNUNET_STATISTICS_Handle *stats;
249
250 /**
251  * Local peer own ID (memory efficient handle).
252  */
253 extern GNUNET_PEER_Id myid;
254
255 /**
256  * Local peer own ID (full value).
257  */
258 extern struct GNUNET_PeerIdentity my_full_id;
259
260 /**
261  * Connections known, indexed by cid (MeshConnection).
262  */
263 static struct GNUNET_CONTAINER_MultiHashMap *connections;
264
265 /**
266  * How many connections are we willing to maintain.
267  * Local connections are always allowed, even if there are more connections than max.
268  */
269 static unsigned long long max_connections;
270
271 /**
272  * How many messages *in total* are we willing to queue, divide by number of
273  * connections to get connection queue size.
274  */
275 static unsigned long long max_msgs_queue;
276
277 /**
278  * How often to send path keepalives. Paths timeout after 4 missed.
279  */
280 static struct GNUNET_TIME_Relative refresh_connection_time;
281
282 /**
283  * How often to send path create / ACKs.
284  */
285 static struct GNUNET_TIME_Relative create_connection_time;
286
287
288 /******************************************************************************/
289 /********************************   STATIC  ***********************************/
290 /******************************************************************************/
291
292 #if 0 // avoid compiler warning for unused static function
293 static void
294 fc_debug (struct MeshFlowControl *fc)
295 {
296   LOG (GNUNET_ERROR_TYPE_DEBUG, "    IN: %u/%u\n",
297               fc->last_pid_recv, fc->last_ack_sent);
298   LOG (GNUNET_ERROR_TYPE_DEBUG, "    OUT: %u/%u\n",
299               fc->last_pid_sent, fc->last_ack_recv);
300   LOG (GNUNET_ERROR_TYPE_DEBUG, "    QUEUE: %u/%u\n",
301               fc->queue_n, fc->queue_max);
302 }
303
304 static void
305 connection_debug (struct MeshConnection *c)
306 {
307   if (NULL == c)
308   {
309     LOG (GNUNET_ERROR_TYPE_DEBUG, "*** DEBUG NULL CONNECTION ***\n");
310     return;
311   }
312   LOG (GNUNET_ERROR_TYPE_DEBUG, "Connection %s:%X\n",
313               peer2s (c->t->peer), GMC_2s (c));
314   LOG (GNUNET_ERROR_TYPE_DEBUG, "  state: %u, pending msgs: %u\n",
315               c->state, c->pending_messages);
316   LOG (GNUNET_ERROR_TYPE_DEBUG, "  FWD FC\n");
317   fc_debug (&c->fwd_fc);
318   LOG (GNUNET_ERROR_TYPE_DEBUG, "  BCK FC\n");
319   fc_debug (&c->bck_fc);
320 }
321 #endif
322
323 /**
324  * Get string description for tunnel state.
325  *
326  * @param s Tunnel state.
327  *
328  * @return String representation.
329  */
330 static const char *
331 GMC_state2s (enum MeshConnectionState s)
332 {
333   switch (s)
334   {
335     case MESH_CONNECTION_NEW:
336       return "MESH_CONNECTION_NEW";
337     case MESH_CONNECTION_SENT:
338       return "MESH_CONNECTION_SENT";
339     case MESH_CONNECTION_ACK:
340       return "MESH_CONNECTION_ACK";
341     case MESH_CONNECTION_READY:
342       return "MESH_CONNECTION_READY";
343     case MESH_CONNECTION_DESTROYED:
344       return "MESH_CONNECTION_DESTROYED";
345     default:
346       return "MESH_CONNECTION_STATE_ERROR";
347   }
348 }
349
350
351 /**
352  * Initialize a Flow Control structure to the initial state.
353  *
354  * @param fc Flow Control structure to initialize.
355  */
356 static void
357 fc_init (struct MeshFlowControl *fc)
358 {
359   fc->next_pid = 0;
360   fc->last_pid_sent = (uint32_t) -1; /* Next (expected) = 0 */
361   fc->last_pid_recv = (uint32_t) -1;
362   fc->last_ack_sent = (uint32_t) 0;
363   fc->last_ack_recv = (uint32_t) 0;
364   fc->poll_task = GNUNET_SCHEDULER_NO_TASK;
365   fc->poll_time = GNUNET_TIME_UNIT_SECONDS;
366   fc->queue_n = 0;
367   fc->queue_max = (max_msgs_queue / max_connections) + 1;
368 }
369
370
371 /**
372  * Find a connection.
373  *
374  * @param cid Connection ID.
375  */
376 static struct MeshConnection *
377 connection_get (const struct GNUNET_HashCode *cid)
378 {
379   return GNUNET_CONTAINER_multihashmap_get (connections, cid);
380 }
381
382
383 static void
384 connection_change_state (struct MeshConnection* c,
385                          enum MeshConnectionState state)
386 {
387   LOG (GNUNET_ERROR_TYPE_DEBUG,
388               "Connection %s state was %s\n",
389               GMC_2s (c), GMC_state2s (c->state));
390   if (MESH_CONNECTION_DESTROYED == c->state)
391   {
392     LOG (GNUNET_ERROR_TYPE_DEBUG, "state not changing anymore\n");
393     return;
394   }
395   LOG (GNUNET_ERROR_TYPE_DEBUG,
396               "Connection %s state is now %s\n",
397               GMC_2s (c), GMC_state2s (state));
398   c->state = state;
399 }
400
401
402 /**
403  * Callback called when a queued ACK message is sent.
404  *
405  * @param cls Closure (FC).
406  * @param c Connection this message was on.
407  * @param q Queue handler this call invalidates.
408  * @param type Type of message sent.
409  * @param fwd Was this a FWD going message?
410  * @param size Size of the message.
411  */
412 static void
413 ack_sent (void *cls,
414           struct MeshConnection *c,
415           struct MeshConnectionQueue *q,
416           uint16_t type, int fwd, size_t size)
417 {
418   struct MeshFlowControl *fc = cls;
419
420   fc->ack_msg = NULL;
421 }
422
423
424 /**
425  * Send an ACK on the connection, informing the predecessor about
426  * the available buffer space. Should not be called in case the peer
427  * is origin (no predecessor) in the @c fwd direction.
428  *
429  * Note that for fwd ack, the FWD mean forward *traffic* (root->dest),
430  * the ACK itself goes "back" (dest->root).
431  *
432  * @param c Connection on which to send the ACK.
433  * @param buffer How much space free to advertise?
434  * @param fwd Is this FWD ACK? (Going dest -> root)
435  * @param force Don't optimize out.
436  */
437 static void
438 send_ack (struct MeshConnection *c, unsigned int buffer, int fwd, int force)
439 {
440   struct MeshFlowControl *next_fc;
441   struct MeshFlowControl *prev_fc;
442   struct GNUNET_MESH_ACK msg;
443   uint32_t ack;
444   int delta;
445
446   /* If origin, there is no connection to send ACKs. Wrong function! */
447   if (GMC_is_origin (c, fwd))
448   {
449     LOG (GNUNET_ERROR_TYPE_DEBUG, "connection %s is origin in %s\n",
450          GMC_2s (c), GM_f2s (fwd));
451     GNUNET_break (0);
452     return;
453   }
454
455   next_fc = fwd ? &c->fwd_fc : &c->bck_fc;
456   prev_fc = fwd ? &c->bck_fc : &c->fwd_fc;
457
458   LOG (GNUNET_ERROR_TYPE_DEBUG, "connection send %s ack on %s\n",
459        GM_f2s (fwd), GMC_2s (c));
460
461   /* Check if we need to transmit the ACK. */
462   delta = prev_fc->last_ack_sent - prev_fc->last_pid_recv;
463   if (3 < delta && buffer < delta && GNUNET_NO == force)
464   {
465     LOG (GNUNET_ERROR_TYPE_DEBUG, "Not sending ACK, buffer > 3\n");
466     LOG (GNUNET_ERROR_TYPE_DEBUG,
467          "  last pid recv: %u, last ack sent: %u\n",
468          prev_fc->last_pid_recv, prev_fc->last_ack_sent);
469     return;
470   }
471
472   /* Ok, ACK might be necessary, what PID to ACK? */
473   ack = prev_fc->last_pid_recv + buffer;
474   LOG (GNUNET_ERROR_TYPE_DEBUG, " ACK %u\n", ack);
475   LOG (GNUNET_ERROR_TYPE_DEBUG,
476        " last pid %u, last ack %u, qmax %u, q %u\n",
477        prev_fc->last_pid_recv, prev_fc->last_ack_sent,
478        next_fc->queue_max, next_fc->queue_n);
479   if (ack == prev_fc->last_ack_sent && GNUNET_NO == force)
480   {
481     LOG (GNUNET_ERROR_TYPE_DEBUG, "Not sending FWD ACK, not needed\n");
482     return;
483   }
484
485   /* Check if message is already in queue */
486   if (NULL != prev_fc->ack_msg)
487   {
488     if (GM_is_pid_bigger (ack, prev_fc->last_ack_sent))
489     {
490       LOG (GNUNET_ERROR_TYPE_DEBUG, " canceling old ACK\n");
491       GMC_cancel (prev_fc->ack_msg);
492       /* GMC_cancel triggers ack_sent(), which clears fc->ack_msg */
493     }
494     else
495     {
496       LOG (GNUNET_ERROR_TYPE_DEBUG, " same ACK already in queue\n");
497       return;
498     }
499   }
500
501   prev_fc->last_ack_sent = ack;
502
503   /* Build ACK message and send on connection */
504   msg.header.size = htons (sizeof (msg));
505   msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_ACK);
506   msg.ack = htonl (ack);
507   msg.cid = c->id;
508
509   prev_fc->ack_msg = GMC_send_prebuilt_message (&msg.header, c,
510                                                 !fwd, GNUNET_YES,
511                                                 &ack_sent, prev_fc);
512 }
513
514
515 /**
516  * Callback called when a queued message is sent.
517  *
518  * Calculates the average time and connection packet tracking.
519  *
520  * @param cls Closure (ConnectionQueue Handle).
521  * @param c Connection this message was on.
522  * @param type Type of message sent.
523  * @param fwd Was this a FWD going message?
524  * @param size Size of the message.
525  * @param wait Time spent waiting for core (only the time for THIS message)
526  */
527 static void
528 message_sent (void *cls,
529               struct MeshConnection *c, uint16_t type,
530               int fwd, size_t size,
531               struct GNUNET_TIME_Relative wait)
532 {
533   struct MeshConnectionPerformance *p;
534   struct MeshFlowControl *fc;
535   struct MeshConnectionQueue *q = cls;
536   double usecsperbyte;
537   int forced;
538
539   fc = fwd ? &c->fwd_fc : &c->bck_fc;
540   LOG (GNUNET_ERROR_TYPE_DEBUG,
541        "!  sent %s %s\n",
542        GM_f2s (fwd),
543        GM_m2s (type));
544   LOG (GNUNET_ERROR_TYPE_DEBUG, "!  C_P- %p %u\n", c, c->pending_messages);
545   if (NULL != q)
546   {
547     forced = q->forced;
548     if (NULL != q->cont)
549     {
550       LOG (GNUNET_ERROR_TYPE_DEBUG, "!  calling cont\n");
551       q->cont (q->cont_cls, c, q, type, fwd, size);
552     }
553     GNUNET_free (q);
554   }
555   else if (type == GNUNET_MESSAGE_TYPE_MESH_ENCRYPTED)
556   {
557     /* If NULL == q and ENCRYPTED == type, message must have been ch_mngmnt */
558     forced = GNUNET_YES;
559   }
560   else
561   {
562     forced = GNUNET_NO;
563   }
564   c->pending_messages--;
565   if (GNUNET_YES == c->destroy && 0 == c->pending_messages)
566   {
567     LOG (GNUNET_ERROR_TYPE_DEBUG, "!  destroying connection!\n");
568     GMC_destroy (c);
569     return;
570   }
571   /* Send ACK if needed, after accounting for sent ID in fc->queue_n */
572   switch (type)
573   {
574     case GNUNET_MESSAGE_TYPE_MESH_ENCRYPTED:
575       fc->last_pid_sent++;
576       LOG (GNUNET_ERROR_TYPE_DEBUG, "!  Q_N- %p %u\n", fc, fc->queue_n);
577       if (GNUNET_NO == forced)
578       {
579         fc->queue_n--;
580         LOG (GNUNET_ERROR_TYPE_DEBUG,
581             "!   accounting pid %u\n",
582             fc->last_pid_sent);
583       }
584       else
585       {
586         LOG (GNUNET_ERROR_TYPE_DEBUG,
587              "!   forced, Q_N not accounting pid %u\n",
588              fc->last_pid_sent);
589       }
590       GMC_send_ack (c, fwd, GNUNET_NO);
591       break;
592
593     case GNUNET_MESSAGE_TYPE_MESH_POLL:
594       fc->poll_msg = NULL;
595       break;
596
597     case GNUNET_MESSAGE_TYPE_MESH_ACK:
598       fc->ack_msg = NULL;
599       break;
600
601     default:
602       break;
603   }
604   LOG (GNUNET_ERROR_TYPE_DEBUG, "!  message sent!\n");
605
606   if (NULL == c->perf)
607     return; /* Only endpoints are interested in timing. */
608
609   p = c->perf;
610   usecsperbyte = ((double) wait.rel_value_us) / size;
611   if (p->size == AVG_MSGS)
612   {
613     /* Array is full. Substract oldest value, add new one and store. */
614     p->avg -= (p->usecsperbyte[p->idx] / AVG_MSGS);
615     p->usecsperbyte[p->idx] = usecsperbyte;
616     p->avg += (p->usecsperbyte[p->idx] / AVG_MSGS);
617   }
618   else
619   {
620     /* Array not yet full. Add current value to avg and store. */
621     p->usecsperbyte[p->idx] = usecsperbyte;
622     p->avg *= p->size;
623     p->avg += p->usecsperbyte[p->idx];
624     p->size++;
625     p->avg /= p->size;
626   }
627   p->idx = (p->idx + 1) % AVG_MSGS;
628 }
629
630
631 /**
632  * Get the previous hop in a connection
633  *
634  * @param c Connection.
635  *
636  * @return Previous peer in the connection.
637  */
638 static struct MeshPeer *
639 get_prev_hop (const struct MeshConnection *c)
640 {
641   GNUNET_PEER_Id id;
642
643   LOG (GNUNET_ERROR_TYPE_DEBUG, "  get prev hop %s [%u/%u]\n",
644        GMC_2s (c), c->own_pos, c->path->length);
645   if (0 == c->own_pos || c->path->length < 2)
646   {
647     LOG (GNUNET_ERROR_TYPE_DEBUG, "  own pos is zero\n");
648     id = c->path->peers[0];
649   }
650   else
651   {
652     LOG (GNUNET_ERROR_TYPE_DEBUG, "  own pos is NOT zero\n");
653     id = c->path->peers[c->own_pos - 1];
654   }
655
656   LOG (GNUNET_ERROR_TYPE_DEBUG, "  id: %u\n", id);
657   LOG (GNUNET_ERROR_TYPE_DEBUG, "  ID: %s\n", GNUNET_i2s (GNUNET_PEER_resolve2 (id)));
658   return GMP_get_short (id);
659 }
660
661
662 /**
663  * Get the next hop in a connection
664  *
665  * @param c Connection.
666  *
667  * @return Next peer in the connection.
668  */
669 static struct MeshPeer *
670 get_next_hop (const struct MeshConnection *c)
671 {
672   GNUNET_PEER_Id id;
673
674   LOG (GNUNET_ERROR_TYPE_DEBUG, "  get next hop %s [%u/%u]\n",
675        GMC_2s (c), c->own_pos, c->path->length);
676   if ((c->path->length - 1) == c->own_pos || c->path->length < 2)
677   {
678     LOG (GNUNET_ERROR_TYPE_DEBUG, "  own pos is end\n");
679     id = c->path->peers[c->path->length - 1];
680   }
681   else
682   {
683     LOG (GNUNET_ERROR_TYPE_DEBUG, "  own pos is NOT end\n");
684     id = c->path->peers[c->own_pos + 1];
685   }
686
687   LOG (GNUNET_ERROR_TYPE_DEBUG, "  id: %u\n", id);
688   LOG (GNUNET_ERROR_TYPE_DEBUG, "  ID: %s\n", GNUNET_i2s (GNUNET_PEER_resolve2 (id)));
689
690   return GMP_get_short (id);
691 }
692
693
694 /**
695  * Get the hop in a connection.
696  *
697  * @param c Connection.
698  * @param fwd Next hop?
699  *
700  * @return Next peer in the connection.
701  */
702 static struct MeshPeer *
703 get_hop (struct MeshConnection *c, int fwd)
704 {
705   if (fwd)
706     return get_next_hop (c);
707   return get_prev_hop (c);
708 }
709
710
711 /**
712  * Is traffic coming from this sender 'FWD' traffic?
713  *
714  * @param c Connection to check.
715  * @param sender Peer identity of neighbor.
716  *
717  * @return #GNUNET_YES in case the sender is the 'prev' hop and therefore
718  *         the traffic is 'FWD'.
719  *         #GNUNET_NO for BCK.
720  *         #GNUNET_SYSERR for errors.
721  */
722 static int
723 is_fwd (const struct MeshConnection *c,
724         const struct GNUNET_PeerIdentity *sender)
725 {
726   GNUNET_PEER_Id id;
727
728   id = GNUNET_PEER_search (sender);
729   if (GMP_get_short_id (get_prev_hop (c)) == id)
730     return GNUNET_YES;
731
732   if (GMP_get_short_id (get_next_hop (c)) == id)
733     return GNUNET_NO;
734
735   GNUNET_break (0);
736   return GNUNET_SYSERR;
737 }
738
739
740 /**
741  * Sends a CONNECTION ACK message in reponse to a received CONNECTION_CREATE
742  * or a first CONNECTION_ACK directed to us.
743  *
744  * @param connection Connection to confirm.
745  * @param fwd Should we send it FWD? (root->dest)
746  *            (First (~SYNACK) goes BCK, second (~ACK) goes FWD)
747  */
748 static void
749 send_connection_ack (struct MeshConnection *connection, int fwd)
750 {
751   struct MeshTunnel3 *t;
752
753   t = connection->t;
754   LOG (GNUNET_ERROR_TYPE_DEBUG, "Send connection %s ACK\n",
755        GM_f2s (!fwd));
756   GMP_queue_add (get_hop (connection, fwd), NULL,
757                  GNUNET_MESSAGE_TYPE_MESH_CONNECTION_ACK,
758                  sizeof (struct GNUNET_MESH_ConnectionACK),
759                  connection, fwd, &message_sent, NULL);
760   connection->pending_messages++;
761   if (MESH_TUNNEL3_NEW == GMT_get_cstate (t))
762     GMT_change_cstate (t, MESH_TUNNEL3_WAITING);
763   if (MESH_CONNECTION_READY != connection->state)
764     connection_change_state (connection, MESH_CONNECTION_SENT);
765 }
766
767
768 /**
769  * Send a notification that a connection is broken.
770  *
771  * @param c Connection that is broken.
772  * @param id1 Peer that has disconnected.
773  * @param id2 Peer that has disconnected.
774  * @param fwd Direction towards which to send it.
775  */
776 static void
777 send_broken (struct MeshConnection *c,
778              const struct GNUNET_PeerIdentity *id1,
779              const struct GNUNET_PeerIdentity *id2,
780              int fwd)
781 {
782   struct GNUNET_MESH_ConnectionBroken msg;
783
784   msg.header.size = htons (sizeof (struct GNUNET_MESH_ConnectionBroken));
785   msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_CONNECTION_BROKEN);
786   msg.cid = c->id;
787   msg.peer1 = *id1;
788   msg.peer2 = *id2;
789   GMC_send_prebuilt_message (&msg.header, c, fwd, GNUNET_YES, NULL, NULL);
790 }
791
792
793 /**
794  * Send keepalive packets for a connection.
795  *
796  * @param c Connection to keep alive..
797  * @param fwd Is this a FWD keepalive? (owner -> dest).
798  */
799 static void
800 connection_keepalive (struct MeshConnection *c, int fwd)
801 {
802   struct GNUNET_MESH_ConnectionKeepAlive *msg;
803   size_t size = sizeof (struct GNUNET_MESH_ConnectionKeepAlive);
804   char cbuf[size];
805
806   LOG (GNUNET_ERROR_TYPE_DEBUG,
807        "sending %s keepalive for connection %s]\n",
808        GM_f2s (fwd), GMC_2s (c));
809
810   msg = (struct GNUNET_MESH_ConnectionKeepAlive *) cbuf;
811   msg->header.size = htons (size);
812   msg->header.type = htons (GNUNET_MESSAGE_TYPE_MESH_KEEPALIVE);
813   msg->cid = c->id;
814   msg->reserved = htonl (0);
815
816   GMC_send_prebuilt_message (&msg->header, c, fwd, GNUNET_YES, NULL, NULL);
817 }
818
819
820 /**
821  * Send CONNECTION_{CREATE/ACK} packets for a connection.
822  *
823  * @param c Connection for which to send the message.
824  * @param fwd If #GNUNET_YES, send CREATE, otherwise send ACK.
825  */
826 static void
827 connection_recreate (struct MeshConnection *c, int fwd)
828 {
829   LOG (GNUNET_ERROR_TYPE_DEBUG, "sending connection recreate\n");
830   if (fwd)
831     GMC_send_create (c);
832   else
833     send_connection_ack (c, GNUNET_NO);
834 }
835
836
837 /**
838  * Generic connection timer management.
839  * Depending on the role of the peer in the connection will send the
840  * appropriate message (build or keepalive)
841  *
842  * @param c Conncetion to maintain.
843  * @param fwd Is FWD?
844  */
845 static void
846 connection_maintain (struct MeshConnection *c, int fwd)
847 {
848   if (GNUNET_NO != c->destroy)
849     return;
850
851   if (MESH_TUNNEL3_SEARCHING == GMT_get_cstate (c->t))
852   {
853     /* TODO DHT GET with RO_BART */
854     return;
855   }
856   switch (c->state)
857   {
858     case MESH_CONNECTION_NEW:
859       GNUNET_break (0);
860       /* fall-through */
861     case MESH_CONNECTION_SENT:
862       connection_recreate (c, fwd);
863       break;
864     case MESH_CONNECTION_READY:
865       connection_keepalive (c, fwd);
866       break;
867     default:
868       break;
869   }
870 }
871
872
873 static void
874 connection_fwd_keepalive (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
875 {
876   struct MeshConnection *c = cls;
877   struct GNUNET_TIME_Relative delay;
878
879   c->fwd_maintenance_task = GNUNET_SCHEDULER_NO_TASK;
880   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
881     return;
882
883   connection_maintain (c, GNUNET_YES);
884   delay = c->state == MESH_CONNECTION_READY ?
885           refresh_connection_time : create_connection_time;
886   c->fwd_maintenance_task = GNUNET_SCHEDULER_add_delayed (delay,
887                                                           &connection_fwd_keepalive,
888                                                           c);
889 }
890
891
892 static void
893 connection_bck_keepalive (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
894 {
895   struct MeshConnection *c = cls;
896   struct GNUNET_TIME_Relative delay;
897
898   c->bck_maintenance_task = GNUNET_SCHEDULER_NO_TASK;
899   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
900     return;
901
902   connection_maintain (c, GNUNET_NO);
903   delay = c->state == MESH_CONNECTION_READY ?
904           refresh_connection_time : create_connection_time;
905   c->bck_maintenance_task = GNUNET_SCHEDULER_add_delayed (delay,
906                                                           &connection_bck_keepalive,
907                                                           c);
908 }
909
910
911 /**
912  * @brief Re-initiate traffic on this connection if necessary.
913  *
914  * Check if there is traffic queued towards this peer
915  * and the core transmit handle is NULL (traffic was stalled).
916  * If so, call core tmt rdy.
917  *
918  * @param c Connection on which initiate traffic.
919  * @param fwd Is this about fwd traffic?
920  */
921 static void
922 connection_unlock_queue (struct MeshConnection *c, int fwd)
923 {
924   struct MeshPeer *peer;
925
926   LOG (GNUNET_ERROR_TYPE_DEBUG,
927               "connection_unlock_queue %s on %s\n",
928               GM_f2s (fwd), GMC_2s (c));
929
930   if (GMC_is_terminal (c, fwd))
931   {
932     LOG (GNUNET_ERROR_TYPE_DEBUG, " is terminal!\n");
933     return;
934   }
935
936   peer = get_hop (c, fwd);
937   GMP_queue_unlock (peer, c);
938 }
939
940
941 /**
942  * Cancel all transmissions that belong to a certain connection.
943  *
944  * If the connection is scheduled for destruction and no more messages are left,
945  * the connection will be destroyed by the continuation call.
946  *
947  * @param c Connection which to cancel. Might be destroyed during this call.
948  * @param fwd Cancel fwd traffic?
949  */
950 static void
951 connection_cancel_queues (struct MeshConnection *c, int fwd)
952 {
953   struct MeshFlowControl *fc;
954   struct MeshPeer *peer;
955
956   LOG (GNUNET_ERROR_TYPE_DEBUG,
957        " *** Cancel %s queues for connection %s\n",
958        GM_f2s (fwd), GMC_2s (c));
959   if (NULL == c)
960   {
961     GNUNET_break (0);
962     return;
963   }
964
965   fc = fwd ? &c->fwd_fc : &c->bck_fc;
966   if (GNUNET_SCHEDULER_NO_TASK != fc->poll_task)
967   {
968     GNUNET_SCHEDULER_cancel (fc->poll_task);
969     fc->poll_task = GNUNET_SCHEDULER_NO_TASK;
970     LOG (GNUNET_ERROR_TYPE_DEBUG, " *** Cancel POLL in ccq for fc %p\n", fc);
971   }
972   peer = get_hop (c, fwd);
973   GMP_queue_cancel (peer, c);
974 }
975
976
977 /**
978  * Function called if a connection has been stalled for a while,
979  * possibly due to a missed ACK. Poll the neighbor about its ACK status.
980  *
981  * @param cls Closure (poll ctx).
982  * @param tc TaskContext.
983  */
984 static void
985 connection_poll (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
986
987
988 /**
989  * Callback called when a queued POLL message is sent.
990  *
991  * @param cls Closure (FC).
992  * @param c Connection this message was on.
993  * @param q Queue handler this call invalidates.
994  * @param type Type of message sent.
995  * @param fwd Was this a FWD going message?
996  * @param size Size of the message.
997  */
998 static void
999 poll_sent (void *cls,
1000            struct MeshConnection *c,
1001            struct MeshConnectionQueue *q,
1002            uint16_t type, int fwd, size_t size)
1003 {
1004   struct MeshFlowControl *fc = cls;
1005
1006   if (2 == c->destroy)
1007   {
1008     LOG (GNUNET_ERROR_TYPE_DEBUG, " *** POLL canceled on shutdown\n");
1009     return;
1010   }
1011   LOG (GNUNET_ERROR_TYPE_DEBUG,
1012        " *** POLL sent for , scheduling new one!\n");
1013   fc->poll_msg = NULL;
1014   fc->poll_time = GNUNET_TIME_STD_BACKOFF (fc->poll_time);
1015   fc->poll_task = GNUNET_SCHEDULER_add_delayed (fc->poll_time,
1016                                                 &connection_poll, fc);
1017   LOG (GNUNET_ERROR_TYPE_DEBUG, " task %u\n", fc->poll_task);
1018
1019 }
1020
1021 /**
1022  * Function called if a connection has been stalled for a while,
1023  * possibly due to a missed ACK. Poll the neighbor about its ACK status.
1024  *
1025  * @param cls Closure (poll ctx).
1026  * @param tc TaskContext.
1027  */
1028 static void
1029 connection_poll (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1030 {
1031   struct MeshFlowControl *fc = cls;
1032   struct GNUNET_MESH_Poll msg;
1033   struct MeshConnection *c;
1034
1035   fc->poll_task = GNUNET_SCHEDULER_NO_TASK;
1036   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1037   {
1038     return;
1039   }
1040
1041   c = fc->c;
1042   LOG (GNUNET_ERROR_TYPE_DEBUG, " *** Polling!\n");
1043   LOG (GNUNET_ERROR_TYPE_DEBUG, " *** connection [%s]\n", GMC_2s (c));
1044   LOG (GNUNET_ERROR_TYPE_DEBUG, " ***   %s\n",
1045        fc == &c->fwd_fc ? "FWD" : "BCK");
1046
1047   msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_POLL);
1048   msg.header.size = htons (sizeof (msg));
1049   msg.pid = htonl (fc->last_pid_sent);
1050   LOG (GNUNET_ERROR_TYPE_DEBUG, " *** last pid sent: %u!\n", fc->last_pid_sent);
1051   fc->poll_msg = GMC_send_prebuilt_message (&msg.header, c,
1052                                             fc == &c->fwd_fc, GNUNET_YES,
1053                                             &poll_sent, fc);
1054 }
1055
1056
1057 /**
1058  * Timeout function due to lack of keepalive/traffic from the owner.
1059  * Destroys connection if called.
1060  *
1061  * @param cls Closure (connection to destroy).
1062  * @param tc TaskContext.
1063  */
1064 static void
1065 connection_fwd_timeout (void *cls,
1066                         const struct GNUNET_SCHEDULER_TaskContext *tc)
1067 {
1068   struct MeshConnection *c = cls;
1069
1070   c->fwd_maintenance_task = GNUNET_SCHEDULER_NO_TASK;
1071   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1072     return;
1073
1074   LOG (GNUNET_ERROR_TYPE_DEBUG, "Connection %s FWD timed out. Destroying.\n",
1075        GMC_2s (c));
1076   if (GMC_is_origin (c, GNUNET_YES)) /* If local, leave. */
1077   {
1078     GNUNET_break (0);
1079     return;
1080   }
1081
1082   GMC_destroy (c);
1083 }
1084
1085
1086 /**
1087  * Timeout function due to lack of keepalive/traffic from the destination.
1088  * Destroys connection if called.
1089  *
1090  * @param cls Closure (connection to destroy).
1091  * @param tc TaskContext
1092  */
1093 static void
1094 connection_bck_timeout (void *cls,
1095                         const struct GNUNET_SCHEDULER_TaskContext *tc)
1096 {
1097   struct MeshConnection *c = cls;
1098
1099   c->bck_maintenance_task = GNUNET_SCHEDULER_NO_TASK;
1100   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1101     return;
1102
1103   LOG (GNUNET_ERROR_TYPE_DEBUG, "Connection %s BCK timed out. Destroying.\n",
1104        GMC_2s (c));
1105
1106   if (GMC_is_origin (c, GNUNET_NO)) /* If local, leave. */
1107   {
1108     GNUNET_break (0);
1109     return;
1110   }
1111
1112   GMC_destroy (c);
1113 }
1114
1115
1116 /**
1117  * Resets the connection timeout task, some other message has done the
1118  * task's job.
1119  * - For the first peer on the direction this means to send
1120  *   a keepalive or a path confirmation message (either create or ACK).
1121  * - For all other peers, this means to destroy the connection,
1122  *   due to lack of activity.
1123  * Starts the timeout if no timeout was running (connection just created).
1124  *
1125  * @param c Connection whose timeout to reset.
1126  * @param fwd Is this forward?
1127  *
1128  * TODO use heap to improve efficiency of scheduler.
1129  */
1130 static void
1131 connection_reset_timeout (struct MeshConnection *c, int fwd)
1132 {
1133   GNUNET_SCHEDULER_TaskIdentifier *ti;
1134   GNUNET_SCHEDULER_Task f;
1135
1136   ti = fwd ? &c->fwd_maintenance_task : &c->bck_maintenance_task;
1137
1138   LOG (GNUNET_ERROR_TYPE_DEBUG, "Connection %s reset timeout\n", GM_f2s (fwd));
1139
1140   if (GNUNET_SCHEDULER_NO_TASK != *ti)
1141     GNUNET_SCHEDULER_cancel (*ti);
1142
1143   if (GMC_is_origin (c, fwd)) /* Startpoint */
1144   {
1145     f  = fwd ? &connection_fwd_keepalive : &connection_bck_keepalive;
1146     *ti = GNUNET_SCHEDULER_add_delayed (refresh_connection_time, f, c);
1147   }
1148   else /* Relay, endpoint. */
1149   {
1150     struct GNUNET_TIME_Relative delay;
1151
1152     delay = GNUNET_TIME_relative_multiply (refresh_connection_time, 4);
1153     f  = fwd ? &connection_fwd_timeout : &connection_bck_timeout;
1154     *ti = GNUNET_SCHEDULER_add_delayed (delay, f, c);
1155   }
1156 }
1157
1158
1159 /**
1160  * Add the connection to the list of both neighbors.
1161  *
1162  * @param c Connection.
1163  *
1164  * @return #GNUNET_OK if everything went fine
1165  *         #GNUNET_SYSERR if the was an error and @c c is malformed.
1166  */
1167 static int
1168 register_neighbors (struct MeshConnection *c)
1169 {
1170   struct MeshPeer *next_peer;
1171   struct MeshPeer *prev_peer;
1172
1173   next_peer = get_next_hop (c);
1174   prev_peer = get_prev_hop (c);
1175
1176   LOG (GNUNET_ERROR_TYPE_DEBUG, "register neighbors for connection %s\n",
1177        GMC_2s (c));
1178   path_debug (c->path);
1179   LOG (GNUNET_ERROR_TYPE_DEBUG, "own pos %u\n", c->own_pos);
1180   LOG (GNUNET_ERROR_TYPE_DEBUG, "putting connection %s to next peer %p\n",
1181        GMC_2s (c), next_peer);
1182   LOG (GNUNET_ERROR_TYPE_DEBUG, "next peer %p %s\n", next_peer, GMP_2s (next_peer));
1183   LOG (GNUNET_ERROR_TYPE_DEBUG, "putting connection %s to prev peer %p\n",
1184        GMC_2s (c), prev_peer);
1185   LOG (GNUNET_ERROR_TYPE_DEBUG, "prev peer %p %s\n", prev_peer, GMP_2s (prev_peer));
1186
1187   if (GNUNET_NO == GMP_is_neighbor (next_peer)
1188       || GNUNET_NO == GMP_is_neighbor (prev_peer))
1189   {
1190     if (GMC_is_origin (c, GNUNET_YES))
1191       GNUNET_STATISTICS_update (stats, "# local bad paths", 1, GNUNET_NO);
1192     GNUNET_STATISTICS_update (stats, "# bad paths", 1, GNUNET_NO);
1193
1194     LOG (GNUNET_ERROR_TYPE_DEBUG, "  register neighbors failed\n");
1195     LOG (GNUNET_ERROR_TYPE_DEBUG, "  prev: %s, neighbor?: %d\n",
1196          GMP_2s (prev_peer), GMP_is_neighbor (prev_peer));
1197     LOG (GNUNET_ERROR_TYPE_DEBUG, "  next: %s, neighbor?: %d\n",
1198          GMP_2s (next_peer), GMP_is_neighbor (next_peer));
1199     return GNUNET_SYSERR;
1200   }
1201
1202   GMP_add_connection (next_peer, c);
1203   GMP_add_connection (prev_peer, c);
1204
1205   return GNUNET_OK;
1206 }
1207
1208
1209 /**
1210  * Remove the connection from the list of both neighbors.
1211  *
1212  * @param c Connection.
1213  */
1214 static void
1215 unregister_neighbors (struct MeshConnection *c)
1216 {
1217   struct MeshPeer *peer;
1218
1219   peer = get_next_hop (c);
1220   if (GNUNET_OK != GMP_remove_connection (peer, c))
1221   {
1222     GNUNET_assert (MESH_CONNECTION_NEW == c->state
1223                   || MESH_CONNECTION_DESTROYED == c->state);
1224     LOG (GNUNET_ERROR_TYPE_DEBUG, "  cstate: %u\n", c->state);
1225     if (NULL != c->t) GMT_debug (c->t);
1226   }
1227
1228   peer = get_prev_hop (c);
1229   if (GNUNET_OK != GMP_remove_connection (peer, c))
1230   {
1231     GNUNET_assert (MESH_CONNECTION_NEW == c->state
1232                   || MESH_CONNECTION_DESTROYED == c->state);
1233     LOG (GNUNET_ERROR_TYPE_DEBUG, "  cstate: %u\n", c->state);
1234     if (NULL != c->t) GMT_debug (c->t);
1235   }
1236 }
1237
1238
1239 /**
1240  * Bind the connection to the peer and the tunnel to that peer.
1241  *
1242  * If the peer has no tunnel, create one. Update tunnel and connection
1243  * data structres to reflect new status.
1244  *
1245  * @param c Connection.
1246  * @param peer Peer.
1247  */
1248 static void
1249 add_to_peer (struct MeshConnection *c, struct MeshPeer *peer)
1250 {
1251   GMP_add_tunnel (peer);
1252   c->t = GMP_get_tunnel (peer);
1253   GMT_add_connection (c->t, c);
1254 }
1255
1256
1257 /**
1258  * Builds a path from a PeerIdentity array.
1259  *
1260  * @param peers PeerIdentity array.
1261  * @param size Size of the @c peers array.
1262  * @param own_pos Output parameter: own position in the path.
1263  *
1264  * @return Fixed and shortened path.
1265  */
1266 static struct MeshPeerPath *
1267 build_path_from_peer_ids (struct GNUNET_PeerIdentity *peers,
1268                           unsigned int size,
1269                           unsigned int *own_pos)
1270 {
1271   struct MeshPeerPath *path;
1272   GNUNET_PEER_Id shortid;
1273   unsigned int i;
1274   unsigned int j;
1275   unsigned int offset;
1276
1277   /* Create path */
1278   LOG (GNUNET_ERROR_TYPE_DEBUG, "  Creating path...\n");
1279   path = path_new (size);
1280   *own_pos = 0;
1281   offset = 0;
1282   for (i = 0; i < size; i++)
1283   {
1284     LOG (GNUNET_ERROR_TYPE_DEBUG, "  - %u: taking %s\n",
1285          i, GNUNET_i2s (&peers[i]));
1286     shortid = GNUNET_PEER_intern (&peers[i]);
1287
1288     /* Check for loops / duplicates */
1289     for (j = 0; j < i - offset; j++)
1290     {
1291       if (path->peers[j] == shortid)
1292       {
1293         LOG (GNUNET_ERROR_TYPE_DEBUG, "    already exists at pos %u\n", j);
1294         offset += i - j;
1295         LOG (GNUNET_ERROR_TYPE_DEBUG, "    offset now\n", offset);
1296         GNUNET_PEER_change_rc (shortid, -1);
1297       }
1298     }
1299     LOG (GNUNET_ERROR_TYPE_DEBUG, "    storing at %u\n", i - offset);
1300     path->peers[i - offset] = shortid;
1301     if (path->peers[i] == myid)
1302       *own_pos = i;
1303   }
1304   path->length -= offset;
1305
1306   if (path->peers[*own_pos] != myid)
1307   {
1308     /* create path: self not found in path through self */
1309     GNUNET_break_op (0);
1310     path_destroy (path);
1311     return NULL;
1312   }
1313
1314   return path;
1315 }
1316
1317 /******************************************************************************/
1318 /********************************    API    ***********************************/
1319 /******************************************************************************/
1320
1321 /**
1322  * Core handler for connection creation.
1323  *
1324  * @param cls Closure (unused).
1325  * @param peer Sender (neighbor).
1326  * @param message Message.
1327  *
1328  * @return GNUNET_OK to keep the connection open,
1329  *         GNUNET_SYSERR to close it (signal serious error)
1330  */
1331 int
1332 GMC_handle_create (void *cls, const struct GNUNET_PeerIdentity *peer,
1333                    const struct GNUNET_MessageHeader *message)
1334 {
1335   struct GNUNET_MESH_ConnectionCreate *msg;
1336   struct GNUNET_PeerIdentity *id;
1337   struct GNUNET_HashCode *cid;
1338   struct MeshPeerPath *path;
1339   struct MeshPeer *dest_peer;
1340   struct MeshPeer *orig_peer;
1341   struct MeshConnection *c;
1342   unsigned int own_pos;
1343   uint16_t size;
1344
1345   LOG (GNUNET_ERROR_TYPE_DEBUG, "\n\n");
1346   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received a connection create msg\n");
1347
1348   /* Check size */
1349   size = ntohs (message->size);
1350   if (size < sizeof (struct GNUNET_MESH_ConnectionCreate))
1351   {
1352     GNUNET_break_op (0);
1353     return GNUNET_OK;
1354   }
1355
1356   /* Calculate hops */
1357   size -= sizeof (struct GNUNET_MESH_ConnectionCreate);
1358   if (size % sizeof (struct GNUNET_PeerIdentity))
1359   {
1360     GNUNET_break_op (0);
1361     return GNUNET_OK;
1362   }
1363   size /= sizeof (struct GNUNET_PeerIdentity);
1364   if (1 > size)
1365   {
1366     GNUNET_break_op (0);
1367     return GNUNET_OK;
1368   }
1369   LOG (GNUNET_ERROR_TYPE_DEBUG, "    path has %u hops.\n", size);
1370
1371   /* Get parameters */
1372   msg = (struct GNUNET_MESH_ConnectionCreate *) message;
1373   cid = &msg->cid;
1374   id = (struct GNUNET_PeerIdentity *) &msg[1];
1375   LOG (GNUNET_ERROR_TYPE_DEBUG, "    connection %s (%s->).\n",
1376        GNUNET_h2s (cid), GNUNET_i2s (id));
1377
1378   /* Create connection */
1379   c = connection_get (cid);
1380   if (NULL == c)
1381   {
1382     path = build_path_from_peer_ids ((struct GNUNET_PeerIdentity *) &msg[1],
1383                                      size, &own_pos);
1384     if (NULL == path)
1385       return GNUNET_OK;
1386     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Own position: %u\n", own_pos);
1387     GMP_add_path_to_all (path, GNUNET_NO);
1388     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Creating connection\n");
1389     c = GMC_new (cid, NULL, path_duplicate (path), own_pos);
1390     if (NULL == c)
1391     {
1392       path_destroy (path);
1393       return GNUNET_OK;
1394     }
1395     connection_reset_timeout (c, GNUNET_YES);
1396   }
1397   else
1398   {
1399     path = path_duplicate (c->path);
1400   }
1401   if (MESH_CONNECTION_NEW == c->state)
1402     connection_change_state (c, MESH_CONNECTION_SENT);
1403
1404   /* Remember peers */
1405   dest_peer = GMP_get (&id[size - 1]);
1406   orig_peer = GMP_get (&id[0]);
1407
1408   /* Is it a connection to us? */
1409   if (c->own_pos == size - 1)
1410   {
1411     LOG (GNUNET_ERROR_TYPE_DEBUG, "  It's for us!\n");
1412     GMP_add_path_to_origin (orig_peer, path_duplicate (path), GNUNET_YES);
1413
1414     add_to_peer (c, orig_peer);
1415     if (MESH_TUNNEL3_NEW == GMT_get_cstate (c->t))
1416       GMT_change_cstate (c->t,  MESH_TUNNEL3_WAITING);
1417
1418     send_connection_ack (c, GNUNET_NO);
1419     if (MESH_CONNECTION_SENT == c->state)
1420       connection_change_state (c, MESH_CONNECTION_ACK);
1421
1422     /* Keep tunnel alive in direction dest->owner*/
1423     if (GNUNET_SCHEDULER_NO_TASK == c->bck_maintenance_task)
1424     {
1425       c->bck_maintenance_task =
1426         GNUNET_SCHEDULER_add_delayed (create_connection_time,
1427                                       &connection_bck_keepalive, c);
1428     }
1429   }
1430   else
1431   {
1432     /* It's for somebody else! Retransmit. */
1433     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Retransmitting.\n");
1434     GMP_add_path (dest_peer, path_duplicate (path), GNUNET_NO);
1435     GMP_add_path_to_origin (orig_peer, path_duplicate (path), GNUNET_NO);
1436     GMC_send_prebuilt_message (message, c, GNUNET_YES, GNUNET_YES,
1437                                NULL, NULL);
1438   }
1439   path_destroy (path);
1440   return GNUNET_OK;
1441 }
1442
1443
1444 /**
1445  * Core handler for path confirmations.
1446  *
1447  * @param cls closure
1448  * @param message message
1449  * @param peer peer identity this notification is about
1450  *
1451  * @return GNUNET_OK to keep the connection open,
1452  *         GNUNET_SYSERR to close it (signal serious error)
1453  */
1454 int
1455 GMC_handle_confirm (void *cls, const struct GNUNET_PeerIdentity *peer,
1456                     const struct GNUNET_MessageHeader *message)
1457 {
1458   struct GNUNET_MESH_ConnectionACK *msg;
1459   struct MeshConnection *c;
1460   struct MeshPeerPath *p;
1461   struct MeshPeer *pi;
1462   enum MeshConnectionState oldstate;
1463   int fwd;
1464
1465   LOG (GNUNET_ERROR_TYPE_DEBUG, "\n\n");
1466   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received a connection ACK msg\n");
1467   msg = (struct GNUNET_MESH_ConnectionACK *) message;
1468   LOG (GNUNET_ERROR_TYPE_DEBUG, "  on connection %s\n",
1469               GNUNET_h2s (&msg->cid));
1470   c = connection_get (&msg->cid);
1471   if (NULL == c)
1472   {
1473     GNUNET_STATISTICS_update (stats, "# control on unknown connection",
1474                               1, GNUNET_NO);
1475     LOG (GNUNET_ERROR_TYPE_DEBUG, "  don't know the connection!\n");
1476     return GNUNET_OK;
1477   }
1478
1479   if (GNUNET_NO != c->destroy)
1480   {
1481     LOG (GNUNET_ERROR_TYPE_DEBUG, "  connection being destroyed\n");
1482     return GNUNET_OK;
1483   }
1484
1485   oldstate = c->state;
1486   LOG (GNUNET_ERROR_TYPE_DEBUG, "  via peer %s\n", GNUNET_i2s (peer));
1487   pi = GMP_get (peer);
1488   if (get_next_hop (c) == pi)
1489   {
1490     LOG (GNUNET_ERROR_TYPE_DEBUG, "  SYNACK\n");
1491     fwd = GNUNET_NO;
1492     if (MESH_CONNECTION_SENT == oldstate)
1493       connection_change_state (c, MESH_CONNECTION_ACK);
1494   }
1495   else if (get_prev_hop (c) == pi)
1496   {
1497     LOG (GNUNET_ERROR_TYPE_DEBUG, "  ACK\n");
1498     fwd = GNUNET_YES;
1499     connection_change_state (c, MESH_CONNECTION_READY);
1500   }
1501   else
1502   {
1503     GNUNET_break_op (0);
1504     return GNUNET_OK;
1505   }
1506
1507   connection_reset_timeout (c, fwd);
1508
1509   /* Add path to peers? */
1510   p = c->path;
1511   if (NULL != p)
1512   {
1513     GMP_add_path_to_all (p, GNUNET_YES);
1514   }
1515   else
1516   {
1517     GNUNET_break (0);
1518   }
1519
1520   /* Message for us as creator? */
1521   if (GMC_is_origin (c, GNUNET_YES))
1522   {
1523     if (GNUNET_NO != fwd)
1524     {
1525       GNUNET_break_op (0);
1526       return GNUNET_OK;
1527     }
1528     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Connection (SYN)ACK for us!\n");
1529
1530     /* If just created, cancel the short timeout and start a long one */
1531     if (MESH_CONNECTION_SENT == oldstate)
1532       connection_reset_timeout (c, GNUNET_YES);
1533
1534     /* Change connection state */
1535     connection_change_state (c, MESH_CONNECTION_READY);
1536     send_connection_ack (c, GNUNET_YES);
1537
1538     /* Change tunnel state, trigger KX */
1539     if (MESH_TUNNEL3_WAITING == GMT_get_cstate (c->t))
1540       GMT_change_cstate (c->t, MESH_TUNNEL3_READY);
1541
1542     return GNUNET_OK;
1543   }
1544
1545   /* Message for us as destination? */
1546   if (GMC_is_terminal (c, GNUNET_YES))
1547   {
1548     if (GNUNET_YES != fwd)
1549     {
1550       GNUNET_break_op (0);
1551       return GNUNET_OK;
1552     }
1553     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Connection ACK for us!\n");
1554
1555     /* If just created, cancel the short timeout and start a long one */
1556     if (MESH_CONNECTION_ACK == oldstate)
1557       connection_reset_timeout (c, GNUNET_NO);
1558
1559     /* Change tunnel state */
1560     if (MESH_TUNNEL3_WAITING == GMT_get_cstate (c->t))
1561       GMT_change_cstate (c->t, MESH_TUNNEL3_READY);
1562
1563     return GNUNET_OK;
1564   }
1565
1566   LOG (GNUNET_ERROR_TYPE_DEBUG, "  not for us, retransmitting...\n");
1567   GMC_send_prebuilt_message (message, c, fwd, GNUNET_YES, NULL, NULL);
1568   return GNUNET_OK;
1569 }
1570
1571
1572 /**
1573  * Core handler for notifications of broken paths
1574  *
1575  * @param cls Closure (unused).
1576  * @param id Peer identity of sending neighbor.
1577  * @param message Message.
1578  *
1579  * @return GNUNET_OK to keep the connection open,
1580  *         GNUNET_SYSERR to close it (signal serious error)
1581  */
1582 int
1583 GMC_handle_broken (void* cls,
1584                    const struct GNUNET_PeerIdentity* id,
1585                    const struct GNUNET_MessageHeader* message)
1586 {
1587   struct GNUNET_MESH_ConnectionBroken *msg;
1588   struct MeshConnection *c;
1589   int fwd;
1590
1591   LOG (GNUNET_ERROR_TYPE_DEBUG,
1592               "Received a CONNECTION BROKEN msg from %s\n", GNUNET_i2s (id));
1593   msg = (struct GNUNET_MESH_ConnectionBroken *) message;
1594   LOG (GNUNET_ERROR_TYPE_DEBUG, "  regarding %s\n",
1595               GNUNET_i2s (&msg->peer1));
1596   LOG (GNUNET_ERROR_TYPE_DEBUG, "  regarding %s\n",
1597               GNUNET_i2s (&msg->peer2));
1598   c = connection_get (&msg->cid);
1599   if (NULL == c)
1600   {
1601     GNUNET_break_op (0);
1602     return GNUNET_OK;
1603   }
1604
1605   fwd = is_fwd (c, id);
1606   if (GMC_is_terminal (c, fwd))
1607   {
1608     if (0 < c->pending_messages)
1609       c->destroy = GNUNET_YES;
1610     else
1611       GMC_destroy (c);
1612   }
1613   else
1614   {
1615     GMC_send_prebuilt_message (message, c, fwd, GNUNET_YES, NULL, NULL);
1616     c->destroy = GNUNET_YES;
1617     connection_cancel_queues (c, !fwd);
1618   }
1619
1620   return GNUNET_OK;
1621
1622 }
1623
1624
1625 /**
1626  * Core handler for tunnel destruction
1627  *
1628  * @param cls Closure (unused).
1629  * @param peer Peer identity of sending neighbor.
1630  * @param message Message.
1631  *
1632  * @return GNUNET_OK to keep the connection open,
1633  *         GNUNET_SYSERR to close it (signal serious error)
1634  */
1635 int
1636 GMC_handle_destroy (void *cls, const struct GNUNET_PeerIdentity *peer,
1637                     const struct GNUNET_MessageHeader *message)
1638 {
1639   struct GNUNET_MESH_ConnectionDestroy *msg;
1640   struct MeshConnection *c;
1641   int fwd;
1642
1643   msg = (struct GNUNET_MESH_ConnectionDestroy *) message;
1644   LOG (GNUNET_ERROR_TYPE_DEBUG,
1645               "Got a CONNECTION DESTROY message from %s\n",
1646               GNUNET_i2s (peer));
1647   LOG (GNUNET_ERROR_TYPE_DEBUG,
1648               "  for connection %s\n",
1649               GNUNET_h2s (&msg->cid));
1650   c = connection_get (&msg->cid);
1651   if (NULL == c)
1652   {
1653     /* Probably already got the message from another path,
1654      * destroyed the tunnel and retransmitted to children.
1655      * Safe to ignore.
1656      */
1657     GNUNET_STATISTICS_update (stats, "# control on unknown connection",
1658                               1, GNUNET_NO);
1659     LOG (GNUNET_ERROR_TYPE_DEBUG, "  connection unknown: already destroyed?\n");
1660     return GNUNET_OK;
1661   }
1662   fwd = is_fwd (c, peer);
1663   if (GNUNET_SYSERR == fwd)
1664   {
1665     GNUNET_break_op (0); /* FIXME */
1666     return GNUNET_OK;
1667   }
1668   if (GNUNET_NO == GMC_is_terminal (c, fwd))
1669     GMC_send_prebuilt_message (message, c, fwd, GNUNET_YES, NULL, NULL);
1670   else if (0 == c->pending_messages)
1671   {
1672     LOG (GNUNET_ERROR_TYPE_DEBUG, "!  directly destroying connection!\n");
1673     GMC_destroy (c);
1674     return GNUNET_OK;
1675   }
1676   c->destroy = GNUNET_YES;
1677   c->state = MESH_CONNECTION_DESTROYED;
1678   if (NULL != c->t)
1679   {
1680     GMT_remove_connection (c->t, c);
1681     c->t = NULL;
1682   }
1683
1684   return GNUNET_OK;
1685 }
1686
1687 /**
1688  * Generic handler for mesh network encrypted traffic.
1689  *
1690  * @param peer Peer identity this notification is about.
1691  * @param msg Encrypted message.
1692  *
1693  * @return GNUNET_OK to keep the connection open,
1694  *         GNUNET_SYSERR to close it (signal serious error)
1695  */
1696 static int
1697 handle_mesh_encrypted (const struct GNUNET_PeerIdentity *peer,
1698                        const struct GNUNET_MESH_Encrypted *msg)
1699 {
1700   struct MeshConnection *c;
1701   struct MeshPeer *neighbor;
1702   struct MeshFlowControl *fc;
1703   GNUNET_PEER_Id peer_id;
1704   uint32_t pid;
1705   uint32_t ttl;
1706   uint16_t type;
1707   size_t size;
1708   int fwd;
1709
1710   /* Check size */
1711   size = ntohs (msg->header.size);
1712   if (size <
1713       sizeof (struct GNUNET_MESH_Encrypted) +
1714       sizeof (struct GNUNET_MessageHeader))
1715   {
1716     GNUNET_break_op (0);
1717     return GNUNET_OK;
1718   }
1719   type = ntohs (msg->header.type);
1720   LOG (GNUNET_ERROR_TYPE_DEBUG, "\n\n");
1721   LOG (GNUNET_ERROR_TYPE_DEBUG, "got a %s message (#%u) from %s\n",
1722        GM_m2s (type), ntohl (msg->pid), GNUNET_i2s (peer));
1723
1724   /* Check connection */
1725   c = connection_get (&msg->cid);
1726   if (NULL == c)
1727   {
1728     GNUNET_STATISTICS_update (stats, "# unknown connection", 1, GNUNET_NO);
1729     LOG (GNUNET_ERROR_TYPE_DEBUG, "WARNING enc on unknown connection %s\n",
1730          GNUNET_h2s (&msg->cid));
1731     return GNUNET_OK;
1732   }
1733
1734   LOG (GNUNET_ERROR_TYPE_DEBUG, "  on connection %s\n", GMC_2s (c));
1735
1736   /* Check if origin is as expected */
1737   neighbor = get_prev_hop (c);
1738   peer_id = GNUNET_PEER_search (peer);
1739   if (peer_id == GMP_get_short_id (neighbor))
1740   {
1741     fwd = GNUNET_YES;
1742   }
1743   else
1744   {
1745     neighbor = get_next_hop (c);
1746     if (peer_id == GMP_get_short_id (neighbor))
1747     {
1748       fwd = GNUNET_NO;
1749     }
1750     else
1751     {
1752       /* Unexpected peer sending traffic on a connection. */
1753       GNUNET_break_op (0);
1754       return GNUNET_OK;
1755     }
1756   }
1757
1758   /* Check PID */
1759   fc = fwd ? &c->bck_fc : &c->fwd_fc;
1760   pid = ntohl (msg->pid);
1761   if (GM_is_pid_bigger (pid, fc->last_ack_sent))
1762   {
1763     GNUNET_STATISTICS_update (stats, "# unsolicited message", 1, GNUNET_NO);
1764     LOG (GNUNET_ERROR_TYPE_DEBUG,
1765                 "WARNING Received PID %u, (prev %u), ACK %u\n",
1766                 pid, fc->last_pid_recv, fc->last_ack_sent);
1767     return GNUNET_OK;
1768   }
1769   if (GNUNET_NO == GM_is_pid_bigger (pid, fc->last_pid_recv))
1770   {
1771     GNUNET_STATISTICS_update (stats, "# duplicate PID", 1, GNUNET_NO);
1772     LOG (GNUNET_ERROR_TYPE_DEBUG,
1773                 " Pid %u not expected (%u+), dropping!\n",
1774                 pid, fc->last_pid_recv + 1);
1775     return GNUNET_OK;
1776   }
1777   if (MESH_CONNECTION_SENT == c->state || MESH_CONNECTION_ACK == c->state)
1778     connection_change_state (c, MESH_CONNECTION_READY);
1779   connection_reset_timeout (c, fwd);
1780   fc->last_pid_recv = pid;
1781
1782   /* Is this message for us? */
1783   if (GMC_is_terminal (c, fwd))
1784   {
1785     LOG (GNUNET_ERROR_TYPE_DEBUG, "  message for us!\n");
1786     GNUNET_STATISTICS_update (stats, "# messages received", 1, GNUNET_NO);
1787
1788     if (NULL == c->t)
1789     {
1790       GNUNET_break (GNUNET_NO != c->destroy);
1791       return GNUNET_OK;
1792     }
1793     fc->last_pid_recv = pid;
1794     GMT_handle_encrypted (c->t, msg);
1795     GMC_send_ack (c, fwd, GNUNET_NO);
1796     return GNUNET_OK;
1797   }
1798
1799   /* Message not for us: forward to next hop */
1800   LOG (GNUNET_ERROR_TYPE_DEBUG, "  not for us, retransmitting...\n");
1801   ttl = ntohl (msg->ttl);
1802   LOG (GNUNET_ERROR_TYPE_DEBUG, "   ttl: %u\n", ttl);
1803   if (ttl == 0)
1804   {
1805     GNUNET_STATISTICS_update (stats, "# TTL drops", 1, GNUNET_NO);
1806     LOG (GNUNET_ERROR_TYPE_WARNING, " TTL is 0, DROPPING!\n");
1807     GMC_send_ack (c, fwd, GNUNET_NO);
1808     return GNUNET_OK;
1809   }
1810
1811   GNUNET_STATISTICS_update (stats, "# messages forwarded", 1, GNUNET_NO);
1812   GMC_send_prebuilt_message (&msg->header, c, fwd, GNUNET_NO, NULL, NULL);
1813
1814   return GNUNET_OK;
1815 }
1816
1817 /**
1818  * Generic handler for mesh network encrypted traffic.
1819  *
1820  * @param peer Peer identity this notification is about.
1821  * @param msg Encrypted message.
1822  *
1823  * @return GNUNET_OK to keep the connection open,
1824  *         GNUNET_SYSERR to close it (signal serious error)
1825  */
1826 static int
1827 handle_mesh_kx (const struct GNUNET_PeerIdentity *peer,
1828                 const struct GNUNET_MESH_KX *msg)
1829 {
1830   struct MeshConnection *c;
1831   struct MeshPeer *neighbor;
1832   GNUNET_PEER_Id peer_id;
1833   size_t size;
1834   uint16_t type;
1835   int fwd;
1836
1837   /* Check size */
1838   size = ntohs (msg->header.size);
1839   if (size <
1840       sizeof (struct GNUNET_MESH_Encrypted) +
1841       sizeof (struct GNUNET_MessageHeader))
1842   {
1843     GNUNET_break_op (0);
1844     return GNUNET_OK;
1845   }
1846   type = ntohs (msg->header.type);
1847   LOG (GNUNET_ERROR_TYPE_DEBUG, "\n\n");
1848   LOG (GNUNET_ERROR_TYPE_DEBUG, "got a %s message from %s\n",
1849        GM_m2s (type), GNUNET_i2s (peer));
1850
1851   /* Check connection */
1852   c = connection_get (&msg->cid);
1853   if (NULL == c)
1854   {
1855     GNUNET_STATISTICS_update (stats, "# unknown connection", 1, GNUNET_NO);
1856     LOG (GNUNET_ERROR_TYPE_DEBUG, "WARNING kx on unknown connection %s\n",
1857          GNUNET_h2s (&msg->cid));
1858     return GNUNET_OK;
1859   }
1860   LOG (GNUNET_ERROR_TYPE_DEBUG, " on connection %s\n", GMC_2s (c));
1861
1862   /* Check if origin is as expected */
1863   neighbor = get_prev_hop (c);
1864   peer_id = GNUNET_PEER_search (peer);
1865   if (peer_id == GMP_get_short_id (neighbor))
1866   {
1867     fwd = GNUNET_YES;
1868   }
1869   else
1870   {
1871     neighbor = get_next_hop (c);
1872     if (peer_id == GMP_get_short_id (neighbor))
1873     {
1874       fwd = GNUNET_NO;
1875     }
1876     else
1877     {
1878       /* Unexpected peer sending traffic on a connection. */
1879       GNUNET_break_op (0);
1880       return GNUNET_OK;
1881     }
1882   }
1883
1884   /* Count as connection confirmation. */
1885   if (MESH_CONNECTION_SENT == c->state || MESH_CONNECTION_ACK == c->state)
1886   {
1887     connection_change_state (c, MESH_CONNECTION_READY);
1888     if (NULL != c->t)
1889     {
1890       if (MESH_TUNNEL3_WAITING == GMT_get_cstate (c->t))
1891         GMT_change_cstate (c->t, MESH_TUNNEL3_READY);
1892     }
1893   }
1894   connection_reset_timeout (c, fwd);
1895
1896   /* Is this message for us? */
1897   if (GMC_is_terminal (c, fwd))
1898   {
1899     LOG (GNUNET_ERROR_TYPE_DEBUG, "  message for us!\n");
1900     GNUNET_STATISTICS_update (stats, "# messages received", 1, GNUNET_NO);
1901     if (NULL == c->t)
1902     {
1903       GNUNET_break (0);
1904       return GNUNET_OK;
1905     }
1906     GMT_handle_kx (c->t, &msg[1].header);
1907     return GNUNET_OK;
1908   }
1909
1910   /* Message not for us: forward to next hop */
1911   LOG (GNUNET_ERROR_TYPE_DEBUG, "  not for us, retransmitting...\n");
1912   GNUNET_STATISTICS_update (stats, "# messages forwarded", 1, GNUNET_NO);
1913   GMC_send_prebuilt_message (&msg->header, c, fwd, GNUNET_NO, NULL, NULL);
1914
1915   return GNUNET_OK;
1916 }
1917
1918
1919 /**
1920  * Core handler for encrypted mesh network traffic (channel mgmt, data).
1921  *
1922  * @param cls Closure (unused).
1923  * @param message Message received.
1924  * @param peer Peer who sent the message.
1925  *
1926  * @return GNUNET_OK to keep the connection open,
1927  *         GNUNET_SYSERR to close it (signal serious error)
1928  */
1929 int
1930 GMC_handle_encrypted (void *cls, const struct GNUNET_PeerIdentity *peer,
1931                       const struct GNUNET_MessageHeader *message)
1932 {
1933   return handle_mesh_encrypted (peer,
1934                                 (struct GNUNET_MESH_Encrypted *)message);
1935 }
1936
1937
1938 /**
1939  * Core handler for key exchange traffic (ephemeral key, ping, pong).
1940  *
1941  * @param cls Closure (unused).
1942  * @param message Message received.
1943  * @param peer Peer who sent the message.
1944  *
1945  * @return GNUNET_OK to keep the connection open,
1946  *         GNUNET_SYSERR to close it (signal serious error)
1947  */
1948 int
1949 GMC_handle_kx (void *cls, const struct GNUNET_PeerIdentity *peer,
1950                const struct GNUNET_MessageHeader *message)
1951 {
1952   return handle_mesh_kx (peer,
1953                          (struct GNUNET_MESH_KX *) message);
1954 }
1955
1956
1957 /**
1958  * Core handler for mesh network traffic point-to-point acks.
1959  *
1960  * @param cls closure
1961  * @param message message
1962  * @param peer peer identity this notification is about
1963  *
1964  * @return GNUNET_OK to keep the connection open,
1965  *         GNUNET_SYSERR to close it (signal serious error)
1966  */
1967 int
1968 GMC_handle_ack (void *cls, const struct GNUNET_PeerIdentity *peer,
1969                 const struct GNUNET_MessageHeader *message)
1970 {
1971   struct GNUNET_MESH_ACK *msg;
1972   struct MeshConnection *c;
1973   struct MeshFlowControl *fc;
1974   GNUNET_PEER_Id id;
1975   uint32_t ack;
1976   int fwd;
1977
1978   LOG (GNUNET_ERROR_TYPE_DEBUG, "\n\n");
1979   LOG (GNUNET_ERROR_TYPE_DEBUG, "Got an ACK packet from %s!\n",
1980               GNUNET_i2s (peer));
1981   msg = (struct GNUNET_MESH_ACK *) message;
1982
1983   c = connection_get (&msg->cid);
1984
1985   if (NULL == c)
1986   {
1987     GNUNET_STATISTICS_update (stats, "# ack on unknown connection", 1,
1988                               GNUNET_NO);
1989     return GNUNET_OK;
1990   }
1991
1992   /* Is this a forward or backward ACK? */
1993   id = GNUNET_PEER_search (peer);
1994   if (GMP_get_short_id (get_next_hop (c)) == id)
1995   {
1996     LOG (GNUNET_ERROR_TYPE_DEBUG, "  FWD ACK\n");
1997     fc = &c->fwd_fc;
1998     fwd = GNUNET_YES;
1999   }
2000   else if (GMP_get_short_id (get_prev_hop (c)) == id)
2001   {
2002     LOG (GNUNET_ERROR_TYPE_DEBUG, "  BCK ACK\n");
2003     fc = &c->bck_fc;
2004     fwd = GNUNET_NO;
2005   }
2006   else
2007   {
2008     GNUNET_break_op (0);
2009     return GNUNET_OK;
2010   }
2011
2012   ack = ntohl (msg->ack);
2013   LOG (GNUNET_ERROR_TYPE_DEBUG, "  ACK %u (was %u)\n",
2014               ack, fc->last_ack_recv);
2015   if (GM_is_pid_bigger (ack, fc->last_ack_recv))
2016     fc->last_ack_recv = ack;
2017
2018   /* Cancel polling if the ACK is big enough. */
2019   if (GNUNET_SCHEDULER_NO_TASK != fc->poll_task &&
2020       GM_is_pid_bigger (fc->last_ack_recv, fc->last_pid_sent))
2021   {
2022     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Cancel poll\n");
2023     GNUNET_SCHEDULER_cancel (fc->poll_task);
2024     fc->poll_task = GNUNET_SCHEDULER_NO_TASK;
2025     fc->poll_time = GNUNET_TIME_UNIT_SECONDS;
2026   }
2027
2028   connection_unlock_queue (c, fwd);
2029
2030   return GNUNET_OK;
2031 }
2032
2033
2034 /**
2035  * Core handler for mesh network traffic point-to-point ack polls.
2036  *
2037  * @param cls closure
2038  * @param message message
2039  * @param peer peer identity this notification is about
2040  *
2041  * @return GNUNET_OK to keep the connection open,
2042  *         GNUNET_SYSERR to close it (signal serious error)
2043  */
2044 int
2045 GMC_handle_poll (void *cls, const struct GNUNET_PeerIdentity *peer,
2046                  const struct GNUNET_MessageHeader *message)
2047 {
2048   struct GNUNET_MESH_Poll *msg;
2049   struct MeshConnection *c;
2050   struct MeshFlowControl *fc;
2051   GNUNET_PEER_Id id;
2052   uint32_t pid;
2053   int fwd;
2054
2055   LOG (GNUNET_ERROR_TYPE_DEBUG, "\n\n");
2056   LOG (GNUNET_ERROR_TYPE_DEBUG,
2057        "Got a POLL message from %s!\n",
2058        GNUNET_i2s (peer));
2059
2060   msg = (struct GNUNET_MESH_Poll *) message;
2061
2062   c = connection_get (&msg->cid);
2063
2064   if (NULL == c)
2065   {
2066     GNUNET_STATISTICS_update (stats, "# poll on unknown connection", 1,
2067                               GNUNET_NO);
2068     LOG (GNUNET_ERROR_TYPE_DEBUG,
2069          "WARNING POLL message on unknown connection %s!\n",
2070          GNUNET_h2s (&msg->cid));
2071     return GNUNET_OK;
2072   }
2073
2074   /* Is this a forward or backward ACK?
2075    * Note: a poll should never be needed in a loopback case,
2076    * since there is no possiblility of packet loss there, so
2077    * this way of discerining FWD/BCK should not be a problem.
2078    */
2079   id = GNUNET_PEER_search (peer);
2080   if (GMP_get_short_id (get_next_hop (c)) == id)
2081   {
2082     LOG (GNUNET_ERROR_TYPE_DEBUG, "  FWD FC\n");
2083     fc = &c->fwd_fc;
2084   }
2085   else if (GMP_get_short_id (get_prev_hop (c)) == id)
2086   {
2087     LOG (GNUNET_ERROR_TYPE_DEBUG, "  BCK FC\n");
2088     fc = &c->bck_fc;
2089   }
2090   else
2091   {
2092     GNUNET_break_op (0);
2093     return GNUNET_OK;
2094   }
2095
2096   pid = ntohl (msg->pid);
2097   LOG (GNUNET_ERROR_TYPE_DEBUG, "  PID %u, OLD %u\n", pid, fc->last_pid_recv);
2098   fc->last_pid_recv = pid;
2099   fwd = fc == &c->bck_fc;
2100   GMC_send_ack (c, fwd, GNUNET_YES);
2101
2102   return GNUNET_OK;
2103 }
2104
2105
2106 /**
2107  * Core handler for mesh keepalives.
2108  *
2109  * @param cls closure
2110  * @param message message
2111  * @param peer peer identity this notification is about
2112  * @return GNUNET_OK to keep the connection open,
2113  *         GNUNET_SYSERR to close it (signal serious error)
2114  *
2115  * TODO: Check who we got this from, to validate route.
2116  */
2117 int
2118 GMC_handle_keepalive (void *cls, const struct GNUNET_PeerIdentity *peer,
2119                       const struct GNUNET_MessageHeader *message)
2120 {
2121   struct GNUNET_MESH_ConnectionKeepAlive *msg;
2122   struct MeshConnection *c;
2123   struct MeshPeer *neighbor;
2124   GNUNET_PEER_Id peer_id;
2125   int fwd;
2126
2127   msg = (struct GNUNET_MESH_ConnectionKeepAlive *) message;
2128   LOG (GNUNET_ERROR_TYPE_DEBUG, "got a keepalive packet from %s\n",
2129               GNUNET_i2s (peer));
2130
2131   c = connection_get (&msg->cid);
2132   if (NULL == c)
2133   {
2134     GNUNET_STATISTICS_update (stats, "# keepalive on unknown connection", 1,
2135                               GNUNET_NO);
2136     return GNUNET_OK;
2137   }
2138
2139   /* Check if origin is as expected TODO refactor and reuse */
2140   peer_id = GNUNET_PEER_search (peer);
2141   neighbor = get_prev_hop (c);
2142   if (peer_id == GMP_get_short_id (neighbor))
2143   {
2144     fwd = GNUNET_YES;
2145   }
2146   else
2147   {
2148     neighbor = get_next_hop (c);
2149     if (peer_id == GMP_get_short_id (neighbor))
2150     {
2151       fwd = GNUNET_NO;
2152     }
2153     else
2154     {
2155       GNUNET_break_op (0);
2156       return GNUNET_OK;
2157     }
2158   }
2159
2160   connection_change_state (c, MESH_CONNECTION_READY);
2161   connection_reset_timeout (c, fwd);
2162
2163   if (GMC_is_terminal (c, fwd))
2164     return GNUNET_OK;
2165
2166   GNUNET_STATISTICS_update (stats, "# keepalives forwarded", 1, GNUNET_NO);
2167   GMC_send_prebuilt_message (message, c, fwd, GNUNET_YES, NULL, NULL);
2168
2169   return GNUNET_OK;
2170 }
2171
2172
2173 /**
2174  * Send an ACK on the appropriate connection/channel, depending on
2175  * the direction and the position of the peer.
2176  *
2177  * @param c Which connection to send the hop-by-hop ACK.
2178  * @param fwd Is this a fwd ACK? (will go dest->root).
2179  * @param force Send the ACK even if suboptimal (e.g. requested by POLL).
2180  */
2181 void
2182 GMC_send_ack (struct MeshConnection *c, int fwd, int force)
2183 {
2184   unsigned int buffer;
2185
2186   LOG (GNUNET_ERROR_TYPE_DEBUG,
2187        "GMC send %s ACK on %s\n",
2188        GM_f2s (fwd), GMC_2s (c));
2189
2190   if (NULL == c)
2191   {
2192     GNUNET_break (0);
2193     return;
2194   }
2195
2196   if (GNUNET_NO != c->destroy)
2197   {
2198     LOG (GNUNET_ERROR_TYPE_DEBUG, "  being destroyed, why bother...\n");
2199     return;
2200   }
2201
2202   /* Get available buffer space */
2203   if (GMC_is_terminal (c, fwd))
2204   {
2205     LOG (GNUNET_ERROR_TYPE_DEBUG, "  getting from all channels\n");
2206     buffer = GMT_get_channels_buffer (c->t);
2207   }
2208   else
2209   {
2210     LOG (GNUNET_ERROR_TYPE_DEBUG, "  getting from one connection\n");
2211     buffer = GMC_get_buffer (c, fwd);
2212   }
2213   LOG (GNUNET_ERROR_TYPE_DEBUG, "  buffer available: %u\n", buffer);
2214   if (0 == buffer && GNUNET_NO == force)
2215     return;
2216
2217   /* Send available buffer space */
2218   if (GMC_is_origin (c, fwd))
2219   {
2220     GNUNET_assert (NULL != c->t);
2221     LOG (GNUNET_ERROR_TYPE_DEBUG, "  sending on channels...\n");
2222     GMT_unchoke_channels (c->t);
2223   }
2224   else
2225   {
2226     LOG (GNUNET_ERROR_TYPE_DEBUG, "  sending on connection\n");
2227     send_ack (c, buffer, fwd, force);
2228   }
2229 }
2230
2231
2232 /**
2233  * Initialize the connections subsystem
2234  *
2235  * @param c Configuration handle.
2236  */
2237 void
2238 GMC_init (const struct GNUNET_CONFIGURATION_Handle *c)
2239 {
2240   LOG (GNUNET_ERROR_TYPE_DEBUG, "init\n");
2241   if (GNUNET_OK !=
2242       GNUNET_CONFIGURATION_get_value_number (c, "MESH", "MAX_MSGS_QUEUE",
2243                                              &max_msgs_queue))
2244   {
2245     GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
2246                                "MESH", "MAX_MSGS_QUEUE", "MISSING");
2247     GNUNET_SCHEDULER_shutdown ();
2248     return;
2249   }
2250
2251   if (GNUNET_OK !=
2252       GNUNET_CONFIGURATION_get_value_number (c, "MESH", "MAX_CONNECTIONS",
2253                                              &max_connections))
2254   {
2255     GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
2256                                "MESH", "MAX_CONNECTIONS", "MISSING");
2257     GNUNET_SCHEDULER_shutdown ();
2258     return;
2259   }
2260
2261   if (GNUNET_OK !=
2262       GNUNET_CONFIGURATION_get_value_time (c, "MESH", "REFRESH_CONNECTION_TIME",
2263                                            &refresh_connection_time))
2264   {
2265     GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
2266                                "MESH", "REFRESH_CONNECTION_TIME", "MISSING");
2267     GNUNET_SCHEDULER_shutdown ();
2268     return;
2269   }
2270   create_connection_time = GNUNET_TIME_UNIT_SECONDS;
2271   connections = GNUNET_CONTAINER_multihashmap_create (1024, GNUNET_YES);
2272 }
2273
2274
2275 /**
2276  * Destroy each connection on shutdown.
2277  *
2278  * @param cls Closure (unused).
2279  * @param key Current key code (CID, unused).
2280  * @param value Value in the hash map (connection)
2281  *
2282  * @return #GNUNET_YES, because we should continue to iterate,
2283  */
2284 static int
2285 shutdown_iterator (void *cls,
2286                    const struct GNUNET_HashCode *key,
2287                    void *value)
2288 {
2289   struct MeshConnection *c = value;
2290
2291   GMC_destroy (c);
2292   return GNUNET_YES;
2293 }
2294
2295
2296 /**
2297  * Shut down the connections subsystem.
2298  */
2299 void
2300 GMC_shutdown (void)
2301 {
2302   GNUNET_CONTAINER_multihashmap_iterate (connections, &shutdown_iterator, NULL);
2303   GNUNET_CONTAINER_multihashmap_destroy (connections);
2304   connections = NULL;
2305 }
2306
2307
2308 struct MeshConnection *
2309 GMC_new (const struct GNUNET_HashCode *cid,
2310          struct MeshTunnel3 *t,
2311          struct MeshPeerPath *p,
2312          unsigned int own_pos)
2313 {
2314   struct MeshConnection *c;
2315
2316   c = GNUNET_new (struct MeshConnection);
2317   c->id = *cid;
2318   GNUNET_assert (GNUNET_OK ==
2319                  GNUNET_CONTAINER_multihashmap_put (connections,
2320                                                     &c->id, c,
2321                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
2322   fc_init (&c->fwd_fc);
2323   fc_init (&c->bck_fc);
2324   c->fwd_fc.c = c;
2325   c->bck_fc.c = c;
2326
2327   c->t = t;
2328   GNUNET_assert (own_pos <= p->length - 1);
2329   c->own_pos = own_pos;
2330   c->path = p;
2331
2332   if (GNUNET_OK != register_neighbors (c))
2333   {
2334     if (0 == own_pos)
2335     {
2336       path_invalidate (c->path);
2337       c->t = NULL;
2338       c->path = NULL;
2339     }
2340     GMC_destroy (c);
2341     return NULL;
2342   }
2343
2344   if (0 == own_pos)
2345   {
2346     c->fwd_maintenance_task =
2347       GNUNET_SCHEDULER_add_delayed (create_connection_time,
2348                                     &connection_fwd_keepalive, c);
2349   }
2350
2351   return c;
2352 }
2353
2354
2355 void
2356 GMC_destroy (struct MeshConnection *c)
2357 {
2358   if (NULL == c)
2359   {
2360     GNUNET_break (0);
2361     return;
2362   }
2363
2364   if (2 == c->destroy) /* cancel queues -> GMP_queue_cancel -> q_destroy -> */
2365     return;            /* -> message_sent -> GMC_destroy. Don't loop. */
2366   c->destroy = 2;
2367
2368   LOG (GNUNET_ERROR_TYPE_DEBUG, "destroying connection %s\n", GMC_2s (c));
2369   LOG (GNUNET_ERROR_TYPE_DEBUG, " fc's f: %p, b: %p\n",
2370        &c->fwd_fc, &c->bck_fc);
2371   LOG (GNUNET_ERROR_TYPE_DEBUG, " fc tasks f: %u, b: %u\n",
2372        c->fwd_fc.poll_task, c->bck_fc.poll_task);
2373
2374   /* Cancel all traffic */
2375   if (NULL != c->path)
2376   {
2377     connection_cancel_queues (c, GNUNET_YES);
2378     connection_cancel_queues (c, GNUNET_NO);
2379     unregister_neighbors (c);
2380   }
2381
2382   LOG (GNUNET_ERROR_TYPE_DEBUG, " fc tasks f: %u, b: %u\n",
2383        c->fwd_fc.poll_task, c->bck_fc.poll_task);
2384
2385   /* Cancel maintainance task (keepalive/timeout) */
2386   if (NULL != c->fwd_fc.poll_msg)
2387   {
2388     GMC_cancel (c->fwd_fc.poll_msg);
2389     LOG (GNUNET_ERROR_TYPE_DEBUG, " *** POLL msg FWD canceled\n");
2390   }
2391   if (NULL != c->bck_fc.poll_msg)
2392   {
2393     GMC_cancel (c->bck_fc.poll_msg);
2394     LOG (GNUNET_ERROR_TYPE_DEBUG, " *** POLL msg BCK canceled\n");
2395   }
2396
2397   /* Delete from tunnel */
2398   if (NULL != c->t)
2399     GMT_remove_connection (c->t, c);
2400
2401   if (GNUNET_NO == GMC_is_origin (c, GNUNET_YES) && NULL != c->path)
2402     path_destroy (c->path);
2403   if (GNUNET_SCHEDULER_NO_TASK != c->fwd_maintenance_task)
2404     GNUNET_SCHEDULER_cancel (c->fwd_maintenance_task);
2405   if (GNUNET_SCHEDULER_NO_TASK != c->bck_maintenance_task)
2406     GNUNET_SCHEDULER_cancel (c->bck_maintenance_task);
2407   if (GNUNET_SCHEDULER_NO_TASK != c->fwd_fc.poll_task)
2408   {
2409     GNUNET_SCHEDULER_cancel (c->fwd_fc.poll_task);
2410     LOG (GNUNET_ERROR_TYPE_DEBUG, " *** POLL FWD canceled\n");
2411   }
2412   if (GNUNET_SCHEDULER_NO_TASK != c->bck_fc.poll_task)
2413   {
2414     GNUNET_SCHEDULER_cancel (c->bck_fc.poll_task);
2415     LOG (GNUNET_ERROR_TYPE_DEBUG, " *** POLL BCK canceled\n");
2416   }
2417
2418   GNUNET_break (GNUNET_YES ==
2419                 GNUNET_CONTAINER_multihashmap_remove (connections, &c->id, c));
2420
2421   GNUNET_STATISTICS_update (stats, "# connections", -1, GNUNET_NO);
2422   GNUNET_free (c);
2423 }
2424
2425 /**
2426  * Get the connection ID.
2427  *
2428  * @param c Connection to get the ID from.
2429  *
2430  * @return ID of the connection.
2431  */
2432 const struct GNUNET_HashCode *
2433 GMC_get_id (const struct MeshConnection *c)
2434 {
2435   return &c->id;
2436 }
2437
2438
2439 /**
2440  * Get the connection path.
2441  *
2442  * @param c Connection to get the path from.
2443  *
2444  * @return path used by the connection.
2445  */
2446 const struct MeshPeerPath *
2447 GMC_get_path (const struct MeshConnection *c)
2448 {
2449   if (GNUNET_NO == c->destroy)
2450     return c->path;
2451   return NULL;
2452 }
2453
2454
2455 /**
2456  * Get the connection state.
2457  *
2458  * @param c Connection to get the state from.
2459  *
2460  * @return state of the connection.
2461  */
2462 enum MeshConnectionState
2463 GMC_get_state (const struct MeshConnection *c)
2464 {
2465   return c->state;
2466 }
2467
2468 /**
2469  * Get the connection tunnel.
2470  *
2471  * @param c Connection to get the tunnel from.
2472  *
2473  * @return tunnel of the connection.
2474  */
2475 struct MeshTunnel3 *
2476 GMC_get_tunnel (const struct MeshConnection *c)
2477 {
2478   return c->t;
2479 }
2480
2481
2482 /**
2483  * Get free buffer space in a connection.
2484  *
2485  * @param c Connection.
2486  * @param fwd Is query about FWD traffic?
2487  *
2488  * @return Free buffer space [0 - max_msgs_queue/max_connections]
2489  */
2490 unsigned int
2491 GMC_get_buffer (struct MeshConnection *c, int fwd)
2492 {
2493   struct MeshFlowControl *fc;
2494
2495   fc = fwd ? &c->fwd_fc : &c->bck_fc;
2496
2497   return (fc->queue_max - fc->queue_n);
2498 }
2499
2500 /**
2501  * Get how many messages have we allowed to send to us from a direction.
2502  *
2503  * @param c Connection.
2504  * @param fwd Are we asking about traffic from FWD (BCK messages)?
2505  *
2506  * @return last_ack_sent - last_pid_recv
2507  */
2508 unsigned int
2509 GMC_get_allowed (struct MeshConnection *c, int fwd)
2510 {
2511   struct MeshFlowControl *fc;
2512
2513   fc = fwd ? &c->fwd_fc : &c->bck_fc;
2514   if (GM_is_pid_bigger(fc->last_pid_recv, fc->last_ack_sent))
2515   {
2516     return 0;
2517   }
2518   return (fc->last_ack_sent - fc->last_pid_recv);
2519 }
2520
2521 /**
2522  * Get messages queued in a connection.
2523  *
2524  * @param c Connection.
2525  * @param fwd Is query about FWD traffic?
2526  *
2527  * @return Number of messages queued.
2528  */
2529 unsigned int
2530 GMC_get_qn (struct MeshConnection *c, int fwd)
2531 {
2532   struct MeshFlowControl *fc;
2533
2534   fc = fwd ? &c->fwd_fc : &c->bck_fc;
2535
2536   return fc->queue_n;
2537 }
2538
2539
2540 /**
2541  * Allow the connection to advertise a buffer of the given size.
2542  *
2543  * The connection will send an @c fwd ACK message (so: in direction !fwd)
2544  * allowing up to last_pid_recv + buffer.
2545  *
2546  * @param c Connection.
2547  * @param buffer How many more messages the connection can accept.
2548  * @param fwd Is this about FWD traffic? (The ack will go dest->root).
2549  */
2550 void
2551 GMC_allow (struct MeshConnection *c, unsigned int buffer, int fwd)
2552 {
2553   LOG (GNUNET_ERROR_TYPE_DEBUG, "  allowing %s %u messages %s\n",
2554        GMC_2s (c), buffer, GM_f2s (fwd));
2555   send_ack (c, buffer, fwd, GNUNET_NO);
2556 }
2557
2558
2559 /**
2560  * Notify other peers on a connection of a broken link. Mark connections
2561  * to destroy after all traffic has been sent.
2562  *
2563  * @param c Connection on which there has been a disconnection.
2564  * @param peer Peer that disconnected.
2565  */
2566 void
2567 GMC_notify_broken (struct MeshConnection *c,
2568                    struct MeshPeer *peer)
2569 {
2570   int fwd;
2571
2572   LOG (GNUNET_ERROR_TYPE_DEBUG,
2573        " notify broken on %s due to %s disconnect\n",
2574        GMC_2s (c), GMP_2s (peer));
2575
2576   fwd = peer == get_prev_hop (c);
2577
2578   if (GNUNET_YES == GMC_is_terminal (c, fwd))
2579   {
2580     /* Local shutdown, no one to notify about this. */
2581     GMC_destroy (c);
2582     return;
2583   }
2584   if (GNUNET_NO == c->destroy)
2585     send_broken (c, &my_full_id, GMP_get_id (peer), fwd);
2586
2587   /* Connection will have at least one pending message
2588    * (the one we just scheduled), so no point in checking whether to
2589    * destroy immediately. */
2590   c->destroy = GNUNET_YES;
2591   c->state = MESH_CONNECTION_DESTROYED;
2592
2593   /**
2594    * Cancel all queues, if no message is left, connection will be destroyed.
2595    */
2596   connection_cancel_queues (c, !fwd);
2597
2598   return;
2599 }
2600
2601
2602 /**
2603  * Is this peer the first one on the connection?
2604  *
2605  * @param c Connection.
2606  * @param fwd Is this about fwd traffic?
2607  *
2608  * @return #GNUNET_YES if origin, #GNUNET_NO if relay/terminal.
2609  */
2610 int
2611 GMC_is_origin (struct MeshConnection *c, int fwd)
2612 {
2613   if (!fwd && c->path->length - 1 == c->own_pos )
2614     return GNUNET_YES;
2615   if (fwd && 0 == c->own_pos)
2616     return GNUNET_YES;
2617   return GNUNET_NO;
2618 }
2619
2620
2621 /**
2622  * Is this peer the last one on the connection?
2623  *
2624  * @param c Connection.
2625  * @param fwd Is this about fwd traffic?
2626  *            Note that the ROOT is the terminal for BCK traffic!
2627  *
2628  * @return #GNUNET_YES if terminal, #GNUNET_NO if relay/origin.
2629  */
2630 int
2631 GMC_is_terminal (struct MeshConnection *c, int fwd)
2632 {
2633   return GMC_is_origin (c, !fwd);
2634 }
2635
2636
2637 /**
2638  * See if we are allowed to send by the next hop in the given direction.
2639  *
2640  * @param c Connection.
2641  * @param fwd Is this about fwd traffic?
2642  *
2643  * @return #GNUNET_YES in case it's OK to send.
2644  */
2645 int
2646 GMC_is_sendable (struct MeshConnection *c, int fwd)
2647 {
2648   struct MeshFlowControl *fc;
2649
2650   fc = fwd ? &c->fwd_fc : &c->bck_fc;
2651   if (GM_is_pid_bigger (fc->last_ack_recv, fc->last_pid_sent))
2652     return GNUNET_YES;
2653   return GNUNET_NO;
2654 }
2655
2656 /**
2657  * Sends an already built message on a connection, properly registering
2658  * all used resources.
2659  *
2660  * @param message Message to send. Function makes a copy of it.
2661  *                If message is not hop-by-hop, decrements TTL of copy.
2662  * @param c Connection on which this message is transmitted.
2663  * @param fwd Is this a fwd message?
2664  * @param force Force the connection to accept the message (buffer overfill).
2665  * @param cont Continuation called once message is sent. Can be NULL.
2666  * @param cont_cls Closure for @c cont.
2667  *
2668  * @return Handle to cancel the message before it's sent.
2669  *         NULL on error or if @c cont is NULL.
2670  *         Invalid on @c cont call.
2671  */
2672 struct MeshConnectionQueue *
2673 GMC_send_prebuilt_message (const struct GNUNET_MessageHeader *message,
2674                            struct MeshConnection *c, int fwd, int force,
2675                            GMC_sent cont, void *cont_cls)
2676 {
2677   struct MeshFlowControl *fc;
2678   struct MeshConnectionQueue *q;
2679   void *data;
2680   size_t size;
2681   uint16_t type;
2682   int droppable;
2683
2684   size = ntohs (message->size);
2685   data = GNUNET_malloc (size);
2686   memcpy (data, message, size);
2687   type = ntohs (message->type);
2688   LOG (GNUNET_ERROR_TYPE_DEBUG, "Send %s (%u bytes) on connection %s\n",
2689        GM_m2s (type), size, GMC_2s (c));
2690
2691   fc = fwd ? &c->fwd_fc : &c->bck_fc;
2692   droppable = GNUNET_NO == force;
2693   switch (type)
2694   {
2695     struct GNUNET_MESH_Encrypted *emsg;
2696     struct GNUNET_MESH_KX        *kmsg;
2697     struct GNUNET_MESH_ACK       *amsg;
2698     struct GNUNET_MESH_Poll      *pmsg;
2699     struct GNUNET_MESH_ConnectionDestroy *dmsg;
2700     struct GNUNET_MESH_ConnectionBroken  *bmsg;
2701     uint32_t ttl;
2702
2703     case GNUNET_MESSAGE_TYPE_MESH_ENCRYPTED:
2704       emsg = (struct GNUNET_MESH_Encrypted *) data;
2705       ttl = ntohl (emsg->ttl);
2706       if (0 == ttl)
2707       {
2708         GNUNET_break_op (0);
2709         GNUNET_free (data);
2710         return NULL;
2711       }
2712       emsg->cid = c->id;
2713       emsg->ttl = htonl (ttl - 1);
2714       emsg->pid = htonl (fc->next_pid++);
2715       LOG (GNUNET_ERROR_TYPE_DEBUG, "  Q_N+ %p %u\n", fc, fc->queue_n);
2716       if (GNUNET_YES == droppable)
2717       {
2718         fc->queue_n++;
2719         LOG (GNUNET_ERROR_TYPE_DEBUG, "pid %u\n", ntohl (emsg->pid));
2720         LOG (GNUNET_ERROR_TYPE_DEBUG, "last pid sent %u\n", fc->last_pid_sent);
2721         LOG (GNUNET_ERROR_TYPE_DEBUG, "     ack recv %u\n", fc->last_ack_recv);
2722       }
2723       else
2724       {
2725         LOG (GNUNET_ERROR_TYPE_DEBUG, "  not droppable, Q_N stays the same\n");
2726       }
2727       if (GM_is_pid_bigger (fc->last_pid_sent + 1, fc->last_ack_recv))
2728       {
2729         GMC_start_poll (c, fwd);
2730       }
2731       break;
2732
2733     case GNUNET_MESSAGE_TYPE_MESH_KX:
2734       kmsg = (struct GNUNET_MESH_KX *) data;
2735       kmsg->cid = c->id;
2736       break;
2737
2738     case GNUNET_MESSAGE_TYPE_MESH_ACK:
2739       amsg = (struct GNUNET_MESH_ACK *) data;
2740       amsg->cid = c->id;
2741       LOG (GNUNET_ERROR_TYPE_DEBUG, " ack %u\n", ntohl (amsg->ack));
2742       droppable = GNUNET_NO;
2743       break;
2744
2745     case GNUNET_MESSAGE_TYPE_MESH_POLL:
2746       pmsg = (struct GNUNET_MESH_Poll *) data;
2747       pmsg->cid = c->id;
2748       LOG (GNUNET_ERROR_TYPE_DEBUG, " poll %u\n", ntohl (pmsg->pid));
2749       droppable = GNUNET_NO;
2750       break;
2751
2752     case GNUNET_MESSAGE_TYPE_MESH_CONNECTION_DESTROY:
2753       dmsg = (struct GNUNET_MESH_ConnectionDestroy *) data;
2754       dmsg->cid = c->id;
2755       dmsg->reserved = 0;
2756       break;
2757
2758     case GNUNET_MESSAGE_TYPE_MESH_CONNECTION_BROKEN:
2759       bmsg = (struct GNUNET_MESH_ConnectionBroken *) data;
2760       bmsg->cid = c->id;
2761       bmsg->reserved = 0;
2762       break;
2763
2764     case GNUNET_MESSAGE_TYPE_MESH_CONNECTION_CREATE:
2765     case GNUNET_MESSAGE_TYPE_MESH_CONNECTION_ACK:
2766     case GNUNET_MESSAGE_TYPE_MESH_KEEPALIVE:
2767       break;
2768
2769     default:
2770       GNUNET_break (0);
2771       GNUNET_free (data);
2772       return NULL;
2773   }
2774
2775   if (fc->queue_n > fc->queue_max && droppable)
2776   {
2777     GNUNET_STATISTICS_update (stats, "# messages dropped (buffer full)",
2778                               1, GNUNET_NO);
2779     GNUNET_break (0);
2780     LOG (GNUNET_ERROR_TYPE_DEBUG,
2781                 "queue full: %u/%u\n",
2782                 fc->queue_n, fc->queue_max);
2783     if (GNUNET_MESSAGE_TYPE_MESH_ENCRYPTED == type)
2784     {
2785       fc->queue_n--;
2786       fc->next_pid--;
2787     }
2788     GNUNET_free (data);
2789     return NULL; /* Drop this message */
2790   }
2791
2792   LOG (GNUNET_ERROR_TYPE_DEBUG, "  C_P+ %p %u\n", c, c->pending_messages);
2793   c->pending_messages++;
2794
2795   q = GNUNET_new (struct MeshConnectionQueue);
2796   q->forced = !droppable;
2797   q->q = GMP_queue_add (get_hop (c, fwd), data, type, size, c, fwd,
2798                         &message_sent, q);
2799   if (NULL == q->q)
2800   {
2801     LOG (GNUNET_ERROR_TYPE_DEBUG, "WARNING dropping msg on %s\n", GMC_2s (c));
2802     GNUNET_free (data);
2803     GNUNET_free (q);
2804     return NULL;
2805   }
2806   q->cont = cont;
2807   q->cont_cls = cont_cls;
2808   return q;
2809 }
2810
2811
2812 /**
2813  * Cancel a previously sent message while it's in the queue.
2814  *
2815  * ONLY can be called before the continuation given to the send function
2816  * is called. Once the continuation is called, the message is no longer in the
2817  * queue.
2818  *
2819  * @param q Handle to the queue.
2820  */
2821 void
2822 GMC_cancel (struct MeshConnectionQueue *q)
2823 {
2824   LOG (GNUNET_ERROR_TYPE_DEBUG, "!  GMC cancel message\n");
2825
2826   /* queue destroy calls message_sent, which calls q->cont and frees q */
2827   GMP_queue_destroy (q->q, GNUNET_YES);
2828 }
2829
2830
2831 /**
2832  * Sends a CREATE CONNECTION message for a path to a peer.
2833  * Changes the connection and tunnel states if necessary.
2834  *
2835  * @param connection Connection to create.
2836  */
2837 void
2838 GMC_send_create (struct MeshConnection *connection)
2839 {
2840   enum MeshTunnel3CState state;
2841   size_t size;
2842
2843   size = sizeof (struct GNUNET_MESH_ConnectionCreate);
2844   size += connection->path->length * sizeof (struct GNUNET_PeerIdentity);
2845
2846   LOG (GNUNET_ERROR_TYPE_DEBUG, "Send connection create\n");
2847   LOG (GNUNET_ERROR_TYPE_DEBUG, "  C_P+ %p %u (create)\n",
2848        connection, connection->pending_messages);
2849   connection->pending_messages++;
2850
2851   GMP_queue_add (get_next_hop (connection), NULL,
2852                  GNUNET_MESSAGE_TYPE_MESH_CONNECTION_CREATE,
2853                  size, connection, GNUNET_YES, &message_sent, NULL);
2854
2855   state = GMT_get_cstate (connection->t);
2856   if (MESH_TUNNEL3_SEARCHING == state || MESH_TUNNEL3_NEW == state)
2857     GMT_change_cstate (connection->t, MESH_TUNNEL3_WAITING);
2858   if (MESH_CONNECTION_NEW == connection->state)
2859     connection_change_state (connection, MESH_CONNECTION_SENT);
2860 }
2861
2862
2863 /**
2864  * Send a message to all peers in this connection that the connection
2865  * is no longer valid.
2866  *
2867  * If some peer should not receive the message, it should be zero'ed out
2868  * before calling this function.
2869  *
2870  * @param c The connection whose peers to notify.
2871  */
2872 void
2873 GMC_send_destroy (struct MeshConnection *c)
2874 {
2875   struct GNUNET_MESH_ConnectionDestroy msg;
2876
2877   if (GNUNET_YES == c->destroy)
2878     return;
2879
2880   msg.header.size = htons (sizeof (msg));
2881   msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_CONNECTION_DESTROY);;
2882   msg.cid = c->id;
2883   LOG (GNUNET_ERROR_TYPE_DEBUG,
2884               "  sending connection destroy for connection %s\n",
2885               GMC_2s (c));
2886
2887   if (GNUNET_NO == GMC_is_terminal (c, GNUNET_YES))
2888     GMC_send_prebuilt_message (&msg.header, c,
2889                                GNUNET_YES, GNUNET_YES, NULL, NULL);
2890   if (GNUNET_NO == GMC_is_terminal (c, GNUNET_NO))
2891     GMC_send_prebuilt_message (&msg.header, c,
2892                                GNUNET_NO, GNUNET_YES, NULL, NULL);
2893   c->destroy = GNUNET_YES;
2894   c->state = MESH_CONNECTION_DESTROYED;
2895 }
2896
2897
2898 /**
2899  * @brief Start a polling timer for the connection.
2900  *
2901  * When a neighbor does not accept more traffic on the connection it could be
2902  * caused by a simple congestion or by a lost ACK. Polling enables to check
2903  * for the lastest ACK status for a connection.
2904  *
2905  * @param c Connection.
2906  * @param fwd Should we poll in the FWD direction?
2907  */
2908 void
2909 GMC_start_poll (struct MeshConnection *c, int fwd)
2910 {
2911   struct MeshFlowControl *fc;
2912
2913   fc = fwd ? &c->fwd_fc : &c->bck_fc;
2914   LOG (GNUNET_ERROR_TYPE_DEBUG, " *** POLL %s requested\n",
2915        GM_f2s (fwd));
2916   if (GNUNET_SCHEDULER_NO_TASK != fc->poll_task || NULL != fc->poll_msg)
2917   {
2918     LOG (GNUNET_ERROR_TYPE_DEBUG, " ***   not needed (%u, %p)\n",
2919          fc->poll_task, fc->poll_msg);
2920     return;
2921   }
2922   LOG (GNUNET_ERROR_TYPE_DEBUG, " *** POLL started on request\n");
2923   fc->poll_task = GNUNET_SCHEDULER_add_delayed (fc->poll_time,
2924                                                 &connection_poll,
2925                                                 fc);
2926 }
2927
2928
2929 /**
2930  * @brief Stop polling a connection for ACKs.
2931  *
2932  * Once we have enough ACKs for future traffic, polls are no longer necessary.
2933  *
2934  * @param c Connection.
2935  * @param fwd Should we stop the poll in the FWD direction?
2936  */
2937 void
2938 GMC_stop_poll (struct MeshConnection *c, int fwd)
2939 {
2940   struct MeshFlowControl *fc;
2941
2942   fc = fwd ? &c->fwd_fc : &c->bck_fc;
2943   if (GNUNET_SCHEDULER_NO_TASK != fc->poll_task)
2944   {
2945     GNUNET_SCHEDULER_cancel (fc->poll_task);
2946     fc->poll_task = GNUNET_SCHEDULER_NO_TASK;
2947   }
2948 }
2949
2950 /**
2951  * Get a (static) string for a connection.
2952  *
2953  * @param c Connection.
2954  */
2955 const char *
2956 GMC_2s (const struct MeshConnection *c)
2957 {
2958   if (NULL == c)
2959     return "NULL";
2960
2961   if (NULL != c->t)
2962   {
2963     static char buf[128];
2964
2965     sprintf (buf, "%s (->%s)", GNUNET_h2s (&c->id), GMT_2s (c->t));
2966     return buf;
2967   }
2968   return GNUNET_h2s (&c->id);
2969 }