- fix queueing
[oweals/gnunet.git] / src / mesh / gnunet-service-mesh_connection.c
1 /*
2      This file is part of GNUnet.
3      (C) 2001-2013 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file mesh/gnunet-service-mesh_connection.c
23  * @brief GNUnet MESH service connection handling
24  * @author Bartlomiej Polot
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29
30 #include "gnunet_statistics_service.h"
31
32 #include "mesh_path.h"
33 #include "mesh_protocol_enc.h"
34 #include "mesh_enc.h"
35 #include "gnunet-service-mesh_connection.h"
36 #include "gnunet-service-mesh_peer.h"
37 #include "gnunet-service-mesh_tunnel.h"
38 #include "gnunet-service-mesh_channel.h"
39
40
41 #define LOG(level, ...) GNUNET_log_from (level,"mesh-con",__VA_ARGS__)
42
43 #define MESH_MAX_POLL_TIME      GNUNET_TIME_relative_multiply (\
44                                   GNUNET_TIME_UNIT_MINUTES,\
45                                   10)
46 #define AVG_MSGS                32
47
48
49 /******************************************************************************/
50 /********************************   STRUCTS  **********************************/
51 /******************************************************************************/
52
53 /**
54  * Struct to encapsulate all the Flow Control information to a peer to which
55  * we are directly connected (on a core level).
56  */
57 struct MeshFlowControl
58 {
59   /**
60    * Connection this controls.
61    */
62   struct MeshConnection *c;
63
64   /**
65    * How many messages are in the queue on this connection.
66    */
67   unsigned int queue_n;
68
69   /**
70    * How many messages do we accept in the queue.
71    */
72   unsigned int queue_max;
73
74   /**
75    * Next ID to use.
76    */
77   uint32_t next_pid;
78
79   /**
80    * ID of the last packet sent towards the peer.
81    */
82   uint32_t last_pid_sent;
83
84   /**
85    * ID of the last packet received from the peer.
86    */
87   uint32_t last_pid_recv;
88
89   /**
90    * Last ACK sent to the peer (peer can't send more than this PID).
91    */
92   uint32_t last_ack_sent;
93
94   /**
95    * Last ACK sent towards the origin (for traffic towards leaf node).
96    */
97   uint32_t last_ack_recv;
98
99   /**
100    * Task to poll the peer in case of a lost ACK causes stall.
101    */
102   GNUNET_SCHEDULER_TaskIdentifier poll_task;
103
104   /**
105    * How frequently to poll for ACKs.
106    */
107   struct GNUNET_TIME_Relative poll_time;
108 };
109
110 /**
111  * Keep a record of the last messages sent on this connection.
112  */
113 struct MeshConnectionPerformance
114 {
115   /**
116    * Circular buffer for storing measurements.
117    */
118   double usecsperbyte[AVG_MSGS];
119
120   /**
121    * Running average of @c usecsperbyte.
122    */
123   double avg;
124
125   /**
126    * How many values of @c usecsperbyte are valid.
127    */
128   uint16_t size;
129
130   /**
131    * Index of the next "free" position in @c usecsperbyte.
132    */
133   uint16_t idx;
134 };
135
136
137 /**
138  * Struct containing all information regarding a connection to a peer.
139  */
140 struct MeshConnection
141 {
142   /**
143    * Tunnel this connection is part of.
144    */
145   struct MeshTunnel3 *t;
146
147   /**
148    * Flow control information for traffic fwd.
149    */
150   struct MeshFlowControl fwd_fc;
151
152   /**
153    * Flow control information for traffic bck.
154    */
155   struct MeshFlowControl bck_fc;
156
157   /**
158    * Measure connection performance on the endpoint.
159    */
160   struct MeshConnectionPerformance *perf;
161
162   /**
163    * ID of the connection.
164    */
165   struct GNUNET_HashCode id;
166
167   /**
168    * State of the connection.
169    */
170   enum MeshConnectionState state;
171
172   /**
173    * Path being used for the tunnel.
174    */
175   struct MeshPeerPath *path;
176
177   /**
178    * Position of the local peer in the path.
179    */
180   unsigned int own_pos;
181
182   /**
183    * Task to keep the used paths alive at the owner,
184    * time tunnel out on all the other peers.
185    */
186   GNUNET_SCHEDULER_TaskIdentifier fwd_maintenance_task;
187
188   /**
189    * Task to keep the used paths alive at the destination,
190    * time tunnel out on all the other peers.
191    */
192   GNUNET_SCHEDULER_TaskIdentifier bck_maintenance_task;
193
194   /**
195    * Pending message count.
196    */
197   int pending_messages;
198
199   /**
200    * Destroy flag: if true, destroy on last message.
201    */
202   int destroy;
203 };
204
205 /******************************************************************************/
206 /*******************************   GLOBALS  ***********************************/
207 /******************************************************************************/
208
209 /**
210  * Global handle to the statistics service.
211  */
212 extern struct GNUNET_STATISTICS_Handle *stats;
213
214 /**
215  * Local peer own ID (memory efficient handle).
216  */
217 extern GNUNET_PEER_Id myid;
218
219 /**
220  * Connections known, indexed by cid (MeshConnection).
221  */
222 static struct GNUNET_CONTAINER_MultiHashMap *connections;
223
224 /**
225  * How many connections are we willing to maintain.
226  * Local connections are always allowed, even if there are more connections than max.
227  */
228 static unsigned long long max_connections;
229
230 /**
231  * How many messages *in total* are we willing to queue, divide by number of
232  * connections to get connection queue size.
233  */
234 static unsigned long long max_msgs_queue;
235
236 /**
237  * How often to send path keepalives. Paths timeout after 4 missed.
238  */
239 static struct GNUNET_TIME_Relative refresh_connection_time;
240
241
242 /******************************************************************************/
243 /********************************   STATIC  ***********************************/
244 /******************************************************************************/
245
246 #if 0 // avoid compiler warning for unused static function
247 static void
248 fc_debug (struct MeshFlowControl *fc)
249 {
250   LOG (GNUNET_ERROR_TYPE_DEBUG, "    IN: %u/%u\n",
251               fc->last_pid_recv, fc->last_ack_sent);
252   LOG (GNUNET_ERROR_TYPE_DEBUG, "    OUT: %u/%u\n",
253               fc->last_pid_sent, fc->last_ack_recv);
254   LOG (GNUNET_ERROR_TYPE_DEBUG, "    QUEUE: %u/%u\n",
255               fc->queue_n, fc->queue_max);
256 }
257
258 static void
259 connection_debug (struct MeshConnection *c)
260 {
261   if (NULL == c)
262   {
263     LOG (GNUNET_ERROR_TYPE_DEBUG, "*** DEBUG NULL CONNECTION ***\n");
264     return;
265   }
266   LOG (GNUNET_ERROR_TYPE_DEBUG, "Connection %s:%X\n",
267               peer2s (c->t->peer), GNUNET_h2s (&c->id));
268   LOG (GNUNET_ERROR_TYPE_DEBUG, "  state: %u, pending msgs: %u\n",
269               c->state, c->pending_messages);
270   LOG (GNUNET_ERROR_TYPE_DEBUG, "  FWD FC\n");
271   fc_debug (&c->fwd_fc);
272   LOG (GNUNET_ERROR_TYPE_DEBUG, "  BCK FC\n");
273   fc_debug (&c->bck_fc);
274 }
275 #endif
276
277 /**
278  * Get string description for tunnel state.
279  *
280  * @param s Tunnel state.
281  *
282  * @return String representation.
283  */
284 static const char *
285 GMC_state2s (enum MeshConnectionState s)
286 {
287   switch (s)
288   {
289     case MESH_CONNECTION_NEW:
290       return "MESH_CONNECTION_NEW";
291     case MESH_CONNECTION_SENT:
292       return "MESH_CONNECTION_SENT";
293     case MESH_CONNECTION_ACK:
294       return "MESH_CONNECTION_ACK";
295     case MESH_CONNECTION_READY:
296       return "MESH_CONNECTION_READY";
297     default:
298       return "MESH_CONNECTION_STATE_ERROR";
299   }
300 }
301
302
303 /**
304  * Initialize a Flow Control structure to the initial state.
305  *
306  * @param fc Flow Control structure to initialize.
307  */
308 static void
309 fc_init (struct MeshFlowControl *fc)
310 {
311   fc->next_pid = 0;
312   fc->last_pid_sent = (uint32_t) -1; /* Next (expected) = 0 */
313   fc->last_pid_recv = (uint32_t) -1;
314   fc->last_ack_sent = (uint32_t) 0;
315   fc->last_ack_recv = (uint32_t) 0;
316   fc->poll_task = GNUNET_SCHEDULER_NO_TASK;
317   fc->poll_time = GNUNET_TIME_UNIT_SECONDS;
318   fc->queue_n = 0;
319   fc->queue_max = (max_msgs_queue / max_connections) + 1;
320 }
321
322
323 /**
324  * Find a connection.
325  *
326  * @param cid Connection ID.
327  */
328 static struct MeshConnection *
329 connection_get (const struct GNUNET_HashCode *cid)
330 {
331   return GNUNET_CONTAINER_multihashmap_get (connections, cid);
332 }
333
334
335 static void
336 connection_change_state (struct MeshConnection* c,
337                          enum MeshConnectionState state)
338 {
339   LOG (GNUNET_ERROR_TYPE_DEBUG,
340               "Connection %s state was %s\n",
341               GNUNET_h2s (&c->id), GMC_state2s (c->state));
342   LOG (GNUNET_ERROR_TYPE_DEBUG,
343               "Connection %s state is now %s\n",
344               GNUNET_h2s (&c->id), GMC_state2s (state));
345   c->state = state;
346 }
347
348
349 /**
350  * Callback called when a queued message is sent.
351  *
352  * Calculates the average time 
353  *
354  * @param cls Closure.
355  * @param c Connection this message was on.
356  * @param type Type of message sent.
357  * @param fwd Was this a FWD going message?
358  * @param size Size of the message.
359  * @param wait Time spent waiting for core (only the time for THIS message)
360  */
361 static void 
362 message_sent (void *cls,
363               struct MeshConnection *c, uint16_t type,
364               int fwd, size_t size,
365               struct GNUNET_TIME_Relative wait)
366 {
367   struct MeshConnectionPerformance *p;
368   struct MeshFlowControl *fc;
369   double usecsperbyte;
370
371   if (NULL == c->perf)
372     return; /* Only endpoints are interested in this. */
373
374   LOG (GNUNET_ERROR_TYPE_DEBUG, "!  message sent!\n");
375   p = c->perf;
376   usecsperbyte = ((double) wait.rel_value_us) / size;
377   if (p->size == AVG_MSGS)
378   {
379     /* Array is full. Substract oldest value, add new one and store. */
380     p->avg -= (p->usecsperbyte[p->idx] / AVG_MSGS);
381     p->usecsperbyte[p->idx] = usecsperbyte;
382     p->avg += (p->usecsperbyte[p->idx] / AVG_MSGS);
383   }
384   else
385   {
386     /* Array not yet full. Add current value to avg and store. */
387     p->usecsperbyte[p->idx] = usecsperbyte;
388     p->avg *= p->size;
389     p->avg += p->usecsperbyte[p->idx];
390     p->size++;
391     p->avg /= p->size;
392   }
393   p->idx = (p->idx + 1) % AVG_MSGS;
394
395   fc = fwd ? &c->fwd_fc : &c->bck_fc;
396   LOG (GNUNET_ERROR_TYPE_DEBUG, "!  Q_N- %p %u\n", fc, fc->queue_n);
397   fc->queue_n--;
398   c->pending_messages--;
399   if (GNUNET_YES == c->destroy && 0 == c->pending_messages)
400   {
401     LOG (GNUNET_ERROR_TYPE_DEBUG, "!  destroying connection!\n");
402     GMC_destroy (c);
403   }
404   /* Send ACK if needed, after accounting for sent ID in fc->queue_n */
405   switch (type)
406   {
407     case GNUNET_MESSAGE_TYPE_MESH_FWD:
408     case GNUNET_MESSAGE_TYPE_MESH_BCK:
409       fc->last_pid_sent++;
410       LOG (GNUNET_ERROR_TYPE_DEBUG, "!   accounting pid %u\n", fc->last_pid_sent);
411 //       send_ack (c, ch, fwd);
412       break;
413     default:
414       break;
415   }
416 //   if (NULL != c->t)
417 //   {
418 //     c->t->pending_messages--;
419 //     if (GNUNET_YES == c->t->destroy && 0 == t->pending_messages)
420 //     {
421 //       LOG (GNUNET_ERROR_TYPE_DEBUG, "*  destroying tunnel!\n");
422 //       GMT_destroy (c->t);
423 //     }
424 //   }
425 }
426
427
428 /**
429  * Get the previous hop in a connection
430  *
431  * @param c Connection.
432  *
433  * @return Previous peer in the connection.
434  */
435 static struct MeshPeer *
436 get_prev_hop (struct MeshConnection *c)
437 {
438   GNUNET_PEER_Id id;
439
440   if (0 == c->own_pos || c->path->length < 2)
441     id = c->path->peers[0];
442   else
443     id = c->path->peers[c->own_pos - 1];
444
445   return GMP_get_short (id);
446 }
447
448
449 /**
450  * Get the next hop in a connection
451  *
452  * @param c Connection.
453  *
454  * @return Next peer in the connection.
455  */
456 static struct MeshPeer *
457 get_next_hop (struct MeshConnection *c)
458 {
459   GNUNET_PEER_Id id;
460
461   if ((c->path->length - 1) == c->own_pos || c->path->length < 2)
462     id = c->path->peers[c->path->length - 1];
463   else
464     id = c->path->peers[c->own_pos + 1];
465
466   return GMP_get_short (id);
467 }
468
469
470 /**
471  * Get the hop in a connection.
472  *
473  * @param c Connection.
474  * @param fwd Next hop?
475  *
476  * @return Next peer in the connection.
477  */
478 static struct MeshPeer *
479 get_hop (struct MeshConnection *c, int fwd)
480 {
481   if (fwd)
482     return get_next_hop (c);
483   return get_prev_hop (c);
484 }
485
486
487 /**
488  * Send an ACK informing the predecessor about the available buffer space.
489  *
490  * Note that for fwd ack, the FWD mean forward *traffic* (root->dest),
491  * the ACK itself goes "back" (dest->root).
492  *
493  * @param c Connection on which to send the ACK.
494  * @param buffer How much space free to advertise?
495  * @param fwd Is this FWD ACK? (Going dest->owner)
496  */
497 static void
498 connection_send_ack (struct MeshConnection *c, unsigned int buffer, int fwd)
499 {
500   struct MeshFlowControl *next_fc;
501   struct MeshFlowControl *prev_fc;
502   struct GNUNET_MESH_ACK msg;
503   uint32_t ack;
504   int delta;
505
506   next_fc = fwd ? &c->fwd_fc : &c->bck_fc;
507   prev_fc = fwd ? &c->bck_fc : &c->fwd_fc;
508
509   LOG (GNUNET_ERROR_TYPE_DEBUG,
510               "connection send %s ack on %s\n",
511               fwd ? "FWD" : "BCK", GNUNET_h2s (&c->id));
512
513   /* Check if we need to transmit the ACK */
514   if (prev_fc->last_ack_sent - prev_fc->last_pid_recv > 3)
515   {
516     LOG (GNUNET_ERROR_TYPE_DEBUG, "Not sending ACK, buffer > 3\n");
517     LOG (GNUNET_ERROR_TYPE_DEBUG,
518                 "  last pid recv: %u, last ack sent: %u\n",
519                 prev_fc->last_pid_recv, prev_fc->last_ack_sent);
520     return;
521   }
522
523   /* Ok, ACK might be necessary, what PID to ACK? */
524   delta = next_fc->queue_max - next_fc->queue_n;
525   ack = prev_fc->last_pid_recv + delta;
526   LOG (GNUNET_ERROR_TYPE_DEBUG, " ACK %u\n", ack);
527   LOG (GNUNET_ERROR_TYPE_DEBUG,
528               " last pid %u, last ack %u, qmax %u, q %u\n",
529               prev_fc->last_pid_recv, prev_fc->last_ack_sent,
530               next_fc->queue_max, next_fc->queue_n);
531   if (ack == prev_fc->last_ack_sent)
532   {
533     LOG (GNUNET_ERROR_TYPE_DEBUG, "Not sending FWD ACK, not needed\n");
534     return;
535   }
536
537   prev_fc->last_ack_sent = ack;
538
539   /* Build ACK message and send on connection */
540   msg.header.size = htons (sizeof (msg));
541   msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_ACK);
542   msg.ack = htonl (ack);
543   msg.cid = c->id;
544
545   GMC_send_prebuilt_message (&msg.header, c, NULL, !fwd);
546 }
547
548
549 /**
550  * Sends a CONNECTION ACK message in reponse to a received CONNECTION_CREATE
551  * or a first CONNECTION_ACK directed to us.
552  *
553  * @param connection Connection to confirm.
554  * @param fwd Is this a fwd ACK? (First is bck (SYNACK), second is fwd (ACK))
555  */
556 static void
557 send_connection_ack (struct MeshConnection *connection, int fwd)
558 {
559   struct MeshTunnel3 *t;
560
561   t = connection->t;
562   LOG (GNUNET_ERROR_TYPE_DEBUG, "Send connection ack\n");
563   GMP_queue_add (get_hop (connection, fwd), NULL,
564                  GNUNET_MESSAGE_TYPE_MESH_CONNECTION_ACK,
565                  sizeof (struct GNUNET_MESH_ConnectionACK),
566                  connection, NULL, fwd,
567                  &message_sent, NULL);
568   if (MESH_TUNNEL3_NEW == GMT_get_state (t))
569     GMT_change_state (t, MESH_TUNNEL3_WAITING);
570   if (MESH_CONNECTION_READY != connection->state)
571     GMC_change_state (connection, MESH_CONNECTION_SENT);
572 }
573
574
575 /**
576  * Send keepalive packets for a connection.
577  *
578  * @param c Connection to keep alive..
579  * @param fwd Is this a FWD keepalive? (owner -> dest).
580  */
581 static void
582 connection_keepalive (struct MeshConnection *c, int fwd)
583 {
584   struct GNUNET_MESH_ConnectionKeepAlive *msg;
585   size_t size = sizeof (struct GNUNET_MESH_ConnectionKeepAlive);
586   char cbuf[size];
587   uint16_t type;
588
589   type = fwd ? GNUNET_MESSAGE_TYPE_MESH_FWD_KEEPALIVE :
590                GNUNET_MESSAGE_TYPE_MESH_BCK_KEEPALIVE;
591
592   LOG (GNUNET_ERROR_TYPE_DEBUG,
593        "sending %s keepalive for connection %s[%d]\n",
594        fwd ? "FWD" : "BCK", GMT_2s (c->t), c->id);
595
596   msg = (struct GNUNET_MESH_ConnectionKeepAlive *) cbuf;
597   msg->header.size = htons (size);
598   msg->header.type = htons (type);
599   msg->cid = c->id;
600
601   GMC_send_prebuilt_message (&msg->header, c, NULL, fwd);
602 }
603
604
605 /**
606  * Send CONNECTION_{CREATE/ACK} packets for a connection.
607  *
608  * @param c Connection for which to send the message.
609  * @param fwd If GNUNET_YES, send CREATE, otherwise send ACK.
610  */
611 static void
612 connection_recreate (struct MeshConnection *c, int fwd)
613 {
614   LOG (GNUNET_ERROR_TYPE_DEBUG, "sending connection recreate\n");
615   if (fwd)
616     send_connection_create (c);
617   else
618     send_connection_ack (c, GNUNET_NO);
619 }
620
621
622 /**
623  * Generic connection timer management.
624  * Depending on the role of the peer in the connection will send the
625  * appropriate message (build or keepalive)
626  *
627  * @param c Conncetion to maintain.
628  * @param fwd Is FWD?
629  */
630 static void
631 connection_maintain (struct MeshConnection *c, int fwd)
632 {
633   if (MESH_TUNNEL3_SEARCHING == GMT_get_state (c->t))
634   {
635     /* TODO DHT GET with RO_BART */
636     return;
637   }
638   switch (c->state)
639   {
640     case MESH_CONNECTION_NEW:
641       GNUNET_break (0);
642     case MESH_CONNECTION_SENT:
643       connection_recreate (c, fwd);
644       break;
645     case MESH_CONNECTION_READY:
646       connection_keepalive (c, fwd);
647       break;
648     default:
649       break;
650   }
651 }
652
653
654 static void
655 connection_fwd_keepalive (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
656 {
657   struct MeshConnection *c = cls;
658
659   c->fwd_maintenance_task = GNUNET_SCHEDULER_NO_TASK;
660   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
661     return;
662
663   connection_maintain (c, GNUNET_YES);
664   c->fwd_maintenance_task = GNUNET_SCHEDULER_add_delayed (refresh_connection_time,
665                                                           &connection_fwd_keepalive,
666                                                           c);
667 }
668
669
670 static void
671 connection_bck_keepalive (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
672 {
673   struct MeshConnection *c = cls;
674
675   c->bck_maintenance_task = GNUNET_SCHEDULER_NO_TASK;
676   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
677     return;
678
679   connection_maintain (c, GNUNET_NO);
680   c->bck_maintenance_task = GNUNET_SCHEDULER_add_delayed (refresh_connection_time,
681                                                           &connection_bck_keepalive,
682                                                           c);
683 }
684
685
686 /**
687  * @brief Re-initiate traffic on this connection if necessary.
688  *
689  * Check if there is traffic queued towards this peer
690  * and the core transmit handle is NULL (traffic was stalled).
691  * If so, call core tmt rdy.
692  *
693  * @param c Connection on which initiate traffic.
694  * @param fwd Is this about fwd traffic?
695  */
696 static void
697 connection_unlock_queue (struct MeshConnection *c, int fwd)
698 {
699   struct MeshPeer *peer;
700
701   LOG (GNUNET_ERROR_TYPE_DEBUG,
702               "connection_unlock_queue %s on %s\n",
703               fwd ? "FWD" : "BCK", GNUNET_h2s (&c->id));
704
705   if (GMC_is_terminal (c, fwd))
706   {
707     LOG (GNUNET_ERROR_TYPE_DEBUG, " is terminal!\n");
708     return;
709   }
710
711   peer = get_hop (c, fwd);
712   GMP_queue_unlock (peer, c);
713 }
714
715
716 /**
717  * Cancel all transmissions that belong to a certain connection.
718  *
719  * @param c Connection which to cancel.
720  * @param fwd Cancel fwd traffic?
721  */
722 static void
723 connection_cancel_queues (struct MeshConnection *c, int fwd)
724 {
725
726   struct MeshFlowControl *fc;
727   struct MeshPeer *peer;
728
729   if (NULL == c)
730   {
731     GNUNET_break (0);
732     return;
733   }
734
735   peer = get_hop (c, fwd);
736   GMP_queue_cancel (peer, c);
737
738   fc = fwd ? &c->fwd_fc : &c->bck_fc;
739   if (GNUNET_SCHEDULER_NO_TASK != fc->poll_task)
740   {
741     GNUNET_SCHEDULER_cancel (fc->poll_task);
742     fc->poll_task = GNUNET_SCHEDULER_NO_TASK;
743   }
744 }
745
746
747 /**
748  * Function called if a connection has been stalled for a while,
749  * possibly due to a missed ACK. Poll the neighbor about its ACK status.
750  *
751  * @param cls Closure (poll ctx).
752  * @param tc TaskContext.
753  */
754 static void
755 connection_poll (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
756 {
757   struct MeshFlowControl *fc = cls;
758   struct GNUNET_MESH_Poll msg;
759   struct MeshConnection *c;
760
761   fc->poll_task = GNUNET_SCHEDULER_NO_TASK;
762   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
763   {
764     return;
765   }
766
767   c = fc->c;
768   LOG (GNUNET_ERROR_TYPE_DEBUG, " *** Polling!\n");
769   LOG (GNUNET_ERROR_TYPE_DEBUG, " *** connection [%X]\n",
770               GNUNET_h2s (&c->id));
771   LOG (GNUNET_ERROR_TYPE_DEBUG, " ***   %s\n",
772               fc == &c->fwd_fc ? "FWD" : "BCK");
773
774   msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_POLL);
775   msg.header.size = htons (sizeof (msg));
776   LOG (GNUNET_ERROR_TYPE_DEBUG, " *** pid (%u)!\n", fc->last_pid_sent);
777   GMC_send_prebuilt_message (&msg.header, c, NULL, fc == &c->fwd_fc);
778   fc->poll_time = GNUNET_TIME_STD_BACKOFF (fc->poll_time);
779   fc->poll_task = GNUNET_SCHEDULER_add_delayed (fc->poll_time,
780                                                 &connection_poll, fc);
781 }
782
783
784 /**
785  * Timeout function due to lack of keepalive/traffic from the owner.
786  * Destroys connection if called.
787  *
788  * @param cls Closure (connection to destroy).
789  * @param tc TaskContext.
790  */
791 static void
792 connection_fwd_timeout (void *cls,
793                         const struct GNUNET_SCHEDULER_TaskContext *tc)
794 {
795   struct MeshConnection *c = cls;
796
797   c->fwd_maintenance_task = GNUNET_SCHEDULER_NO_TASK;
798   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
799     return;
800   LOG (GNUNET_ERROR_TYPE_DEBUG,
801               "Connection %s[%X] FWD timed out. Destroying.\n",
802               GMT_2s (c->t),
803               c->id);
804
805   if (GMC_is_origin (c, GNUNET_YES)) /* If local, leave. */
806     return;
807
808   GMC_destroy (c);
809 }
810
811
812 /**
813  * Timeout function due to lack of keepalive/traffic from the destination.
814  * Destroys connection if called.
815  *
816  * @param cls Closure (connection to destroy).
817  * @param tc TaskContext
818  */
819 static void
820 connection_bck_timeout (void *cls,
821                         const struct GNUNET_SCHEDULER_TaskContext *tc)
822 {
823   struct MeshConnection *c = cls;
824
825   c->bck_maintenance_task = GNUNET_SCHEDULER_NO_TASK;
826   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
827     return;
828
829   LOG (GNUNET_ERROR_TYPE_DEBUG,
830               "Connection %s[%X] FWD timed out. Destroying.\n",
831               GMT_2s (c->t), c->id);
832
833   if (GMC_is_origin (c, GNUNET_NO)) /* If local, leave. */
834     return;
835
836   GMC_destroy (c);
837 }
838
839
840 /**
841  * Resets the connection timeout task, some other message has done the
842  * task's job.
843  * - For the first peer on the direction this means to send
844  *   a keepalive or a path confirmation message (either create or ACK).
845  * - For all other peers, this means to destroy the connection,
846  *   due to lack of activity.
847  * Starts the tiemout if no timeout was running (connection just created).
848  *
849  * @param c Connection whose timeout to reset.
850  * @param fwd Is this forward?
851  *
852  * TODO use heap to improve efficiency of scheduler.
853  */
854 static void
855 connection_reset_timeout (struct MeshConnection *c, int fwd)
856 {
857   GNUNET_SCHEDULER_TaskIdentifier *ti;
858   GNUNET_SCHEDULER_Task f;
859
860   ti = fwd ? &c->fwd_maintenance_task : &c->bck_maintenance_task;
861
862   if (GNUNET_SCHEDULER_NO_TASK != *ti)
863     GNUNET_SCHEDULER_cancel (*ti);
864
865   if (GMC_is_origin (c, fwd)) /* Endpoint */
866   {
867     f  = fwd ? &connection_fwd_keepalive : &connection_bck_keepalive;
868     *ti = GNUNET_SCHEDULER_add_delayed (refresh_connection_time, f, c);
869   }
870   else /* Relay */
871   {
872     struct GNUNET_TIME_Relative delay;
873
874     delay = GNUNET_TIME_relative_multiply (refresh_connection_time, 4);
875     f  = fwd ? &connection_fwd_timeout : &connection_bck_timeout;
876     *ti = GNUNET_SCHEDULER_add_delayed (delay, f, c);
877   }
878 }
879
880
881 /**
882  * Add the connection to the list of both neighbors.
883  *
884  * @param c Connection.
885  */
886 static void
887 register_neighbors (struct MeshConnection *c)
888 {
889   struct MeshPeer *peer;
890
891   peer = get_next_hop (c);
892   if (GNUNET_NO == GMP_is_neighbor (peer))
893   {
894     GMC_destroy (c);
895     return;
896   }
897   GMP_add_connection (peer, c);
898   peer = get_prev_hop (c);
899   if (GNUNET_NO == GMP_is_neighbor (peer))
900   {
901     GMC_destroy (c);
902     return;
903   }
904   GMP_add_connection (peer, c);
905 }
906
907
908 /**
909  * Remove the connection from the list of both neighbors.
910  *
911  * @param c Connection.
912  */
913 static void
914 unregister_neighbors (struct MeshConnection *c)
915 {
916   struct MeshPeer *peer;
917
918   peer = get_next_hop (c);
919   GMP_remove_connection (peer, c);
920
921   peer = get_prev_hop (c);
922   GMP_remove_connection (peer, c);
923
924 }
925
926
927 /******************************************************************************/
928 /********************************    API    ***********************************/
929 /******************************************************************************/
930
931 /**
932  * Core handler for connection creation.
933  *
934  * @param cls Closure (unused).
935  * @param peer Sender (neighbor).
936  * @param message Message.
937  *
938  * @return GNUNET_OK to keep the connection open,
939  *         GNUNET_SYSERR to close it (signal serious error)
940  */
941 int
942 GMC_handle_create (void *cls, const struct GNUNET_PeerIdentity *peer,
943                    const struct GNUNET_MessageHeader *message)
944 {
945   struct GNUNET_MESH_ConnectionCreate *msg;
946   struct GNUNET_PeerIdentity *id;
947   struct GNUNET_HashCode *cid;
948   struct MeshPeerPath *path;
949   struct MeshPeer *dest_peer;
950   struct MeshPeer *orig_peer;
951   struct MeshConnection *c;
952   unsigned int own_pos;
953   uint16_t size;
954   uint16_t i;
955
956   LOG (GNUNET_ERROR_TYPE_DEBUG, "\n\n");
957   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received a connection create msg\n");
958
959   /* Check size */
960   size = ntohs (message->size);
961   if (size < sizeof (struct GNUNET_MESH_ConnectionCreate))
962   {
963     GNUNET_break_op (0);
964     return GNUNET_OK;
965   }
966
967   /* Calculate hops */
968   size -= sizeof (struct GNUNET_MESH_ConnectionCreate);
969   if (size % sizeof (struct GNUNET_PeerIdentity))
970   {
971     GNUNET_break_op (0);
972     return GNUNET_OK;
973   }
974   size /= sizeof (struct GNUNET_PeerIdentity);
975   if (1 > size)
976   {
977     GNUNET_break_op (0);
978     return GNUNET_OK;
979   }
980   LOG (GNUNET_ERROR_TYPE_DEBUG, "    path has %u hops.\n", size);
981
982   /* Get parameters */
983   msg = (struct GNUNET_MESH_ConnectionCreate *) message;
984   cid = &msg->cid;
985   id = (struct GNUNET_PeerIdentity *) &msg[1];
986   LOG (GNUNET_ERROR_TYPE_DEBUG,
987               "    connection %s (%s).\n",
988               GNUNET_h2s (cid), GNUNET_i2s (id));
989
990   /* Create connection */
991   c = connection_get (cid);
992   if (NULL == c)
993   {
994     /* Create path */
995     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Creating path...\n");
996     path = path_new (size);
997     own_pos = 0;
998     for (i = 0; i < size; i++)
999     {
1000       LOG (GNUNET_ERROR_TYPE_DEBUG, "  ... adding %s\n",
1001                   GNUNET_i2s (&id[i]));
1002       path->peers[i] = GNUNET_PEER_intern (&id[i]);
1003       if (path->peers[i] == myid)
1004         own_pos = i;
1005     }
1006     if (own_pos == 0 && path->peers[own_pos] != myid)
1007     {
1008       /* create path: self not found in path through self */
1009       GNUNET_break_op (0);
1010       path_destroy (path);
1011       GMC_destroy (c);
1012       return GNUNET_OK;
1013     }
1014     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Own position: %u\n", own_pos);
1015     GMP_add_path_to_all (path, GNUNET_NO);
1016         LOG (GNUNET_ERROR_TYPE_DEBUG, "  Creating connection\n");
1017     c = GMC_new (cid, NULL, path_duplicate (path), own_pos);
1018     if (NULL == c)
1019       return GNUNET_OK;
1020     connection_reset_timeout (c, GNUNET_YES);
1021   }
1022   else
1023   {
1024     path = NULL;
1025   }
1026   if (MESH_CONNECTION_NEW == c->state)
1027     connection_change_state (c, MESH_CONNECTION_SENT);
1028
1029   /* Remember peers */
1030   dest_peer = GMP_get (&id[size - 1]);
1031   orig_peer = GMP_get (&id[0]);
1032
1033   /* Is it a connection to us? */
1034   if (c->own_pos == size - 1)
1035   {
1036     LOG (GNUNET_ERROR_TYPE_DEBUG, "  It's for us!\n");
1037     GMP_add_path_to_origin (orig_peer, path, GNUNET_YES);
1038
1039     GMP_add_tunnel (orig_peer);
1040     GMP_add_connection (orig_peer, c);
1041     if (MESH_TUNNEL3_NEW == GMT_get_state (c->t))
1042       GMT_change_state (c->t,  MESH_TUNNEL3_WAITING);
1043
1044     send_connection_ack (c, GNUNET_NO);
1045     if (MESH_CONNECTION_SENT == c->state)
1046       connection_change_state (c, MESH_CONNECTION_ACK);
1047
1048     /* Keep tunnel alive in direction dest->owner*/
1049     connection_reset_timeout (c, GNUNET_NO);
1050   }
1051   else
1052   {
1053     /* It's for somebody else! Retransmit. */
1054     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Retransmitting.\n");
1055     GMP_add_path (dest_peer, path_duplicate (path), GNUNET_NO);
1056     GMP_add_path_to_origin (orig_peer, path, GNUNET_NO);
1057     GMC_send_prebuilt_message (message, c, NULL, GNUNET_YES);
1058   }
1059   return GNUNET_OK;
1060 }
1061
1062
1063 /**
1064  * Core handler for path confirmations.
1065  *
1066  * @param cls closure
1067  * @param message message
1068  * @param peer peer identity this notification is about
1069  *
1070  * @return GNUNET_OK to keep the connection open,
1071  *         GNUNET_SYSERR to close it (signal serious error)
1072  */
1073 int
1074 GMC_handle_confirm (void *cls, const struct GNUNET_PeerIdentity *peer,
1075                     const struct GNUNET_MessageHeader *message)
1076 {
1077   struct GNUNET_MESH_ConnectionACK *msg;
1078   struct MeshConnection *c;
1079   struct MeshPeerPath *p;
1080   struct MeshPeer *pi;
1081   int fwd;
1082
1083   LOG (GNUNET_ERROR_TYPE_DEBUG, "\n\n");
1084   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received a connection ACK msg\n");
1085   msg = (struct GNUNET_MESH_ConnectionACK *) message;
1086   LOG (GNUNET_ERROR_TYPE_DEBUG, "  on connection %s\n",
1087               GNUNET_h2s (&msg->cid));
1088   c = connection_get (&msg->cid);
1089   if (NULL == c)
1090   {
1091     GNUNET_STATISTICS_update (stats, "# control on unknown connection",
1092                               1, GNUNET_NO);
1093     LOG (GNUNET_ERROR_TYPE_DEBUG, "  don't know the connection!\n");
1094     return GNUNET_OK;
1095   }
1096
1097
1098   LOG (GNUNET_ERROR_TYPE_DEBUG, "  via peer %s\n",
1099               GNUNET_i2s (peer));
1100   pi = peer_get (peer);
1101   if (get_next_hop (c) == pi)
1102   {
1103     LOG (GNUNET_ERROR_TYPE_DEBUG, "  SYNACK\n");
1104     fwd = GNUNET_NO;
1105     if (MESH_CONNECTION_SENT == c->state)
1106       connection_change_state (c, MESH_CONNECTION_ACK);
1107   }
1108   else if (get_prev_hop (c) == pi)
1109   {
1110     LOG (GNUNET_ERROR_TYPE_DEBUG, "  ACK\n");
1111     fwd = GNUNET_YES;
1112     connection_change_state (c, MESH_CONNECTION_READY);
1113   }
1114   else
1115   {
1116     GNUNET_break_op (0);
1117     return GNUNET_OK;
1118   }
1119   connection_reset_timeout (c, fwd);
1120
1121   /* Add path to peers? */
1122   p = c->path;
1123   if (NULL != p)
1124   {
1125     path_add_to_peers (p, GNUNET_YES);
1126   }
1127   else
1128   {
1129     GNUNET_break (0);
1130   }
1131
1132   /* Message for us as creator? */
1133   if (GMC_is_origin (c, GNUNET_YES))
1134   {
1135     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Connection (SYN)ACK for us!\n");
1136     connection_change_state (c, MESH_CONNECTION_READY);
1137     GMT_change_state (c->t, MESH_TUNNEL3_READY);
1138     send_connection_ack (c, GNUNET_YES);
1139     GMT_send_queued_data (c->t, GNUNET_YES);
1140     return GNUNET_OK;
1141   }
1142
1143   /* Message for us as destination? */
1144   if (GMC_is_terminal (c, GNUNET_YES))
1145   {
1146     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Connection ACK for us!\n");
1147     GMC_change_state (c, MESH_CONNECTION_READY);
1148     GMT_change_state (c->t, MESH_TUNNEL3_READY);
1149     GMT_send_queued_data (c->t, GNUNET_NO);
1150     return GNUNET_OK;
1151   }
1152
1153   LOG (GNUNET_ERROR_TYPE_DEBUG, "  not for us, retransmitting...\n");
1154   GMC_send_prebuilt_message (message, c, NULL, fwd);
1155   return GNUNET_OK;
1156 }
1157
1158
1159 /**
1160  * Core handler for notifications of broken paths
1161  *
1162  * @param cls Closure (unused).
1163  * @param peer Peer identity of sending neighbor.
1164  * @param message Message.
1165  *
1166  * @return GNUNET_OK to keep the connection open,
1167  *         GNUNET_SYSERR to close it (signal serious error)
1168  */
1169 int
1170 GMC_handle_broken (void *cls, const struct GNUNET_PeerIdentity *peer,
1171                    const struct GNUNET_MessageHeader *message)
1172 {
1173   struct GNUNET_MESH_ConnectionBroken *msg;
1174   struct MeshConnection *c;
1175
1176   LOG (GNUNET_ERROR_TYPE_DEBUG,
1177               "Received a CONNECTION BROKEN msg from %s\n", GNUNET_i2s (peer));
1178   msg = (struct GNUNET_MESH_ConnectionBroken *) message;
1179   LOG (GNUNET_ERROR_TYPE_DEBUG, "  regarding %s\n",
1180               GNUNET_i2s (&msg->peer1));
1181   LOG (GNUNET_ERROR_TYPE_DEBUG, "  regarding %s\n",
1182               GNUNET_i2s (&msg->peer2));
1183   c = connection_get (&msg->cid);
1184   if (NULL == c)
1185   {
1186     GNUNET_break_op (0);
1187     return GNUNET_OK;
1188   }
1189   tunnel_notify_connection_broken (c->t, GNUNET_PEER_search (&msg->peer1),
1190                                    GNUNET_PEER_search (&msg->peer2));
1191   return GNUNET_OK;
1192
1193 }
1194
1195
1196 /**
1197  * Core handler for tunnel destruction
1198  *
1199  * @param cls Closure (unused).
1200  * @param peer Peer identity of sending neighbor.
1201  * @param message Message.
1202  *
1203  * @return GNUNET_OK to keep the connection open,
1204  *         GNUNET_SYSERR to close it (signal serious error)
1205  */
1206 int
1207 GMC_handle_destroy (void *cls, const struct GNUNET_PeerIdentity *peer,
1208                     const struct GNUNET_MessageHeader *message)
1209 {
1210   struct GNUNET_MESH_ConnectionDestroy *msg;
1211   struct MeshConnection *c;
1212   GNUNET_PEER_Id id;
1213   int fwd;
1214
1215   msg = (struct GNUNET_MESH_ConnectionDestroy *) message;
1216   LOG (GNUNET_ERROR_TYPE_DEBUG,
1217               "Got a CONNECTION DESTROY message from %s\n",
1218               GNUNET_i2s (peer));
1219   LOG (GNUNET_ERROR_TYPE_DEBUG,
1220               "  for connection %s\n",
1221               GNUNET_h2s (&msg->cid));
1222   c = connection_get (&msg->cid);
1223   if (NULL == c)
1224   {
1225     /* Probably already got the message from another path,
1226      * destroyed the tunnel and retransmitted to children.
1227      * Safe to ignore.
1228      */
1229     GNUNET_STATISTICS_update (stats, "# control on unknown tunnel",
1230                               1, GNUNET_NO);
1231     return GNUNET_OK;
1232   }
1233   id = GNUNET_PEER_search (peer);
1234   if (id == GMP_get_short_id (get_prev_hop (c)))
1235     fwd = GNUNET_YES;
1236   else if (id == GMP_get_short_id (get_next_hop (c)))
1237     fwd = GNUNET_NO;
1238   else
1239   {
1240     GNUNET_break_op (0);
1241     return GNUNET_OK;
1242   }
1243   GMC_send_prebuilt_message (message, c, NULL, fwd);
1244   c->destroy = GNUNET_YES;
1245
1246   return GNUNET_OK;
1247 }
1248
1249 /**
1250  * Generic handler for mesh network encrypted traffic.
1251  *
1252  * @param peer Peer identity this notification is about.
1253  * @param message Encrypted message.
1254  * @param fwd Is this FWD traffic? GNUNET_YES : GNUNET_NO;
1255  *
1256  * @return GNUNET_OK to keep the connection open,
1257  *         GNUNET_SYSERR to close it (signal serious error)
1258  */
1259 static int
1260 handle_mesh_encrypted (const struct GNUNET_PeerIdentity *peer,
1261                        const struct GNUNET_MESH_Encrypted *msg,
1262                        int fwd)
1263 {
1264   struct MeshConnection *c;
1265   struct MeshTunnel3 *t;
1266   struct MeshPeer *neighbor;
1267   struct MeshFlowControl *fc;
1268   uint32_t pid;
1269   uint32_t ttl;
1270   uint16_t type;
1271   size_t size;
1272
1273   /* Check size */
1274   size = ntohs (msg->header.size);
1275   if (size <
1276       sizeof (struct GNUNET_MESH_Encrypted) +
1277       sizeof (struct GNUNET_MessageHeader))
1278   {
1279     GNUNET_break_op (0);
1280     return GNUNET_OK;
1281   }
1282   type = ntohs (msg->header.type);
1283   LOG (GNUNET_ERROR_TYPE_DEBUG, "\n\n");
1284   LOG (GNUNET_ERROR_TYPE_DEBUG, "got a %s message from %s\n",
1285               GNUNET_MESH_DEBUG_M2S (type), GNUNET_i2s (peer));
1286
1287   /* Check connection */
1288   c = connection_get (&msg->cid);
1289   if (NULL == c)
1290   {
1291     GNUNET_STATISTICS_update (stats, "# unknown connection", 1, GNUNET_NO);
1292     LOG (GNUNET_ERROR_TYPE_DEBUG, "WARNING connection unknown\n");
1293     return GNUNET_OK;
1294   }
1295   t = c->t;
1296   fc = fwd ? &c->bck_fc : &c->fwd_fc;
1297
1298   /* Check if origin is as expected */
1299   neighbor = get_hop (c, !fwd);
1300   if (GNUNET_PEER_search (peer) != GMP_get_short_id (neighbor))
1301   {
1302     GNUNET_break_op (0);
1303     return GNUNET_OK;
1304   }
1305
1306   /* Check PID */
1307   pid = ntohl (msg->pid);
1308   if (GMC_is_pid_bigger (pid, fc->last_ack_sent))
1309   {
1310     GNUNET_STATISTICS_update (stats, "# unsolicited message", 1, GNUNET_NO);
1311     LOG (GNUNET_ERROR_TYPE_DEBUG,
1312                 "WARNING Received PID %u, (prev %u), ACK %u\n",
1313                 pid, fc->last_pid_recv, fc->last_ack_sent);
1314     return GNUNET_OK;
1315   }
1316   if (GNUNET_NO == GMC_is_pid_bigger (pid, fc->last_pid_recv))
1317   {
1318     GNUNET_STATISTICS_update (stats, "# duplicate PID", 1, GNUNET_NO);
1319     LOG (GNUNET_ERROR_TYPE_DEBUG,
1320                 " Pid %u not expected (%u+), dropping!\n",
1321                 pid, fc->last_pid_recv + 1);
1322     return GNUNET_OK;
1323   }
1324   if (MESH_CONNECTION_SENT == c->state)
1325     connection_change_state (c, MESH_CONNECTION_READY);
1326   connection_reset_timeout (c, fwd);
1327   fc->last_pid_recv = pid;
1328
1329   /* Is this message for us? */
1330   if (GMC_is_terminal (c, fwd))
1331   {
1332     size_t dsize = size - sizeof (struct GNUNET_MESH_Encrypted);
1333     char cbuf[dsize];
1334     struct GNUNET_MessageHeader *msgh;
1335     unsigned int off;
1336
1337     /* TODO signature verification */
1338     LOG (GNUNET_ERROR_TYPE_DEBUG, "  message for us!\n");
1339     GNUNET_STATISTICS_update (stats, "# messages received", 1, GNUNET_NO);
1340
1341     fc->last_pid_recv = pid;
1342     tunnel_decrypt (t, cbuf, &msg[1], dsize, msg->iv, fwd);
1343     off = 0;
1344     while (off < dsize)
1345     {
1346       msgh = (struct GNUNET_MessageHeader *) &cbuf[off];
1347       handle_decrypted (t, msgh, fwd);
1348       off += ntohs (msgh->size);
1349     }
1350     send_ack (c, NULL, fwd);
1351     return GNUNET_OK;
1352   }
1353
1354   /* Message not for us: forward to next hop */
1355   LOG (GNUNET_ERROR_TYPE_DEBUG, "  not for us, retransmitting...\n");
1356   ttl = ntohl (msg->ttl);
1357   LOG (GNUNET_ERROR_TYPE_DEBUG, "   ttl: %u\n", ttl);
1358   if (ttl == 0)
1359   {
1360     GNUNET_STATISTICS_update (stats, "# TTL drops", 1, GNUNET_NO);
1361     LOG (GNUNET_ERROR_TYPE_WARNING, " TTL is 0, DROPPING!\n");
1362     send_ack (c, NULL, fwd);
1363     return GNUNET_OK;
1364   }
1365   GNUNET_STATISTICS_update (stats, "# messages forwarded", 1, GNUNET_NO);
1366
1367   send_prebuilt_message_connection (&msg->header, c, NULL, fwd);
1368
1369   return GNUNET_OK;
1370 }
1371
1372
1373 /**
1374  * Core handler for mesh network traffic going orig->dest.
1375  *
1376  * @param cls Closure (unused).
1377  * @param message Message received.
1378  * @param peer Peer who sent the message.
1379  *
1380  * @return GNUNET_OK to keep the connection open,
1381  *         GNUNET_SYSERR to close it (signal serious error)
1382  */
1383 int
1384 GMC_handle_fwd (void *cls, const struct GNUNET_PeerIdentity *peer,
1385                 const struct GNUNET_MessageHeader *message)
1386 {
1387   return handle_mesh_encrypted (peer,
1388                                 (struct GNUNET_MESH_Encrypted *)message,
1389                                 GNUNET_YES);
1390 }
1391
1392 /**
1393  * Core handler for mesh network traffic going dest->orig.
1394  *
1395  * @param cls Closure (unused).
1396  * @param message Message received.
1397  * @param peer Peer who sent the message.
1398  *
1399  * @return GNUNET_OK to keep the connection open,
1400  *         GNUNET_SYSERR to close it (signal serious error)
1401  */
1402 int
1403 GMC_handle_bck (void *cls, const struct GNUNET_PeerIdentity *peer,
1404                 const struct GNUNET_MessageHeader *message)
1405 {
1406   return handle_mesh_encrypted (peer,
1407                                 (struct GNUNET_MESH_Encrypted *)message,
1408                                 GNUNET_NO);
1409 }
1410
1411
1412 /**
1413  * Core handler for mesh network traffic point-to-point acks.
1414  *
1415  * @param cls closure
1416  * @param message message
1417  * @param peer peer identity this notification is about
1418  *
1419  * @return GNUNET_OK to keep the connection open,
1420  *         GNUNET_SYSERR to close it (signal serious error)
1421  */
1422 int
1423 GMC_handle_ack (void *cls, const struct GNUNET_PeerIdentity *peer,
1424                 const struct GNUNET_MessageHeader *message)
1425 {
1426   struct GNUNET_MESH_ACK *msg;
1427   struct MeshConnection *c;
1428   struct MeshFlowControl *fc;
1429   GNUNET_PEER_Id id;
1430   uint32_t ack;
1431   int fwd;
1432
1433   LOG (GNUNET_ERROR_TYPE_DEBUG, "\n\n");
1434   LOG (GNUNET_ERROR_TYPE_DEBUG, "Got an ACK packet from %s!\n",
1435               GNUNET_i2s (peer));
1436   msg = (struct GNUNET_MESH_ACK *) message;
1437
1438   c = connection_get (&msg->cid);
1439
1440   if (NULL == c)
1441   {
1442     GNUNET_STATISTICS_update (stats, "# ack on unknown connection", 1,
1443                               GNUNET_NO);
1444     return GNUNET_OK;
1445   }
1446
1447   /* Is this a forward or backward ACK? */
1448   id = GNUNET_PEER_search (peer);
1449   if (GMP_get_short_id (get_next_hop (c)) == id)
1450   {
1451     LOG (GNUNET_ERROR_TYPE_DEBUG, "  FWD ACK\n");
1452     fc = &c->fwd_fc;
1453     fwd = GNUNET_YES;
1454   }
1455   else if (GMP_get_short_id (get_prev_hop (c)) == id)
1456   {
1457     LOG (GNUNET_ERROR_TYPE_DEBUG, "  BCK ACK\n");
1458     fc = &c->bck_fc;
1459     fwd = GNUNET_NO;
1460   }
1461   else
1462   {
1463     GNUNET_break_op (0);
1464     return GNUNET_OK;
1465   }
1466
1467   ack = ntohl (msg->ack);
1468   LOG (GNUNET_ERROR_TYPE_DEBUG, "  ACK %u (was %u)\n",
1469               ack, fc->last_ack_recv);
1470   if (GMC_is_pid_bigger (ack, fc->last_ack_recv))
1471     fc->last_ack_recv = ack;
1472
1473   /* Cancel polling if the ACK is big enough. */
1474   if (GNUNET_SCHEDULER_NO_TASK != fc->poll_task &&
1475       GMC_is_pid_bigger (fc->last_ack_recv, fc->last_pid_sent))
1476   {
1477     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Cancel poll\n");
1478     GNUNET_SCHEDULER_cancel (fc->poll_task);
1479     fc->poll_task = GNUNET_SCHEDULER_NO_TASK;
1480     fc->poll_time = GNUNET_TIME_UNIT_SECONDS;
1481   }
1482
1483   connection_unlock_queue (c, fwd);
1484
1485   return GNUNET_OK;
1486 }
1487
1488
1489 /**
1490  * Core handler for mesh network traffic point-to-point ack polls.
1491  *
1492  * @param cls closure
1493  * @param message message
1494  * @param peer peer identity this notification is about
1495  *
1496  * @return GNUNET_OK to keep the connection open,
1497  *         GNUNET_SYSERR to close it (signal serious error)
1498  */
1499 int
1500 GMC_handle_poll (void *cls, const struct GNUNET_PeerIdentity *peer,
1501                  const struct GNUNET_MessageHeader *message)
1502 {
1503   struct GNUNET_MESH_Poll *msg;
1504   struct MeshConnection *c;
1505   struct MeshFlowControl *fc;
1506   GNUNET_PEER_Id id;
1507   uint32_t pid;
1508   int fwd;
1509
1510   LOG (GNUNET_ERROR_TYPE_DEBUG, "\n\n");
1511   LOG (GNUNET_ERROR_TYPE_DEBUG, "Got a POLL packet from %s!\n",
1512               GNUNET_i2s (peer));
1513
1514   msg = (struct GNUNET_MESH_Poll *) message;
1515
1516   c = connection_get (&msg->cid);
1517
1518   if (NULL == c)
1519   {
1520     GNUNET_STATISTICS_update (stats, "# poll on unknown connection", 1,
1521                               GNUNET_NO);
1522     GNUNET_break_op (0);
1523     return GNUNET_OK;
1524   }
1525
1526   /* Is this a forward or backward ACK?
1527    * Note: a poll should never be needed in a loopback case,
1528    * since there is no possiblility of packet loss there, so
1529    * this way of discerining FWD/BCK should not be a problem.
1530    */
1531   id = GNUNET_PEER_search (peer);
1532   if (GMP_get_short_id (get_next_hop (c)) == id)
1533   {
1534     LOG (GNUNET_ERROR_TYPE_DEBUG, "  FWD ACK\n");
1535     fc = &c->fwd_fc;
1536   }
1537   else if (GMP_get_short_id (get_prev_hop (c)) == id)
1538   {
1539     LOG (GNUNET_ERROR_TYPE_DEBUG, "  BCK ACK\n");
1540     fc = &c->bck_fc;
1541   }
1542   else
1543   {
1544     GNUNET_break_op (0);
1545     return GNUNET_OK;
1546   }
1547
1548   pid = ntohl (msg->pid);
1549   LOG (GNUNET_ERROR_TYPE_DEBUG, "  PID %u, OLD %u\n",
1550               pid, fc->last_pid_recv);
1551   fc->last_pid_recv = pid;
1552   fwd = fc == &c->fwd_fc;
1553   send_ack (c, NULL, fwd);
1554
1555   return GNUNET_OK;
1556 }
1557
1558
1559 /**
1560  * Core handler for mesh keepalives.
1561  *
1562  * @param cls closure
1563  * @param message message
1564  * @param peer peer identity this notification is about
1565  * @return GNUNET_OK to keep the connection open,
1566  *         GNUNET_SYSERR to close it (signal serious error)
1567  *
1568  * TODO: Check who we got this from, to validate route.
1569  */
1570 int
1571 GMC_handle_keepalive (void *cls, const struct GNUNET_PeerIdentity *peer,
1572                       const struct GNUNET_MessageHeader *message)
1573 {
1574   struct GNUNET_MESH_ConnectionKeepAlive *msg;
1575   struct MeshConnection *c;
1576   struct MeshPeer *neighbor;
1577   int fwd;
1578
1579   msg = (struct GNUNET_MESH_ConnectionKeepAlive *) message;
1580   LOG (GNUNET_ERROR_TYPE_DEBUG, "got a keepalive packet from %s\n",
1581               GNUNET_i2s (peer));
1582
1583   c = connection_get (&msg->cid);
1584   if (NULL == c)
1585   {
1586     GNUNET_STATISTICS_update (stats, "# keepalive on unknown connection", 1,
1587                               GNUNET_NO);
1588     return GNUNET_OK;
1589   }
1590
1591   fwd = GNUNET_MESSAGE_TYPE_MESH_FWD_KEEPALIVE == ntohs (message->type) ?
1592         GNUNET_YES : GNUNET_NO;
1593
1594   /* Check if origin is as expected */
1595   neighbor = get_hop (c, fwd);
1596   if (GNUNET_PEER_search (peer) != GMP_get_short_id (neighbor))
1597   {
1598     GNUNET_break_op (0);
1599     return GNUNET_OK;
1600   }
1601
1602   connection_change_state (c, MESH_CONNECTION_READY);
1603   connection_reset_timeout (c, fwd);
1604
1605   if (GMC_is_terminal (c, fwd))
1606     return GNUNET_OK;
1607
1608   GNUNET_STATISTICS_update (stats, "# keepalives forwarded", 1, GNUNET_NO);
1609   GMC_send_prebuilt_message (message, c, NULL, fwd);
1610
1611   return GNUNET_OK;
1612 }
1613
1614
1615 /**
1616  * Send an ACK on the appropriate connection/channel, depending on
1617  * the direction and the position of the peer.
1618  *
1619  * @param c Which connection to send the hop-by-hop ACK.
1620  * @param ch Channel, if any.
1621  * @param fwd Is this a fwd ACK? (will go dest->root)
1622  */
1623 void
1624 GMC_send_ack (struct MeshConnection *c, struct MeshChannel *ch, int fwd)
1625 {
1626   unsigned int buffer;
1627
1628   LOG (GNUNET_ERROR_TYPE_DEBUG,
1629               "send ack %s on %p %p\n",
1630               fwd ? "FWD" : "BCK", c, ch);
1631   if (NULL == c || GMC_is_terminal (c, fwd))
1632   {
1633     struct MeshTunnel3 *t;
1634     LOG (GNUNET_ERROR_TYPE_DEBUG, "  getting from all connections\n");
1635     t = (NULL == c) ? GMCH_get_tunnel (ch) : GMC_get_tunnel (c);
1636     buffer = GMT_get_buffer (t, fwd);
1637   }
1638   else
1639   {
1640     LOG (GNUNET_ERROR_TYPE_DEBUG, "  getting from one connection\n");
1641     buffer = GMC_get_buffer (c, fwd);
1642   }
1643   LOG (GNUNET_ERROR_TYPE_DEBUG, "  buffer available: %u\n", buffer);
1644
1645   if ( (NULL != ch && GMCH_is_origin (ch, fwd)) ||
1646        (NULL != c && GMC_is_origin (c, fwd)) )
1647   {
1648     LOG (GNUNET_ERROR_TYPE_DEBUG, "  sending on channel...\n");
1649     if (0 < buffer)
1650     {
1651       GNUNET_assert (NULL != ch);
1652       LOG (GNUNET_ERROR_TYPE_DEBUG, "  really sending!\n");
1653       send_local_ack (ch, fwd);
1654     }
1655   }
1656   else if (NULL == c)
1657   {
1658     LOG (GNUNET_ERROR_TYPE_DEBUG, "  sending on all connections\n");
1659     GNUNET_assert (NULL != ch);
1660     channel_send_connections_ack (ch, buffer, fwd);
1661   }
1662   else
1663   {
1664     LOG (GNUNET_ERROR_TYPE_DEBUG, "  sending on connection\n");
1665     connection_send_ack (c, buffer, fwd);
1666   }
1667 }
1668
1669
1670 /**
1671  * Initialize the connections subsystem
1672  *
1673  * @param c Configuration handle.
1674  */
1675 void
1676 GMC_init (const struct GNUNET_CONFIGURATION_Handle *c)
1677 {
1678   if (GNUNET_OK !=
1679       GNUNET_CONFIGURATION_get_value_number (c, "MESH", "MAX_MSGS_QUEUE",
1680                                              &max_msgs_queue))
1681   {
1682     LOG_config_invalid (GNUNET_ERROR_TYPE_ERROR,
1683                                "MESH", "MAX_MSGS_QUEUE", "MISSING");
1684     GNUNET_SCHEDULER_shutdown ();
1685     return;
1686   }
1687
1688   if (GNUNET_OK !=
1689       GNUNET_CONFIGURATION_get_value_number (c, "MESH", "MAX_CONNECTIONS",
1690                                              &max_connections))
1691   {
1692     LOG_config_invalid (GNUNET_ERROR_TYPE_ERROR,
1693                                "MESH", "MAX_CONNECTIONS", "MISSING");
1694     GNUNET_SCHEDULER_shutdown ();
1695     return;
1696   }
1697
1698   if (GNUNET_OK !=
1699       GNUNET_CONFIGURATION_get_value_time (c, "MESH", "REFRESH_CONNECTION_TIME",
1700                                            &refresh_connection_time))
1701   {
1702     LOG_config_invalid (GNUNET_ERROR_TYPE_ERROR,
1703                                "MESH", "REFRESH_CONNECTION_TIME", "MISSING");
1704     GNUNET_SCHEDULER_shutdown ();
1705     return;
1706   }
1707   connections = GNUNET_CONTAINER_multihashmap_create (1024, GNUNET_YES);
1708 }
1709
1710 /**
1711  * Shut down the connections subsystem.
1712  */
1713 void
1714 GMC_shutdown (void)
1715 {
1716 }
1717
1718
1719 struct MeshConnection *
1720 GMC_new (const struct GNUNET_HashCode *cid,
1721          struct MeshTunnel3 *t,
1722          struct MeshPeerPath *p,
1723          unsigned int own_pos)
1724 {
1725   struct MeshConnection *c;
1726
1727   c = GNUNET_new (struct MeshConnection);
1728   c->id = *cid;
1729   GNUNET_CONTAINER_multihashmap_put (connections, &c->id, c,
1730                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1731   fc_init (&c->fwd_fc);
1732   fc_init (&c->bck_fc);
1733   c->fwd_fc.c = c;
1734   c->bck_fc.c = c;
1735
1736   c->t = t;
1737   if (own_pos > p->length - 1)
1738   {
1739     GNUNET_break (0);
1740     GMC_destroy (c);
1741     return NULL;
1742   }
1743   c->own_pos = own_pos;
1744   c->path = p;
1745
1746   if (0 == own_pos)
1747   {
1748     c->fwd_maintenance_task =
1749             GNUNET_SCHEDULER_add_delayed (refresh_connection_time,
1750                                           &connection_fwd_keepalive, c);
1751   }
1752   register_neighbors (c);
1753   return c;
1754 }
1755
1756
1757 void
1758 GMC_destroy (struct MeshConnection *c)
1759 {
1760   if (NULL == c)
1761     return;
1762
1763   LOG (GNUNET_ERROR_TYPE_DEBUG, "destroying connection %s\n",
1764        GNUNET_h2s (&c->id));
1765
1766   /* Cancel all traffic */
1767   connection_cancel_queues (c, GNUNET_YES);
1768   connection_cancel_queues (c, GNUNET_NO);
1769
1770   /* Cancel maintainance task (keepalive/timeout) */
1771   if (GNUNET_SCHEDULER_NO_TASK != c->fwd_maintenance_task)
1772     GNUNET_SCHEDULER_cancel (c->fwd_maintenance_task);
1773   if (GNUNET_SCHEDULER_NO_TASK != c->bck_maintenance_task)
1774     GNUNET_SCHEDULER_cancel (c->bck_maintenance_task);
1775
1776   /* Unregister from neighbors */
1777   unregister_neighbors (c);
1778
1779   /* Delete */
1780   GNUNET_STATISTICS_update (stats, "# connections", -1, GNUNET_NO);
1781   GMT_remove_connection (c->t, c);
1782   GNUNET_free (c);
1783 }
1784
1785 /**
1786  * Get the connection ID.
1787  *
1788  * @param c Connection to get the ID from.
1789  *
1790  * @return ID of the connection.
1791  */
1792 const struct GNUNET_HashCode *
1793 GMC_get_id (const struct MeshConnection *c)
1794 {
1795   return &c->id;
1796 }
1797
1798
1799 /**
1800  * Get the connection path.
1801  *
1802  * @param c Connection to get the path from.
1803  *
1804  * @return path used by the connection.
1805  */
1806 const struct MeshPeerPath *
1807 GMC_get_path (const struct MeshConnection *c)
1808 {
1809   return c->path;
1810 }
1811
1812
1813 /**
1814  * Get the connection state.
1815  *
1816  * @param c Connection to get the state from.
1817  *
1818  * @return state of the connection.
1819  */
1820 enum MeshConnectionState
1821 GMC_get_state (const struct MeshConnection *c)
1822 {
1823   return c->state;
1824 }
1825
1826 /**
1827  * Get the connection tunnel.
1828  *
1829  * @param c Connection to get the tunnel from.
1830  *
1831  * @return tunnel of the connection.
1832  */
1833 struct MeshTunnel3 *
1834 GMC_get_tunnel (const struct MeshConnection *c)
1835 {
1836   return c->t;
1837 }
1838
1839
1840 /**
1841  * Get free buffer space in a connection.
1842  *
1843  * @param c Connection.
1844  * @param fwd Is query about FWD traffic?
1845  *
1846  * @return Free buffer space [0 - max_msgs_queue/max_connections]
1847  */
1848 unsigned int
1849 GMC_get_buffer (struct MeshConnection *c, int fwd)
1850 {
1851   struct MeshFlowControl *fc;
1852
1853   fc = fwd ? &c->fwd_fc : &c->bck_fc;
1854
1855   return (fc->queue_max - fc->queue_n);
1856 }
1857
1858 /**
1859  * Get messages queued in a connection.
1860  *
1861  * @param c Connection.
1862  * @param fwd Is query about FWD traffic?
1863  *
1864  * @return Number of messages queued.
1865  */
1866 unsigned int
1867 GMC_get_qn (struct MeshConnection *c, int fwd)
1868 {
1869   struct MeshFlowControl *fc;
1870
1871   fc = fwd ? &c->fwd_fc : &c->bck_fc;
1872
1873   return fc->queue_n;
1874 }
1875
1876
1877 /**
1878  * Notify other peers on a connection of a broken link. Mark connections
1879  * to destroy after all traffic has been sent.
1880  *
1881  * @param c Connection on which there has been a disconnection.
1882  * @param peer Peer that disconnected.
1883  * @param my_full_id My ID (to send to other peers).
1884  */
1885 void
1886 GMC_notify_broken (struct MeshConnection *c,
1887                    struct MeshPeer *peer,
1888                    struct GNUNET_PeerIdentity *my_full_id)
1889 {
1890   struct GNUNET_MESH_ConnectionBroken msg;
1891   int fwd;
1892
1893   fwd = peer == get_prev_hop (c);
1894
1895   connection_cancel_queues (c, !fwd);
1896   if (GMC_is_terminal (c, fwd))
1897   {
1898     /* Local shutdown, no one to notify about this. */
1899     GMC_destroy (c);
1900     return;
1901   }
1902
1903   msg.header.size = htons (sizeof (struct GNUNET_MESH_ConnectionBroken));
1904   msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_CONNECTION_BROKEN);
1905   msg.cid = c->id;
1906   msg.peer1 = *my_full_id;
1907   msg.peer2 = *GMP_get_id (peer);
1908   GMC_send_prebuilt_message (&msg.header, c, NULL, fwd);
1909   c->destroy = GNUNET_YES;
1910
1911   return;
1912 }
1913
1914
1915 /**
1916  * Is this peer the first one on the connection?
1917  *
1918  * @param c Connection.
1919  * @param fwd Is this about fwd traffic?
1920  *
1921  * @return GNUNET_YES if origin, GNUNET_NO if relay/terminal.
1922  */
1923 int
1924 GMC_is_origin (struct MeshConnection *c, int fwd)
1925 {
1926   if (!fwd && c->path->length - 1 == c->own_pos )
1927     return GNUNET_YES;
1928   if (fwd && 0 == c->own_pos)
1929     return GNUNET_YES;
1930   return GNUNET_NO;
1931 }
1932
1933
1934 /**
1935  * Is this peer the last one on the connection?
1936  *
1937  * @param c Connection.
1938  * @param fwd Is this about fwd traffic?
1939  *            Note that the ROOT is the terminal for BCK traffic!
1940  *
1941  * @return GNUNET_YES if terminal, GNUNET_NO if relay/origin.
1942  */
1943 int
1944 GMC_is_terminal (struct MeshConnection *c, int fwd)
1945 {
1946   return GMC_is_origin (c, !fwd);
1947 }
1948
1949
1950 /**
1951  * See if we are allowed to send by the next hop in the given direction.
1952  *
1953  * @param c Connection.
1954  * @param fwd Is this about fwd traffic?
1955  *
1956  * @return GNUNET_YES in case it's OK.
1957  */
1958 int
1959 GMC_is_sendable (struct MeshConnection *c, int fwd)
1960 {
1961   struct MeshFlowControl *fc;
1962
1963   fc = fwd ? &c->fwd_fc : &c->bck_fc;
1964   if (GMC_is_pid_bigger (fc->last_ack_recv, fc->last_pid_sent))
1965     return GNUNET_YES;
1966   return GNUNET_NO;
1967 }
1968
1969 /**
1970  * Sends an already built message on a connection, properly registering
1971  * all used resources.
1972  *
1973  * @param message Message to send. Function makes a copy of it.
1974  *                If message is not hop-by-hop, decrements TTL of copy.
1975  * @param c Connection on which this message is transmitted.
1976  * @param ch Channel on which this message is transmitted, or NULL.
1977  * @param fwd Is this a fwd message?
1978  */
1979 void
1980 GMC_send_prebuilt_message (const struct GNUNET_MessageHeader *message,
1981                            struct MeshConnection *c,
1982                            struct MeshChannel *ch,
1983                            int fwd)
1984 {
1985   struct MeshFlowControl *fc;
1986   void *data;
1987   size_t size;
1988   uint16_t type;
1989   int droppable;
1990
1991   size = ntohs (message->size);
1992   data = GNUNET_malloc (size);
1993   memcpy (data, message, size);
1994   type = ntohs (message->type);
1995   LOG (GNUNET_ERROR_TYPE_DEBUG, "Send %s (%u) on connection %s\n",
1996               GNUNET_MESH_DEBUG_M2S (type), size, GNUNET_h2s (&c->id));
1997
1998   droppable = GNUNET_YES;
1999   switch (type)
2000   {
2001     struct GNUNET_MESH_Encrypted *emsg;
2002     struct GNUNET_MESH_ACK       *amsg;
2003     struct GNUNET_MESH_Poll      *pmsg;
2004     struct GNUNET_MESH_ConnectionDestroy *dmsg;
2005     struct GNUNET_MESH_ConnectionBroken  *bmsg;
2006     uint32_t ttl;
2007
2008     case GNUNET_MESSAGE_TYPE_MESH_FWD:
2009     case GNUNET_MESSAGE_TYPE_MESH_BCK:
2010       emsg = (struct GNUNET_MESH_Encrypted *) data;
2011       ttl = ntohl (emsg->ttl);
2012       if (0 == ttl)
2013       {
2014         GNUNET_break_op (0);
2015         return;
2016       }
2017       emsg->cid = c->id;
2018       emsg->ttl = htonl (ttl - 1);
2019       emsg->pid = htonl (fwd ? c->fwd_fc.next_pid++ : c->bck_fc.next_pid++);
2020       LOG (GNUNET_ERROR_TYPE_DEBUG, " pid %u\n", ntohl (emsg->pid));
2021       break;
2022
2023     case GNUNET_MESSAGE_TYPE_MESH_ACK:
2024       amsg = (struct GNUNET_MESH_ACK *) data;
2025       amsg->cid = c->id;
2026       LOG (GNUNET_ERROR_TYPE_DEBUG, " ack %u\n", ntohl (amsg->ack));
2027       droppable = GNUNET_NO;
2028       break;
2029
2030     case GNUNET_MESSAGE_TYPE_MESH_POLL:
2031       pmsg = (struct GNUNET_MESH_Poll *) data;
2032       pmsg->cid = c->id;
2033       pmsg->pid = htonl (fwd ? c->fwd_fc.last_pid_sent : c->bck_fc.last_pid_sent);
2034       LOG (GNUNET_ERROR_TYPE_DEBUG, " poll %u\n", ntohl (pmsg->pid));
2035       droppable = GNUNET_NO;
2036       break;
2037
2038     case GNUNET_MESSAGE_TYPE_MESH_TUNNEL_DESTROY:
2039       dmsg = (struct GNUNET_MESH_ConnectionDestroy *) data;
2040       dmsg->cid = c->id;
2041       dmsg->reserved = 0;
2042       break;
2043
2044     case GNUNET_MESSAGE_TYPE_MESH_CONNECTION_BROKEN:
2045       bmsg = (struct GNUNET_MESH_ConnectionBroken *) data;
2046       bmsg->cid = c->id;
2047       bmsg->reserved = 0;
2048       break;
2049
2050     case GNUNET_MESSAGE_TYPE_MESH_CONNECTION_CREATE:
2051     case GNUNET_MESSAGE_TYPE_MESH_CONNECTION_ACK:
2052       break;
2053
2054     default:
2055       GNUNET_break (0);
2056   }
2057
2058   fc = fwd ? &c->fwd_fc : &c->bck_fc;
2059   if (fc->queue_n >= fc->queue_max && droppable)
2060   {
2061     GNUNET_STATISTICS_update (stats, "# messages dropped (buffer full)",
2062                               1, GNUNET_NO);
2063     GNUNET_break (0);
2064     LOG (GNUNET_ERROR_TYPE_DEBUG,
2065                 "queue full: %u/%u\n",
2066                 fc->queue_n, fc->queue_max);
2067     return; /* Drop this message */
2068   }
2069
2070   LOG (GNUNET_ERROR_TYPE_DEBUG, "last pid %u\n", fc->last_pid_sent);
2071   LOG (GNUNET_ERROR_TYPE_DEBUG, "     ack %u\n", fc->last_ack_recv);
2072   LOG (GNUNET_ERROR_TYPE_DEBUG, "  Q_N+ %p %u\n", fc, fc->queue_n);
2073   if (GMC_is_pid_bigger (fc->last_pid_sent + 1, fc->last_ack_recv))
2074   {
2075     GMC_start_poll (c, fwd);
2076   }
2077   fc->queue_n++;
2078   c->pending_messages++;
2079
2080   GMP_queue_add (get_hop (c, fwd), data, type, size, c, ch, fwd,
2081                  &message_sent, NULL);
2082 }
2083
2084
2085 /**
2086  * Sends a CREATE CONNECTION message for a path to a peer.
2087  * Changes the connection and tunnel states if necessary.
2088  *
2089  * @param connection Connection to create.
2090  */
2091 void
2092 GMC_send_create (struct MeshConnection *connection)
2093 {
2094 enum MeshTunnel3State state;
2095   size_t size;
2096
2097   size = sizeof (struct GNUNET_MESH_ConnectionCreate);
2098   size += connection->path->length * sizeof (struct GNUNET_PeerIdentity);
2099   LOG (GNUNET_ERROR_TYPE_DEBUG, "Send connection create\n");
2100   GMP_queue_add (get_next_hop (connection), NULL,
2101                  GNUNET_MESSAGE_TYPE_MESH_CONNECTION_CREATE,
2102                  size, connection, NULL,
2103                  GNUNET_YES, &message_sent, NULL);
2104   state = GMT_get_state (connection->t);
2105   if (MESH_TUNNEL3_SEARCHING == state || MESH_TUNNEL3_NEW == state)
2106     GMT_change_state (connection->t, MESH_TUNNEL3_WAITING);
2107   if (MESH_CONNECTION_NEW == connection->state)
2108     GMC_change_state (connection, MESH_CONNECTION_SENT);
2109 }
2110
2111
2112 /**
2113  * Send a message to all peers in this connection that the connection
2114  * is no longer valid.
2115  *
2116  * If some peer should not receive the message, it should be zero'ed out
2117  * before calling this function.
2118  *
2119  * @param c The connection whose peers to notify.
2120  */
2121 void
2122 GMC_send_destroy (struct MeshConnection *c)
2123 {
2124   struct GNUNET_MESH_ConnectionDestroy msg;
2125
2126   if (GNUNET_YES == c->destroy)
2127     return;
2128
2129   msg.header.size = htons (sizeof (msg));
2130   msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_TUNNEL_DESTROY);;
2131   msg.cid = c->id;
2132   LOG (GNUNET_ERROR_TYPE_DEBUG,
2133               "  sending connection destroy for connection %s\n",
2134               GNUNET_h2s (&c->id));
2135
2136   if (GNUNET_NO == GMC_is_terminal (c, GNUNET_YES))
2137     GMC_send_prebuilt_message (&msg.header, c, NULL, GNUNET_YES);
2138   if (GNUNET_NO == GMC_is_terminal (c, GNUNET_NO))
2139     GMC_send_prebuilt_message (&msg.header, c, NULL, GNUNET_NO);
2140   c->destroy = GNUNET_YES;
2141 }
2142
2143
2144 /**
2145  * @brief Start a polling timer for the connection.
2146  *
2147  * When a neighbor does not accept more traffic on the connection it could be
2148  * caused by a simple congestion or by a lost ACK. Polling enables to check
2149  * for the lastest ACK status for a connection.
2150  *
2151  * @param c Connection.
2152  * @param fwd Should we poll in the FWD direction?
2153  */
2154 void
2155 GMC_start_poll (struct MeshConnection *c, int fwd)
2156 {
2157   struct MeshFlowControl *fc;
2158
2159   fc = fwd ? &c->fwd_fc : &c->bck_fc;
2160   if (GNUNET_SCHEDULER_NO_TASK != fc->poll_task)
2161   {
2162     return;
2163   }
2164   fc->poll_task = GNUNET_SCHEDULER_add_delayed (fc->poll_time,
2165                                                 &connection_poll,
2166                                                 fc);
2167 }
2168
2169
2170 /**
2171  * @brief Stop polling a connection for ACKs.
2172  *
2173  * Once we have enough ACKs for future traffic, polls are no longer necessary.
2174  *
2175  * @param c Connection.
2176  * @param fwd Should we stop the poll in the FWD direction?
2177  */
2178 void
2179 GMC_stop_poll (struct MeshConnection *c, int fwd)
2180 {
2181   struct MeshFlowControl *fc;
2182
2183   fc = fwd ? &c->fwd_fc : &c->bck_fc;
2184   if (GNUNET_SCHEDULER_NO_TASK != fc->poll_task)
2185   {
2186     GNUNET_SCHEDULER_cancel (fc->poll_task);
2187     fc->poll_task = GNUNET_SCHEDULER_NO_TASK;
2188   }
2189 }