- refactor
[oweals/gnunet.git] / src / mesh / gnunet-service-mesh_connection.c
1 /*
2      This file is part of GNUnet.
3      (C) 2001-2013 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file mesh/gnunet-service-mesh_connection.c
23  * @brief GNUnet MESH service connection handling
24  * @author Bartlomiej Polot
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29
30 #include "gnunet_statistics_service.h"
31
32 #include "mesh_path.h"
33 #include "mesh_protocol_enc.h"
34 #include "mesh_enc.h"
35 #include "gnunet-service-mesh_connection.h"
36 #include "gnunet-service-mesh_peer.h"
37 #include "gnunet-service-mesh_tunnel.h"
38 #include "gnunet-service-mesh_channel.h"
39
40
41 #define LOG(level, ...) GNUNET_log_from (level,"mesh-con",__VA_ARGS__)
42
43 #define MESH_MAX_POLL_TIME      GNUNET_TIME_relative_multiply (\
44                                   GNUNET_TIME_UNIT_MINUTES,\
45                                   10)
46 #define AVG_MSGS                32
47
48
49 /******************************************************************************/
50 /********************************   STRUCTS  **********************************/
51 /******************************************************************************/
52
53 /**
54  * Struct to encapsulate all the Flow Control information to a peer to which
55  * we are directly connected (on a core level).
56  */
57 struct MeshFlowControl
58 {
59   /**
60    * Connection this controls.
61    */
62   struct MeshConnection *c;
63
64   /**
65    * How many messages are in the queue on this connection.
66    */
67   unsigned int queue_n;
68
69   /**
70    * How many messages do we accept in the queue.
71    */
72   unsigned int queue_max;
73
74   /**
75    * Next ID to use.
76    */
77   uint32_t next_pid;
78
79   /**
80    * ID of the last packet sent towards the peer.
81    */
82   uint32_t last_pid_sent;
83
84   /**
85    * ID of the last packet received from the peer.
86    */
87   uint32_t last_pid_recv;
88
89   /**
90    * Last ACK sent to the peer (peer can't send more than this PID).
91    */
92   uint32_t last_ack_sent;
93
94   /**
95    * Last ACK sent towards the origin (for traffic towards leaf node).
96    */
97   uint32_t last_ack_recv;
98
99   /**
100    * Task to poll the peer in case of a lost ACK causes stall.
101    */
102   GNUNET_SCHEDULER_TaskIdentifier poll_task;
103
104   /**
105    * How frequently to poll for ACKs.
106    */
107   struct GNUNET_TIME_Relative poll_time;
108 };
109
110 /**
111  * Keep a record of the last messages sent on this connection.
112  */
113 struct MeshConnectionPerformance
114 {
115   /**
116    * Circular buffer for storing measurements.
117    */
118   double usecsperbyte[AVG_MSGS];
119
120   /**
121    * Running average of @c usecsperbyte.
122    */
123   double avg;
124
125   /**
126    * How many values of @c usecsperbyte are valid.
127    */
128   uint16_t size;
129
130   /**
131    * Index of the next "free" position in @c usecsperbyte.
132    */
133   uint16_t idx;
134 };
135
136
137 /**
138  * Struct containing all information regarding a connection to a peer.
139  */
140 struct MeshConnection
141 {
142   /**
143    * Tunnel this connection is part of.
144    */
145   struct MeshTunnel3 *t;
146
147   /**
148    * Flow control information for traffic fwd.
149    */
150   struct MeshFlowControl fwd_fc;
151
152   /**
153    * Flow control information for traffic bck.
154    */
155   struct MeshFlowControl bck_fc;
156
157   /**
158    * Measure connection performance on the endpoint.
159    */
160   struct MeshConnectionPerformance *perf;
161
162   /**
163    * ID of the connection.
164    */
165   struct GNUNET_HashCode id;
166
167   /**
168    * State of the connection.
169    */
170   enum MeshConnectionState state;
171
172   /**
173    * Path being used for the tunnel.
174    */
175   struct MeshPeerPath *path;
176
177   /**
178    * Position of the local peer in the path.
179    */
180   unsigned int own_pos;
181
182   /**
183    * Task to keep the used paths alive at the owner,
184    * time tunnel out on all the other peers.
185    */
186   GNUNET_SCHEDULER_TaskIdentifier fwd_maintenance_task;
187
188   /**
189    * Task to keep the used paths alive at the destination,
190    * time tunnel out on all the other peers.
191    */
192   GNUNET_SCHEDULER_TaskIdentifier bck_maintenance_task;
193
194   /**
195    * Pending message count.
196    */
197   int pending_messages;
198
199   /**
200    * Destroy flag: if true, destroy on last message.
201    */
202   int destroy;
203 };
204
205 /******************************************************************************/
206 /*******************************   GLOBALS  ***********************************/
207 /******************************************************************************/
208
209 /**
210  * Global handle to the statistics service.
211  */
212 extern struct GNUNET_STATISTICS_Handle *stats;
213
214 /**
215  * Local peer own ID (memory efficient handle).
216  */
217 extern GNUNET_PEER_Id myid;
218
219 /**
220  * Local peer own ID (full value).
221  */
222 extern struct GNUNET_PeerIdentity my_full_id;
223
224 /**
225  * Connections known, indexed by cid (MeshConnection).
226  */
227 static struct GNUNET_CONTAINER_MultiHashMap *connections;
228
229 /**
230  * How many connections are we willing to maintain.
231  * Local connections are always allowed, even if there are more connections than max.
232  */
233 static unsigned long long max_connections;
234
235 /**
236  * How many messages *in total* are we willing to queue, divide by number of
237  * connections to get connection queue size.
238  */
239 static unsigned long long max_msgs_queue;
240
241 /**
242  * How often to send path keepalives. Paths timeout after 4 missed.
243  */
244 static struct GNUNET_TIME_Relative refresh_connection_time;
245
246
247 /******************************************************************************/
248 /********************************   STATIC  ***********************************/
249 /******************************************************************************/
250
251 #if 0 // avoid compiler warning for unused static function
252 static void
253 fc_debug (struct MeshFlowControl *fc)
254 {
255   LOG (GNUNET_ERROR_TYPE_DEBUG, "    IN: %u/%u\n",
256               fc->last_pid_recv, fc->last_ack_sent);
257   LOG (GNUNET_ERROR_TYPE_DEBUG, "    OUT: %u/%u\n",
258               fc->last_pid_sent, fc->last_ack_recv);
259   LOG (GNUNET_ERROR_TYPE_DEBUG, "    QUEUE: %u/%u\n",
260               fc->queue_n, fc->queue_max);
261 }
262
263 static void
264 connection_debug (struct MeshConnection *c)
265 {
266   if (NULL == c)
267   {
268     LOG (GNUNET_ERROR_TYPE_DEBUG, "*** DEBUG NULL CONNECTION ***\n");
269     return;
270   }
271   LOG (GNUNET_ERROR_TYPE_DEBUG, "Connection %s:%X\n",
272               peer2s (c->t->peer), GNUNET_h2s (&c->id));
273   LOG (GNUNET_ERROR_TYPE_DEBUG, "  state: %u, pending msgs: %u\n",
274               c->state, c->pending_messages);
275   LOG (GNUNET_ERROR_TYPE_DEBUG, "  FWD FC\n");
276   fc_debug (&c->fwd_fc);
277   LOG (GNUNET_ERROR_TYPE_DEBUG, "  BCK FC\n");
278   fc_debug (&c->bck_fc);
279 }
280 #endif
281
282 /**
283  * Get string description for tunnel state.
284  *
285  * @param s Tunnel state.
286  *
287  * @return String representation.
288  */
289 static const char *
290 GMC_state2s (enum MeshConnectionState s)
291 {
292   switch (s)
293   {
294     case MESH_CONNECTION_NEW:
295       return "MESH_CONNECTION_NEW";
296     case MESH_CONNECTION_SENT:
297       return "MESH_CONNECTION_SENT";
298     case MESH_CONNECTION_ACK:
299       return "MESH_CONNECTION_ACK";
300     case MESH_CONNECTION_READY:
301       return "MESH_CONNECTION_READY";
302     default:
303       return "MESH_CONNECTION_STATE_ERROR";
304   }
305 }
306
307
308 /**
309  * Initialize a Flow Control structure to the initial state.
310  *
311  * @param fc Flow Control structure to initialize.
312  */
313 static void
314 fc_init (struct MeshFlowControl *fc)
315 {
316   fc->next_pid = 0;
317   fc->last_pid_sent = (uint32_t) -1; /* Next (expected) = 0 */
318   fc->last_pid_recv = (uint32_t) -1;
319   fc->last_ack_sent = (uint32_t) 0;
320   fc->last_ack_recv = (uint32_t) 0;
321   fc->poll_task = GNUNET_SCHEDULER_NO_TASK;
322   fc->poll_time = GNUNET_TIME_UNIT_SECONDS;
323   fc->queue_n = 0;
324   fc->queue_max = (max_msgs_queue / max_connections) + 1;
325 }
326
327
328 /**
329  * Find a connection.
330  *
331  * @param cid Connection ID.
332  */
333 static struct MeshConnection *
334 connection_get (const struct GNUNET_HashCode *cid)
335 {
336   return GNUNET_CONTAINER_multihashmap_get (connections, cid);
337 }
338
339
340 static void
341 connection_change_state (struct MeshConnection* c,
342                          enum MeshConnectionState state)
343 {
344   LOG (GNUNET_ERROR_TYPE_DEBUG,
345               "Connection %s state was %s\n",
346               GNUNET_h2s (&c->id), GMC_state2s (c->state));
347   LOG (GNUNET_ERROR_TYPE_DEBUG,
348               "Connection %s state is now %s\n",
349               GNUNET_h2s (&c->id), GMC_state2s (state));
350   c->state = state;
351 }
352
353
354 /**
355  * Callback called when a queued message is sent.
356  *
357  * Calculates the average time and connection packet tracking.
358  *
359  * @param cls Closure.
360  * @param c Connection this message was on.
361  * @param type Type of message sent.
362  * @param fwd Was this a FWD going message?
363  * @param size Size of the message.
364  * @param wait Time spent waiting for core (only the time for THIS message)
365  */
366 static void 
367 message_sent (void *cls,
368               struct MeshConnection *c, uint16_t type,
369               int fwd, size_t size,
370               struct GNUNET_TIME_Relative wait)
371 {
372   struct MeshConnectionPerformance *p;
373   struct MeshFlowControl *fc;
374   double usecsperbyte;
375
376   fc = fwd ? &c->fwd_fc : &c->bck_fc;
377   LOG (GNUNET_ERROR_TYPE_DEBUG, "!  Q_N- %p %u\n", fc, fc->queue_n);
378   fc->queue_n--;
379   c->pending_messages--;
380   if (GNUNET_YES == c->destroy && 0 == c->pending_messages)
381   {
382     LOG (GNUNET_ERROR_TYPE_DEBUG, "!  destroying connection!\n");
383     GMC_destroy (c);
384   }
385   /* Send ACK if needed, after accounting for sent ID in fc->queue_n */
386   switch (type)
387   {
388     case GNUNET_MESSAGE_TYPE_MESH_ENCRYPTED:
389       fc->last_pid_sent++;
390       LOG (GNUNET_ERROR_TYPE_DEBUG, "!   accounting pid %u\n", fc->last_pid_sent);
391 //       send_ack (c, ch, fwd);
392       break;
393     default:
394       break;
395   }
396
397   if (NULL == c->perf)
398     return; /* Only endpoints are interested in timing. */
399
400   LOG (GNUNET_ERROR_TYPE_DEBUG, "!  message sent!\n");
401   p = c->perf;
402   usecsperbyte = ((double) wait.rel_value_us) / size;
403   if (p->size == AVG_MSGS)
404   {
405     /* Array is full. Substract oldest value, add new one and store. */
406     p->avg -= (p->usecsperbyte[p->idx] / AVG_MSGS);
407     p->usecsperbyte[p->idx] = usecsperbyte;
408     p->avg += (p->usecsperbyte[p->idx] / AVG_MSGS);
409   }
410   else
411   {
412     /* Array not yet full. Add current value to avg and store. */
413     p->usecsperbyte[p->idx] = usecsperbyte;
414     p->avg *= p->size;
415     p->avg += p->usecsperbyte[p->idx];
416     p->size++;
417     p->avg /= p->size;
418   }
419   p->idx = (p->idx + 1) % AVG_MSGS;
420
421 //   if (NULL != c->t)
422 //   {
423 //     c->t->pending_messages--;
424 //     if (GNUNET_YES == c->t->destroy && 0 == t->pending_messages)
425 //     {
426 //       LOG (GNUNET_ERROR_TYPE_DEBUG, "*  destroying tunnel!\n");
427 //       GMT_destroy (c->t);
428 //     }
429 //   }
430 }
431
432
433 /**
434  * Get the previous hop in a connection
435  *
436  * @param c Connection.
437  *
438  * @return Previous peer in the connection.
439  */
440 static struct MeshPeer *
441 get_prev_hop (const struct MeshConnection *c)
442 {
443   GNUNET_PEER_Id id;
444
445   if (0 == c->own_pos || c->path->length < 2)
446     id = c->path->peers[0];
447   else
448     id = c->path->peers[c->own_pos - 1];
449
450   return GMP_get_short (id);
451 }
452
453
454 /**
455  * Get the next hop in a connection
456  *
457  * @param c Connection.
458  *
459  * @return Next peer in the connection.
460  */
461 static struct MeshPeer *
462 get_next_hop (const struct MeshConnection *c)
463 {
464   GNUNET_PEER_Id id;
465
466   if ((c->path->length - 1) == c->own_pos || c->path->length < 2)
467     id = c->path->peers[c->path->length - 1];
468   else
469     id = c->path->peers[c->own_pos + 1];
470
471   return GMP_get_short (id);
472 }
473
474
475 /**
476  * Get the hop in a connection.
477  *
478  * @param c Connection.
479  * @param fwd Next hop?
480  *
481  * @return Next peer in the connection.
482  */
483 static struct MeshPeer *
484 get_hop (struct MeshConnection *c, int fwd)
485 {
486   if (fwd)
487     return get_next_hop (c);
488   return get_prev_hop (c);
489 }
490
491
492 /**
493  * Is traffic coming from this sender 'FWD' traffic?
494  *
495  * @param c Connection to check.
496  * @param sender Peer identity of neighbor.
497  *
498  * @return GNUNET_YES in case the sender is the 'prev' hop and therefore
499  *         the traffic is 'FWD'. GNUNET_NO for BCK. GNUNET_SYSERR for errors.
500  */
501 static int 
502 is_fwd (const struct MeshConnection *c,
503         const struct GNUNET_PeerIdentity *sender)
504 {
505   GNUNET_PEER_Id id;
506
507   id = GNUNET_PEER_search (sender);
508   if (GMP_get_short_id (get_prev_hop (c)) == id)
509     return GNUNET_YES;
510
511   if (GMP_get_short_id (get_next_hop (c)) == id)
512     return GNUNET_NO;
513
514   GNUNET_break (0);
515   return GNUNET_SYSERR;
516 }
517
518
519
520 /**
521  * Send an ACK informing the predecessor about the available buffer space.
522  *
523  * Note that for fwd ack, the FWD mean forward *traffic* (root->dest),
524  * the ACK itself goes "back" (dest->root).
525  *
526  * @param c Connection on which to send the ACK.
527  * @param buffer How much space free to advertise?
528  * @param fwd Is this FWD ACK? (Going dest->owner)
529  */
530 static void
531 send_ack (struct MeshConnection *c, unsigned int buffer, int fwd)
532 {
533   struct MeshFlowControl *next_fc;
534   struct MeshFlowControl *prev_fc;
535   struct GNUNET_MESH_ACK msg;
536   uint32_t ack;
537   int delta;
538
539   next_fc = fwd ? &c->fwd_fc : &c->bck_fc;
540   prev_fc = fwd ? &c->bck_fc : &c->fwd_fc;
541
542   LOG (GNUNET_ERROR_TYPE_DEBUG,
543               "connection send %s ack on %s\n",
544               fwd ? "FWD" : "BCK", GNUNET_h2s (&c->id));
545
546   /* Check if we need to transmit the ACK */
547   delta = prev_fc->last_ack_sent - prev_fc->last_pid_recv;
548   if (3 < delta && buffer < delta)
549   {
550     LOG (GNUNET_ERROR_TYPE_DEBUG, "Not sending ACK, buffer > 3\n");
551     LOG (GNUNET_ERROR_TYPE_DEBUG,
552          "  last pid recv: %u, last ack sent: %u\n",
553          prev_fc->last_pid_recv, prev_fc->last_ack_sent);
554     return;
555   }
556
557   /* Ok, ACK might be necessary, what PID to ACK? */
558   ack = prev_fc->last_pid_recv + buffer;
559   LOG (GNUNET_ERROR_TYPE_DEBUG, " ACK %u\n", ack);
560   LOG (GNUNET_ERROR_TYPE_DEBUG,
561        " last pid %u, last ack %u, qmax %u, q %u\n",
562        prev_fc->last_pid_recv, prev_fc->last_ack_sent,
563        next_fc->queue_max, next_fc->queue_n);
564   if (ack == prev_fc->last_ack_sent)
565   {
566     LOG (GNUNET_ERROR_TYPE_DEBUG, "Not sending FWD ACK, not needed\n");
567     return;
568   }
569
570   prev_fc->last_ack_sent = ack;
571
572   /* Build ACK message and send on connection */
573   msg.header.size = htons (sizeof (msg));
574   msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_ACK);
575   msg.ack = htonl (ack);
576   msg.cid = c->id;
577
578   GMC_send_prebuilt_message (&msg.header, c, NULL, !fwd);
579 }
580
581
582 /**
583  * Sends a CONNECTION ACK message in reponse to a received CONNECTION_CREATE
584  * or a first CONNECTION_ACK directed to us.
585  *
586  * @param connection Connection to confirm.
587  * @param fwd Is this a fwd ACK? (First is bck (SYNACK), second is fwd (ACK))
588  */
589 static void
590 send_connection_ack (struct MeshConnection *connection, int fwd)
591 {
592   struct MeshTunnel3 *t;
593
594   t = connection->t;
595   LOG (GNUNET_ERROR_TYPE_DEBUG, "Send connection ack\n");
596   GMP_queue_add (get_hop (connection, fwd), NULL,
597                  GNUNET_MESSAGE_TYPE_MESH_CONNECTION_ACK,
598                  sizeof (struct GNUNET_MESH_ConnectionACK),
599                  connection, NULL, fwd,
600                  &message_sent, NULL);
601   if (MESH_TUNNEL3_NEW == GMT_get_state (t))
602     GMT_change_state (t, MESH_TUNNEL3_WAITING);
603   if (MESH_CONNECTION_READY != connection->state)
604     connection_change_state (connection, MESH_CONNECTION_SENT);
605   
606 }
607
608
609 /**
610  * Send a notification that a connection is broken.
611  *
612  * @param c Connection that is broken.
613  * @param id1 Peer that has disconnected.
614  * @param id2 Peer that has disconnected.
615  * @param fwd Direction towards which to send it.
616  */
617 static void
618 send_broken (struct MeshConnection *c,
619              const struct GNUNET_PeerIdentity *id1,
620              const struct GNUNET_PeerIdentity *id2,
621              int fwd)
622 {
623   struct GNUNET_MESH_ConnectionBroken msg;
624
625   msg.header.size = htons (sizeof (struct GNUNET_MESH_ConnectionBroken));
626   msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_CONNECTION_BROKEN);
627   msg.cid = c->id;
628   msg.peer1 = *id1;
629   msg.peer2 = *id2;
630   GMC_send_prebuilt_message (&msg.header, c, NULL, fwd);
631 }
632
633
634
635 /**
636  * Send keepalive packets for a connection.
637  *
638  * @param c Connection to keep alive..
639  * @param fwd Is this a FWD keepalive? (owner -> dest).
640  */
641 static void
642 connection_keepalive (struct MeshConnection *c, int fwd)
643 {
644   struct GNUNET_MESH_ConnectionKeepAlive *msg;
645   size_t size = sizeof (struct GNUNET_MESH_ConnectionKeepAlive);
646   char cbuf[size];
647   uint16_t type;
648
649   type = fwd ? GNUNET_MESSAGE_TYPE_MESH_FWD_KEEPALIVE :
650                GNUNET_MESSAGE_TYPE_MESH_BCK_KEEPALIVE;
651
652   LOG (GNUNET_ERROR_TYPE_DEBUG,
653        "sending %s keepalive for connection %s[%d]\n",
654        fwd ? "FWD" : "BCK", GMT_2s (c->t), c->id);
655
656   msg = (struct GNUNET_MESH_ConnectionKeepAlive *) cbuf;
657   msg->header.size = htons (size);
658   msg->header.type = htons (type);
659   msg->cid = c->id;
660
661   GMC_send_prebuilt_message (&msg->header, c, NULL, fwd);
662 }
663
664
665 /**
666  * Send CONNECTION_{CREATE/ACK} packets for a connection.
667  *
668  * @param c Connection for which to send the message.
669  * @param fwd If GNUNET_YES, send CREATE, otherwise send ACK.
670  */
671 static void
672 connection_recreate (struct MeshConnection *c, int fwd)
673 {
674   LOG (GNUNET_ERROR_TYPE_DEBUG, "sending connection recreate\n");
675   if (fwd)
676     GMC_send_create (c);
677   else
678     send_connection_ack (c, GNUNET_NO);
679 }
680
681
682 /**
683  * Generic connection timer management.
684  * Depending on the role of the peer in the connection will send the
685  * appropriate message (build or keepalive)
686  *
687  * @param c Conncetion to maintain.
688  * @param fwd Is FWD?
689  */
690 static void
691 connection_maintain (struct MeshConnection *c, int fwd)
692 {
693   if (MESH_TUNNEL3_SEARCHING == GMT_get_state (c->t))
694   {
695     /* TODO DHT GET with RO_BART */
696     return;
697   }
698   switch (c->state)
699   {
700     case MESH_CONNECTION_NEW:
701       GNUNET_break (0);
702     case MESH_CONNECTION_SENT:
703       connection_recreate (c, fwd);
704       break;
705     case MESH_CONNECTION_READY:
706       connection_keepalive (c, fwd);
707       break;
708     default:
709       break;
710   }
711 }
712
713
714 static void
715 connection_fwd_keepalive (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
716 {
717   struct MeshConnection *c = cls;
718
719   c->fwd_maintenance_task = GNUNET_SCHEDULER_NO_TASK;
720   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
721     return;
722
723   connection_maintain (c, GNUNET_YES);
724   c->fwd_maintenance_task = GNUNET_SCHEDULER_add_delayed (refresh_connection_time,
725                                                           &connection_fwd_keepalive,
726                                                           c);
727 }
728
729
730 static void
731 connection_bck_keepalive (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
732 {
733   struct MeshConnection *c = cls;
734
735   c->bck_maintenance_task = GNUNET_SCHEDULER_NO_TASK;
736   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
737     return;
738
739   connection_maintain (c, GNUNET_NO);
740   c->bck_maintenance_task = GNUNET_SCHEDULER_add_delayed (refresh_connection_time,
741                                                           &connection_bck_keepalive,
742                                                           c);
743 }
744
745
746 /**
747  * @brief Re-initiate traffic on this connection if necessary.
748  *
749  * Check if there is traffic queued towards this peer
750  * and the core transmit handle is NULL (traffic was stalled).
751  * If so, call core tmt rdy.
752  *
753  * @param c Connection on which initiate traffic.
754  * @param fwd Is this about fwd traffic?
755  */
756 static void
757 connection_unlock_queue (struct MeshConnection *c, int fwd)
758 {
759   struct MeshPeer *peer;
760
761   LOG (GNUNET_ERROR_TYPE_DEBUG,
762               "connection_unlock_queue %s on %s\n",
763               fwd ? "FWD" : "BCK", GNUNET_h2s (&c->id));
764
765   if (GMC_is_terminal (c, fwd))
766   {
767     LOG (GNUNET_ERROR_TYPE_DEBUG, " is terminal!\n");
768     return;
769   }
770
771   peer = get_hop (c, fwd);
772   GMP_queue_unlock (peer, c);
773 }
774
775
776 /**
777  * Cancel all transmissions that belong to a certain connection.
778  *
779  * @param c Connection which to cancel.
780  * @param fwd Cancel fwd traffic?
781  */
782 static void
783 connection_cancel_queues (struct MeshConnection *c, int fwd)
784 {
785
786   struct MeshFlowControl *fc;
787   struct MeshPeer *peer;
788
789   if (NULL == c)
790   {
791     GNUNET_break (0);
792     return;
793   }
794
795   peer = get_hop (c, fwd);
796   GMP_queue_cancel (peer, c);
797
798   fc = fwd ? &c->fwd_fc : &c->bck_fc;
799   if (GNUNET_SCHEDULER_NO_TASK != fc->poll_task)
800   {
801     GNUNET_SCHEDULER_cancel (fc->poll_task);
802     fc->poll_task = GNUNET_SCHEDULER_NO_TASK;
803   }
804 }
805
806
807 /**
808  * Function called if a connection has been stalled for a while,
809  * possibly due to a missed ACK. Poll the neighbor about its ACK status.
810  *
811  * @param cls Closure (poll ctx).
812  * @param tc TaskContext.
813  */
814 static void
815 connection_poll (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
816 {
817   struct MeshFlowControl *fc = cls;
818   struct GNUNET_MESH_Poll msg;
819   struct MeshConnection *c;
820
821   fc->poll_task = GNUNET_SCHEDULER_NO_TASK;
822   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
823   {
824     return;
825   }
826
827   c = fc->c;
828   LOG (GNUNET_ERROR_TYPE_DEBUG, " *** Polling!\n");
829   LOG (GNUNET_ERROR_TYPE_DEBUG, " *** connection [%X]\n",
830               GNUNET_h2s (&c->id));
831   LOG (GNUNET_ERROR_TYPE_DEBUG, " ***   %s\n",
832               fc == &c->fwd_fc ? "FWD" : "BCK");
833
834   msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_POLL);
835   msg.header.size = htons (sizeof (msg));
836   LOG (GNUNET_ERROR_TYPE_DEBUG, " *** pid (%u)!\n", fc->last_pid_sent);
837   GMC_send_prebuilt_message (&msg.header, c, NULL, fc == &c->fwd_fc);
838   fc->poll_time = GNUNET_TIME_STD_BACKOFF (fc->poll_time);
839   fc->poll_task = GNUNET_SCHEDULER_add_delayed (fc->poll_time,
840                                                 &connection_poll, fc);
841 }
842
843
844 /**
845  * Timeout function due to lack of keepalive/traffic from the owner.
846  * Destroys connection if called.
847  *
848  * @param cls Closure (connection to destroy).
849  * @param tc TaskContext.
850  */
851 static void
852 connection_fwd_timeout (void *cls,
853                         const struct GNUNET_SCHEDULER_TaskContext *tc)
854 {
855   struct MeshConnection *c = cls;
856
857   c->fwd_maintenance_task = GNUNET_SCHEDULER_NO_TASK;
858   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
859     return;
860   LOG (GNUNET_ERROR_TYPE_DEBUG,
861               "Connection %s[%X] FWD timed out. Destroying.\n",
862               GMT_2s (c->t),
863               c->id);
864
865   if (GMC_is_origin (c, GNUNET_YES)) /* If local, leave. */
866     return;
867
868   GMC_destroy (c);
869 }
870
871
872 /**
873  * Timeout function due to lack of keepalive/traffic from the destination.
874  * Destroys connection if called.
875  *
876  * @param cls Closure (connection to destroy).
877  * @param tc TaskContext
878  */
879 static void
880 connection_bck_timeout (void *cls,
881                         const struct GNUNET_SCHEDULER_TaskContext *tc)
882 {
883   struct MeshConnection *c = cls;
884
885   c->bck_maintenance_task = GNUNET_SCHEDULER_NO_TASK;
886   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
887     return;
888
889   LOG (GNUNET_ERROR_TYPE_DEBUG,
890               "Connection %s[%X] FWD timed out. Destroying.\n",
891               GMT_2s (c->t), c->id);
892
893   if (GMC_is_origin (c, GNUNET_NO)) /* If local, leave. */
894     return;
895
896   GMC_destroy (c);
897 }
898
899
900 /**
901  * Resets the connection timeout task, some other message has done the
902  * task's job.
903  * - For the first peer on the direction this means to send
904  *   a keepalive or a path confirmation message (either create or ACK).
905  * - For all other peers, this means to destroy the connection,
906  *   due to lack of activity.
907  * Starts the tiemout if no timeout was running (connection just created).
908  *
909  * @param c Connection whose timeout to reset.
910  * @param fwd Is this forward?
911  *
912  * TODO use heap to improve efficiency of scheduler.
913  */
914 static void
915 connection_reset_timeout (struct MeshConnection *c, int fwd)
916 {
917   GNUNET_SCHEDULER_TaskIdentifier *ti;
918   GNUNET_SCHEDULER_Task f;
919
920   ti = fwd ? &c->fwd_maintenance_task : &c->bck_maintenance_task;
921
922   if (GNUNET_SCHEDULER_NO_TASK != *ti)
923     GNUNET_SCHEDULER_cancel (*ti);
924
925   if (GMC_is_origin (c, fwd)) /* Endpoint */
926   {
927     f  = fwd ? &connection_fwd_keepalive : &connection_bck_keepalive;
928     *ti = GNUNET_SCHEDULER_add_delayed (refresh_connection_time, f, c);
929   }
930   else /* Relay */
931   {
932     struct GNUNET_TIME_Relative delay;
933
934     delay = GNUNET_TIME_relative_multiply (refresh_connection_time, 4);
935     f  = fwd ? &connection_fwd_timeout : &connection_bck_timeout;
936     *ti = GNUNET_SCHEDULER_add_delayed (delay, f, c);
937   }
938 }
939
940
941 /**
942  * Add the connection to the list of both neighbors.
943  *
944  * @param c Connection.
945  */
946 static void
947 register_neighbors (struct MeshConnection *c)
948 {
949   struct MeshPeer *peer;
950
951   peer = get_next_hop (c);
952   if (GNUNET_NO == GMP_is_neighbor (peer))
953   {
954     GMC_destroy (c);
955     return;
956   }
957   GMP_add_connection (peer, c);
958   peer = get_prev_hop (c);
959   if (GNUNET_NO == GMP_is_neighbor (peer))
960   {
961     GMC_destroy (c);
962     return;
963   }
964   GMP_add_connection (peer, c);
965 }
966
967
968 /**
969  * Remove the connection from the list of both neighbors.
970  *
971  * @param c Connection.
972  */
973 static void
974 unregister_neighbors (struct MeshConnection *c)
975 {
976   struct MeshPeer *peer;
977
978   peer = get_next_hop (c);
979   GMP_remove_connection (peer, c);
980
981   peer = get_prev_hop (c);
982   GMP_remove_connection (peer, c);
983
984 }
985
986
987 /**
988  * Bind the connection to the peer and the tunnel to that peer.
989  *
990  * If the peer has no tunnel, create one. Update tunnel and connection
991  * data structres to reflect new status.
992  *
993  * @param c Connection.
994  * @param peer Peer.
995  */
996 static void
997 add_to_peer (struct MeshConnection *c, struct MeshPeer *peer)
998 {
999   GMP_add_tunnel (peer);
1000   c->t = GMP_get_tunnel (peer);
1001   GMT_add_connection (c->t, c);
1002 }
1003
1004 /******************************************************************************/
1005 /********************************    API    ***********************************/
1006 /******************************************************************************/
1007
1008 /**
1009  * Core handler for connection creation.
1010  *
1011  * @param cls Closure (unused).
1012  * @param peer Sender (neighbor).
1013  * @param message Message.
1014  *
1015  * @return GNUNET_OK to keep the connection open,
1016  *         GNUNET_SYSERR to close it (signal serious error)
1017  */
1018 int
1019 GMC_handle_create (void *cls, const struct GNUNET_PeerIdentity *peer,
1020                    const struct GNUNET_MessageHeader *message)
1021 {
1022   struct GNUNET_MESH_ConnectionCreate *msg;
1023   struct GNUNET_PeerIdentity *id;
1024   struct GNUNET_HashCode *cid;
1025   struct MeshPeerPath *path;
1026   struct MeshPeer *dest_peer;
1027   struct MeshPeer *orig_peer;
1028   struct MeshConnection *c;
1029   unsigned int own_pos;
1030   uint16_t size;
1031   uint16_t i;
1032
1033   LOG (GNUNET_ERROR_TYPE_DEBUG, "\n\n");
1034   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received a connection create msg\n");
1035
1036   /* Check size */
1037   size = ntohs (message->size);
1038   if (size < sizeof (struct GNUNET_MESH_ConnectionCreate))
1039   {
1040     GNUNET_break_op (0);
1041     return GNUNET_OK;
1042   }
1043
1044   /* Calculate hops */
1045   size -= sizeof (struct GNUNET_MESH_ConnectionCreate);
1046   if (size % sizeof (struct GNUNET_PeerIdentity))
1047   {
1048     GNUNET_break_op (0);
1049     return GNUNET_OK;
1050   }
1051   size /= sizeof (struct GNUNET_PeerIdentity);
1052   if (1 > size)
1053   {
1054     GNUNET_break_op (0);
1055     return GNUNET_OK;
1056   }
1057   LOG (GNUNET_ERROR_TYPE_DEBUG, "    path has %u hops.\n", size);
1058
1059   /* Get parameters */
1060   msg = (struct GNUNET_MESH_ConnectionCreate *) message;
1061   cid = &msg->cid;
1062   id = (struct GNUNET_PeerIdentity *) &msg[1];
1063   LOG (GNUNET_ERROR_TYPE_DEBUG,
1064               "    connection %s (%s).\n",
1065               GNUNET_h2s (cid), GNUNET_i2s (id));
1066
1067   /* Create connection */
1068   c = connection_get (cid);
1069   if (NULL == c)
1070   {
1071     /* Create path */
1072     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Creating path...\n");
1073     path = path_new (size);
1074     own_pos = 0;
1075     for (i = 0; i < size; i++)
1076     {
1077       LOG (GNUNET_ERROR_TYPE_DEBUG, "  ... adding %s\n",
1078                   GNUNET_i2s (&id[i]));
1079       path->peers[i] = GNUNET_PEER_intern (&id[i]);
1080       if (path->peers[i] == myid)
1081         own_pos = i;
1082     }
1083     if (own_pos == 0 && path->peers[own_pos] != myid)
1084     {
1085       /* create path: self not found in path through self */
1086       GNUNET_break_op (0);
1087       path_destroy (path);
1088       return GNUNET_OK;
1089     }
1090     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Own position: %u\n", own_pos);
1091     GMP_add_path_to_all (path, GNUNET_NO);
1092         LOG (GNUNET_ERROR_TYPE_DEBUG, "  Creating connection\n");
1093     c = GMC_new (cid, NULL, path_duplicate (path), own_pos);
1094     if (NULL == c)
1095       return GNUNET_OK;
1096     connection_reset_timeout (c, GNUNET_YES);
1097   }
1098   else
1099   {
1100     path = NULL;
1101   }
1102   if (MESH_CONNECTION_NEW == c->state)
1103     connection_change_state (c, MESH_CONNECTION_SENT);
1104
1105   /* Remember peers */
1106   dest_peer = GMP_get (&id[size - 1]);
1107   orig_peer = GMP_get (&id[0]);
1108
1109   /* Is it a connection to us? */
1110   if (c->own_pos == size - 1)
1111   {
1112     LOG (GNUNET_ERROR_TYPE_DEBUG, "  It's for us!\n");
1113     GMP_add_path_to_origin (orig_peer, path, GNUNET_YES);
1114
1115     add_to_peer (c, orig_peer);
1116     if (MESH_TUNNEL3_NEW == GMT_get_state (c->t))
1117       GMT_change_state (c->t,  MESH_TUNNEL3_WAITING);
1118
1119     send_connection_ack (c, GNUNET_NO);
1120     if (MESH_CONNECTION_SENT == c->state)
1121       connection_change_state (c, MESH_CONNECTION_ACK);
1122
1123     /* Keep tunnel alive in direction dest->owner*/
1124     connection_reset_timeout (c, GNUNET_NO);
1125   }
1126   else
1127   {
1128     /* It's for somebody else! Retransmit. */
1129     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Retransmitting.\n");
1130     GMP_add_path (dest_peer, path_duplicate (path), GNUNET_NO);
1131     GMP_add_path_to_origin (orig_peer, path, GNUNET_NO);
1132     GMC_send_prebuilt_message (message, c, NULL, GNUNET_YES);
1133   }
1134   return GNUNET_OK;
1135 }
1136
1137
1138 /**
1139  * Core handler for path confirmations.
1140  *
1141  * @param cls closure
1142  * @param message message
1143  * @param peer peer identity this notification is about
1144  *
1145  * @return GNUNET_OK to keep the connection open,
1146  *         GNUNET_SYSERR to close it (signal serious error)
1147  */
1148 int
1149 GMC_handle_confirm (void *cls, const struct GNUNET_PeerIdentity *peer,
1150                     const struct GNUNET_MessageHeader *message)
1151 {
1152   struct GNUNET_MESH_ConnectionACK *msg;
1153   struct MeshConnection *c;
1154   struct MeshPeerPath *p;
1155   struct MeshPeer *pi;
1156   int fwd;
1157
1158   LOG (GNUNET_ERROR_TYPE_DEBUG, "\n\n");
1159   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received a connection ACK msg\n");
1160   msg = (struct GNUNET_MESH_ConnectionACK *) message;
1161   LOG (GNUNET_ERROR_TYPE_DEBUG, "  on connection %s\n",
1162               GNUNET_h2s (&msg->cid));
1163   c = connection_get (&msg->cid);
1164   if (NULL == c)
1165   {
1166     GNUNET_STATISTICS_update (stats, "# control on unknown connection",
1167                               1, GNUNET_NO);
1168     LOG (GNUNET_ERROR_TYPE_DEBUG, "  don't know the connection!\n");
1169     return GNUNET_OK;
1170   }
1171
1172
1173   LOG (GNUNET_ERROR_TYPE_DEBUG, "  via peer %s\n",
1174               GNUNET_i2s (peer));
1175   pi = GMP_get (peer);
1176   if (get_next_hop (c) == pi)
1177   {
1178     LOG (GNUNET_ERROR_TYPE_DEBUG, "  SYNACK\n");
1179     fwd = GNUNET_NO;
1180     if (MESH_CONNECTION_SENT == c->state)
1181       connection_change_state (c, MESH_CONNECTION_ACK);
1182   }
1183   else if (get_prev_hop (c) == pi)
1184   {
1185     LOG (GNUNET_ERROR_TYPE_DEBUG, "  ACK\n");
1186     fwd = GNUNET_YES;
1187     connection_change_state (c, MESH_CONNECTION_READY);
1188   }
1189   else
1190   {
1191     GNUNET_break_op (0);
1192     return GNUNET_OK;
1193   }
1194   connection_reset_timeout (c, fwd);
1195
1196   /* Add path to peers? */
1197   p = c->path;
1198   if (NULL != p)
1199   {
1200     GMP_add_path_to_all (p, GNUNET_YES);
1201   }
1202   else
1203   {
1204     GNUNET_break (0);
1205   }
1206
1207   /* Message for us as creator? */
1208   if (GMC_is_origin (c, GNUNET_YES))
1209   {
1210     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Connection (SYN)ACK for us!\n");
1211     connection_change_state (c, MESH_CONNECTION_READY);
1212     GMT_change_state (c->t, MESH_TUNNEL3_READY);
1213     send_connection_ack (c, GNUNET_YES);
1214     GMT_send_queued_data (c->t, GNUNET_YES);
1215     return GNUNET_OK;
1216   }
1217
1218   /* Message for us as destination? */
1219   if (GMC_is_terminal (c, GNUNET_YES))
1220   {
1221     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Connection ACK for us!\n");
1222     connection_change_state (c, MESH_CONNECTION_READY);
1223     GMT_change_state (c->t, MESH_TUNNEL3_READY);
1224     GMT_send_queued_data (c->t, GNUNET_NO);
1225     return GNUNET_OK;
1226   }
1227
1228   LOG (GNUNET_ERROR_TYPE_DEBUG, "  not for us, retransmitting...\n");
1229   GMC_send_prebuilt_message (message, c, NULL, fwd);
1230   return GNUNET_OK;
1231 }
1232
1233
1234 /**
1235  * Core handler for notifications of broken paths
1236  *
1237  * @param cls Closure (unused).
1238  * @param id Peer identity of sending neighbor.
1239  * @param message Message.
1240  *
1241  * @return GNUNET_OK to keep the connection open,
1242  *         GNUNET_SYSERR to close it (signal serious error)
1243  */
1244 int
1245 GMC_handle_broken (void* cls,
1246                    const struct GNUNET_PeerIdentity* id,
1247                    const struct GNUNET_MessageHeader* message)
1248 {
1249   struct GNUNET_MESH_ConnectionBroken *msg;
1250   struct MeshConnection *c;
1251   int fwd;
1252
1253   LOG (GNUNET_ERROR_TYPE_DEBUG,
1254               "Received a CONNECTION BROKEN msg from %s\n", GNUNET_i2s (id));
1255   msg = (struct GNUNET_MESH_ConnectionBroken *) message;
1256   LOG (GNUNET_ERROR_TYPE_DEBUG, "  regarding %s\n",
1257               GNUNET_i2s (&msg->peer1));
1258   LOG (GNUNET_ERROR_TYPE_DEBUG, "  regarding %s\n",
1259               GNUNET_i2s (&msg->peer2));
1260   c = connection_get (&msg->cid);
1261   if (NULL == c)
1262   {
1263     GNUNET_break_op (0);
1264     return GNUNET_OK;
1265   }
1266
1267   fwd = is_fwd (c, id);
1268   connection_cancel_queues (c, !fwd);
1269   if (GMC_is_terminal (c, fwd))
1270   {
1271     if (0 < c->pending_messages)
1272       c->destroy = GNUNET_YES;
1273     else
1274       GMC_destroy (c);
1275   }
1276   else
1277   {
1278     GMC_send_prebuilt_message (message, c, NULL, fwd);
1279     c->destroy = GNUNET_YES;
1280   }
1281
1282   return GNUNET_OK;
1283
1284 }
1285
1286
1287 /**
1288  * Core handler for tunnel destruction
1289  *
1290  * @param cls Closure (unused).
1291  * @param peer Peer identity of sending neighbor.
1292  * @param message Message.
1293  *
1294  * @return GNUNET_OK to keep the connection open,
1295  *         GNUNET_SYSERR to close it (signal serious error)
1296  */
1297 int
1298 GMC_handle_destroy (void *cls, const struct GNUNET_PeerIdentity *peer,
1299                     const struct GNUNET_MessageHeader *message)
1300 {
1301   struct GNUNET_MESH_ConnectionDestroy *msg;
1302   struct MeshConnection *c;
1303   int fwd;
1304
1305   msg = (struct GNUNET_MESH_ConnectionDestroy *) message;
1306   LOG (GNUNET_ERROR_TYPE_DEBUG,
1307               "Got a CONNECTION DESTROY message from %s\n",
1308               GNUNET_i2s (peer));
1309   LOG (GNUNET_ERROR_TYPE_DEBUG,
1310               "  for connection %s\n",
1311               GNUNET_h2s (&msg->cid));
1312   c = connection_get (&msg->cid);
1313   if (NULL == c)
1314   {
1315     /* Probably already got the message from another path,
1316      * destroyed the tunnel and retransmitted to children.
1317      * Safe to ignore.
1318      */
1319     GNUNET_STATISTICS_update (stats, "# control on unknown tunnel",
1320                               1, GNUNET_NO);
1321     return GNUNET_OK;
1322   }
1323   fwd = is_fwd (c, peer);
1324   if (GNUNET_SYSERR == fwd)
1325   {
1326     GNUNET_break_op (0);
1327     return GNUNET_OK;
1328   }
1329   GMC_send_prebuilt_message (message, c, NULL, fwd);
1330   c->destroy = GNUNET_YES;
1331
1332   return GNUNET_OK;
1333 }
1334
1335 /**
1336  * Generic handler for mesh network encrypted traffic.
1337  *
1338  * @param peer Peer identity this notification is about.
1339  * @param message Encrypted message.
1340  *
1341  * @return GNUNET_OK to keep the connection open,
1342  *         GNUNET_SYSERR to close it (signal serious error)
1343  */
1344 static int
1345 handle_mesh_encrypted (const struct GNUNET_PeerIdentity *peer,
1346                        const struct GNUNET_MESH_Encrypted *msg)
1347 {
1348   struct MeshConnection *c;
1349   struct MeshPeer *neighbor;
1350   struct MeshFlowControl *fc;
1351   GNUNET_PEER_Id peer_id;
1352   uint32_t pid;
1353   uint32_t ttl;
1354   uint16_t type;
1355   size_t size;
1356   int fwd;
1357
1358   /* Check size */
1359   size = ntohs (msg->header.size);
1360   if (size <
1361       sizeof (struct GNUNET_MESH_Encrypted) +
1362       sizeof (struct GNUNET_MessageHeader))
1363   {
1364     GNUNET_break_op (0);
1365     return GNUNET_OK;
1366   }
1367   type = ntohs (msg->header.type);
1368   LOG (GNUNET_ERROR_TYPE_DEBUG, "\n\n");
1369   LOG (GNUNET_ERROR_TYPE_DEBUG, "got a %s message from %s\n",
1370               GNUNET_MESH_DEBUG_M2S (type), GNUNET_i2s (peer));
1371
1372   /* Check connection */
1373   c = connection_get (&msg->cid);
1374   if (NULL == c)
1375   {
1376     GNUNET_STATISTICS_update (stats, "# unknown connection", 1, GNUNET_NO);
1377     LOG (GNUNET_ERROR_TYPE_DEBUG, "WARNING connection unknown\n");
1378     return GNUNET_OK;
1379   }
1380
1381   /* Check if origin is as expected */
1382   neighbor = get_prev_hop (c);
1383   peer_id = GNUNET_PEER_search (peer);
1384   if (peer_id == GMP_get_short_id (neighbor))
1385   {
1386     fwd = GNUNET_YES;
1387   }
1388   else
1389   {
1390     neighbor = get_next_hop (c);
1391     if (peer_id == GMP_get_short_id (neighbor))
1392     {
1393       fwd = GNUNET_NO;
1394     }
1395     else
1396     {
1397       GNUNET_break_op (0);
1398       return GNUNET_OK;
1399     }
1400   }
1401   fc = fwd ? &c->bck_fc : &c->fwd_fc;
1402
1403   /* Check PID */
1404   pid = ntohl (msg->pid);
1405   if (GMC_is_pid_bigger (pid, fc->last_ack_sent))
1406   {
1407     GNUNET_STATISTICS_update (stats, "# unsolicited message", 1, GNUNET_NO);
1408     LOG (GNUNET_ERROR_TYPE_DEBUG,
1409                 "WARNING Received PID %u, (prev %u), ACK %u\n",
1410                 pid, fc->last_pid_recv, fc->last_ack_sent);
1411     return GNUNET_OK;
1412   }
1413   if (GNUNET_NO == GMC_is_pid_bigger (pid, fc->last_pid_recv))
1414   {
1415     GNUNET_STATISTICS_update (stats, "# duplicate PID", 1, GNUNET_NO);
1416     LOG (GNUNET_ERROR_TYPE_DEBUG,
1417                 " Pid %u not expected (%u+), dropping!\n",
1418                 pid, fc->last_pid_recv + 1);
1419     return GNUNET_OK;
1420   }
1421   if (MESH_CONNECTION_SENT == c->state)
1422     connection_change_state (c, MESH_CONNECTION_READY);
1423   connection_reset_timeout (c, fwd);
1424   fc->last_pid_recv = pid;
1425
1426   /* Is this message for us? */
1427   if (GMC_is_terminal (c, fwd))
1428   {
1429     /* TODO signature verification */
1430     LOG (GNUNET_ERROR_TYPE_DEBUG, "  message for us!\n");
1431     GNUNET_STATISTICS_update (stats, "# messages received", 1, GNUNET_NO);
1432
1433     if (NULL == c->t)
1434     {
1435       GNUNET_break (0);
1436       return GNUNET_OK;
1437     }
1438     fc->last_pid_recv = pid;
1439     GMT_handle_encrypted (c->t, msg, fwd);
1440     GMC_send_ack (c, NULL, fwd);
1441     return GNUNET_OK;
1442   }
1443
1444   /* Message not for us: forward to next hop */
1445   LOG (GNUNET_ERROR_TYPE_DEBUG, "  not for us, retransmitting...\n");
1446   ttl = ntohl (msg->ttl);
1447   LOG (GNUNET_ERROR_TYPE_DEBUG, "   ttl: %u\n", ttl);
1448   if (ttl == 0)
1449   {
1450     GNUNET_STATISTICS_update (stats, "# TTL drops", 1, GNUNET_NO);
1451     LOG (GNUNET_ERROR_TYPE_WARNING, " TTL is 0, DROPPING!\n");
1452     GMC_send_ack (c, NULL, fwd);
1453     return GNUNET_OK;
1454   }
1455   GNUNET_STATISTICS_update (stats, "# messages forwarded", 1, GNUNET_NO);
1456
1457   GMC_send_prebuilt_message (&msg->header, c, NULL, fwd);
1458
1459   return GNUNET_OK;
1460 }
1461
1462
1463 /**
1464  * Core handler for encrypted mesh network traffic (channel mgmt, data).
1465  *
1466  * @param cls Closure (unused).
1467  * @param message Message received.
1468  * @param peer Peer who sent the message.
1469  *
1470  * @return GNUNET_OK to keep the connection open,
1471  *         GNUNET_SYSERR to close it (signal serious error)
1472  */
1473 int
1474 GMC_handle_encrypted (void *cls, const struct GNUNET_PeerIdentity *peer,
1475                       const struct GNUNET_MessageHeader *message)
1476 {
1477   return handle_mesh_encrypted (peer,
1478                                 (struct GNUNET_MESH_Encrypted *)message);
1479 }
1480
1481
1482 /**
1483  * Core handler for mesh network traffic point-to-point acks.
1484  *
1485  * @param cls closure
1486  * @param message message
1487  * @param peer peer identity this notification is about
1488  *
1489  * @return GNUNET_OK to keep the connection open,
1490  *         GNUNET_SYSERR to close it (signal serious error)
1491  */
1492 int
1493 GMC_handle_ack (void *cls, const struct GNUNET_PeerIdentity *peer,
1494                 const struct GNUNET_MessageHeader *message)
1495 {
1496   struct GNUNET_MESH_ACK *msg;
1497   struct MeshConnection *c;
1498   struct MeshFlowControl *fc;
1499   GNUNET_PEER_Id id;
1500   uint32_t ack;
1501   int fwd;
1502
1503   LOG (GNUNET_ERROR_TYPE_DEBUG, "\n\n");
1504   LOG (GNUNET_ERROR_TYPE_DEBUG, "Got an ACK packet from %s!\n",
1505               GNUNET_i2s (peer));
1506   msg = (struct GNUNET_MESH_ACK *) message;
1507
1508   c = connection_get (&msg->cid);
1509
1510   if (NULL == c)
1511   {
1512     GNUNET_STATISTICS_update (stats, "# ack on unknown connection", 1,
1513                               GNUNET_NO);
1514     return GNUNET_OK;
1515   }
1516
1517   /* Is this a forward or backward ACK? */
1518   id = GNUNET_PEER_search (peer);
1519   if (GMP_get_short_id (get_next_hop (c)) == id)
1520   {
1521     LOG (GNUNET_ERROR_TYPE_DEBUG, "  FWD ACK\n");
1522     fc = &c->fwd_fc;
1523     fwd = GNUNET_YES;
1524   }
1525   else if (GMP_get_short_id (get_prev_hop (c)) == id)
1526   {
1527     LOG (GNUNET_ERROR_TYPE_DEBUG, "  BCK ACK\n");
1528     fc = &c->bck_fc;
1529     fwd = GNUNET_NO;
1530   }
1531   else
1532   {
1533     GNUNET_break_op (0);
1534     return GNUNET_OK;
1535   }
1536
1537   ack = ntohl (msg->ack);
1538   LOG (GNUNET_ERROR_TYPE_DEBUG, "  ACK %u (was %u)\n",
1539               ack, fc->last_ack_recv);
1540   if (GMC_is_pid_bigger (ack, fc->last_ack_recv))
1541     fc->last_ack_recv = ack;
1542
1543   /* Cancel polling if the ACK is big enough. */
1544   if (GNUNET_SCHEDULER_NO_TASK != fc->poll_task &&
1545       GMC_is_pid_bigger (fc->last_ack_recv, fc->last_pid_sent))
1546   {
1547     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Cancel poll\n");
1548     GNUNET_SCHEDULER_cancel (fc->poll_task);
1549     fc->poll_task = GNUNET_SCHEDULER_NO_TASK;
1550     fc->poll_time = GNUNET_TIME_UNIT_SECONDS;
1551   }
1552
1553   connection_unlock_queue (c, fwd);
1554
1555   return GNUNET_OK;
1556 }
1557
1558
1559 /**
1560  * Core handler for mesh network traffic point-to-point ack polls.
1561  *
1562  * @param cls closure
1563  * @param message message
1564  * @param peer peer identity this notification is about
1565  *
1566  * @return GNUNET_OK to keep the connection open,
1567  *         GNUNET_SYSERR to close it (signal serious error)
1568  */
1569 int
1570 GMC_handle_poll (void *cls, const struct GNUNET_PeerIdentity *peer,
1571                  const struct GNUNET_MessageHeader *message)
1572 {
1573   struct GNUNET_MESH_Poll *msg;
1574   struct MeshConnection *c;
1575   struct MeshFlowControl *fc;
1576   GNUNET_PEER_Id id;
1577   uint32_t pid;
1578   int fwd;
1579
1580   LOG (GNUNET_ERROR_TYPE_DEBUG, "\n\n");
1581   LOG (GNUNET_ERROR_TYPE_DEBUG, "Got a POLL packet from %s!\n",
1582               GNUNET_i2s (peer));
1583
1584   msg = (struct GNUNET_MESH_Poll *) message;
1585
1586   c = connection_get (&msg->cid);
1587
1588   if (NULL == c)
1589   {
1590     GNUNET_STATISTICS_update (stats, "# poll on unknown connection", 1,
1591                               GNUNET_NO);
1592     GNUNET_break_op (0);
1593     return GNUNET_OK;
1594   }
1595
1596   /* Is this a forward or backward ACK?
1597    * Note: a poll should never be needed in a loopback case,
1598    * since there is no possiblility of packet loss there, so
1599    * this way of discerining FWD/BCK should not be a problem.
1600    */
1601   id = GNUNET_PEER_search (peer);
1602   if (GMP_get_short_id (get_next_hop (c)) == id)
1603   {
1604     LOG (GNUNET_ERROR_TYPE_DEBUG, "  FWD ACK\n");
1605     fc = &c->fwd_fc;
1606   }
1607   else if (GMP_get_short_id (get_prev_hop (c)) == id)
1608   {
1609     LOG (GNUNET_ERROR_TYPE_DEBUG, "  BCK ACK\n");
1610     fc = &c->bck_fc;
1611   }
1612   else
1613   {
1614     GNUNET_break_op (0);
1615     return GNUNET_OK;
1616   }
1617
1618   pid = ntohl (msg->pid);
1619   LOG (GNUNET_ERROR_TYPE_DEBUG, "  PID %u, OLD %u\n",
1620               pid, fc->last_pid_recv);
1621   fc->last_pid_recv = pid;
1622   fwd = fc == &c->fwd_fc;
1623   GMC_send_ack (c, NULL, fwd);
1624
1625   return GNUNET_OK;
1626 }
1627
1628
1629 /**
1630  * Core handler for mesh keepalives.
1631  *
1632  * @param cls closure
1633  * @param message message
1634  * @param peer peer identity this notification is about
1635  * @return GNUNET_OK to keep the connection open,
1636  *         GNUNET_SYSERR to close it (signal serious error)
1637  *
1638  * TODO: Check who we got this from, to validate route.
1639  */
1640 int
1641 GMC_handle_keepalive (void *cls, const struct GNUNET_PeerIdentity *peer,
1642                       const struct GNUNET_MessageHeader *message)
1643 {
1644   struct GNUNET_MESH_ConnectionKeepAlive *msg;
1645   struct MeshConnection *c;
1646   struct MeshPeer *neighbor;
1647   int fwd;
1648
1649   msg = (struct GNUNET_MESH_ConnectionKeepAlive *) message;
1650   LOG (GNUNET_ERROR_TYPE_DEBUG, "got a keepalive packet from %s\n",
1651               GNUNET_i2s (peer));
1652
1653   c = connection_get (&msg->cid);
1654   if (NULL == c)
1655   {
1656     GNUNET_STATISTICS_update (stats, "# keepalive on unknown connection", 1,
1657                               GNUNET_NO);
1658     return GNUNET_OK;
1659   }
1660
1661   fwd = GNUNET_MESSAGE_TYPE_MESH_FWD_KEEPALIVE == ntohs (message->type) ?
1662         GNUNET_YES : GNUNET_NO;
1663
1664   /* Check if origin is as expected */
1665   neighbor = get_hop (c, fwd);
1666   if (GNUNET_PEER_search (peer) != GMP_get_short_id (neighbor))
1667   {
1668     GNUNET_break_op (0);
1669     return GNUNET_OK;
1670   }
1671
1672   connection_change_state (c, MESH_CONNECTION_READY);
1673   connection_reset_timeout (c, fwd);
1674
1675   if (GMC_is_terminal (c, fwd))
1676     return GNUNET_OK;
1677
1678   GNUNET_STATISTICS_update (stats, "# keepalives forwarded", 1, GNUNET_NO);
1679   GMC_send_prebuilt_message (message, c, NULL, fwd);
1680
1681   return GNUNET_OK;
1682 }
1683
1684
1685 /**
1686  * Send an ACK on the appropriate connection/channel, depending on
1687  * the direction and the position of the peer.
1688  *
1689  * @param c Which connection to send the hop-by-hop ACK.
1690  * @param ch Channel, if any.
1691  * @param fwd Is this a fwd ACK? (will go dest->root)
1692  */
1693 void
1694 GMC_send_ack (struct MeshConnection *c, struct MeshChannel *ch, int fwd)
1695 {
1696   unsigned int buffer;
1697
1698   LOG (GNUNET_ERROR_TYPE_DEBUG,
1699               "send ack %s on %p %p\n",
1700               fwd ? "FWD" : "BCK", c, ch);
1701
1702   /* Get available bufffer space */
1703   if (NULL == c || GMC_is_terminal (c, fwd))
1704   {
1705     struct MeshTunnel3 *t;
1706     LOG (GNUNET_ERROR_TYPE_DEBUG, "  getting from all connections\n");
1707     t = (NULL == c) ? GMCH_get_tunnel (ch) : GMC_get_tunnel (c);
1708     buffer = GMT_get_buffer (t, fwd);
1709   }
1710   else
1711   {
1712     LOG (GNUNET_ERROR_TYPE_DEBUG, "  getting from one connection\n");
1713     buffer = GMC_get_buffer (c, fwd);
1714   }
1715   LOG (GNUNET_ERROR_TYPE_DEBUG, "  buffer available: %u\n", buffer);
1716
1717   /* Send available buffer space */
1718   if ( (NULL != ch && GMCH_is_origin (ch, fwd)) ||
1719        (NULL != c && GMC_is_origin (c, fwd)) )
1720   {
1721     LOG (GNUNET_ERROR_TYPE_DEBUG, "  sending on channel...\n");
1722     if (0 < buffer)
1723     {
1724       GNUNET_assert (NULL != ch);
1725       LOG (GNUNET_ERROR_TYPE_DEBUG, "  really sending!\n");
1726       GMCH_send_data_ack (ch, fwd);
1727     }
1728   }
1729   else if (NULL == c)
1730   {
1731     LOG (GNUNET_ERROR_TYPE_DEBUG, "  sending on all connections\n");
1732     GNUNET_assert (NULL != ch);
1733     GMT_send_acks (GMCH_get_tunnel (ch), buffer, fwd);
1734   }
1735   else
1736   {
1737     LOG (GNUNET_ERROR_TYPE_DEBUG, "  sending on connection\n");
1738     send_ack (c, buffer, fwd);
1739   }
1740 }
1741
1742
1743 /**
1744  * Initialize the connections subsystem
1745  *
1746  * @param c Configuration handle.
1747  */
1748 void
1749 GMC_init (const struct GNUNET_CONFIGURATION_Handle *c)
1750 {
1751   LOG (GNUNET_ERROR_TYPE_DEBUG, "init\n");
1752   if (GNUNET_OK !=
1753       GNUNET_CONFIGURATION_get_value_number (c, "MESH", "MAX_MSGS_QUEUE",
1754                                              &max_msgs_queue))
1755   {
1756     GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
1757                                "MESH", "MAX_MSGS_QUEUE", "MISSING");
1758     GNUNET_SCHEDULER_shutdown ();
1759     return;
1760   }
1761
1762   if (GNUNET_OK !=
1763       GNUNET_CONFIGURATION_get_value_number (c, "MESH", "MAX_CONNECTIONS",
1764                                              &max_connections))
1765   {
1766     GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
1767                                "MESH", "MAX_CONNECTIONS", "MISSING");
1768     GNUNET_SCHEDULER_shutdown ();
1769     return;
1770   }
1771
1772   if (GNUNET_OK !=
1773       GNUNET_CONFIGURATION_get_value_time (c, "MESH", "REFRESH_CONNECTION_TIME",
1774                                            &refresh_connection_time))
1775   {
1776     GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
1777                                "MESH", "REFRESH_CONNECTION_TIME", "MISSING");
1778     GNUNET_SCHEDULER_shutdown ();
1779     return;
1780   }
1781   connections = GNUNET_CONTAINER_multihashmap_create (1024, GNUNET_YES);
1782 }
1783
1784 /**
1785  * Shut down the connections subsystem.
1786  */
1787 void
1788 GMC_shutdown (void)
1789 {
1790   GNUNET_CONTAINER_multihashmap_destroy (connections);
1791 }
1792
1793
1794 struct MeshConnection *
1795 GMC_new (const struct GNUNET_HashCode *cid,
1796          struct MeshTunnel3 *t,
1797          struct MeshPeerPath *p,
1798          unsigned int own_pos)
1799 {
1800   struct MeshConnection *c;
1801
1802   c = GNUNET_new (struct MeshConnection);
1803   c->id = *cid;
1804   GNUNET_CONTAINER_multihashmap_put (connections, &c->id, c,
1805                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1806   fc_init (&c->fwd_fc);
1807   fc_init (&c->bck_fc);
1808   c->fwd_fc.c = c;
1809   c->bck_fc.c = c;
1810
1811   c->t = t;
1812   if (own_pos > p->length - 1)
1813   {
1814     GNUNET_break (0);
1815     GMC_destroy (c);
1816     return NULL;
1817   }
1818   c->own_pos = own_pos;
1819   c->path = p;
1820
1821   if (0 == own_pos)
1822   {
1823     c->fwd_maintenance_task =
1824             GNUNET_SCHEDULER_add_delayed (refresh_connection_time,
1825                                           &connection_fwd_keepalive, c);
1826   }
1827   register_neighbors (c);
1828   return c;
1829 }
1830
1831
1832 void
1833 GMC_destroy (struct MeshConnection *c)
1834 {
1835   if (NULL == c)
1836     return;
1837
1838   LOG (GNUNET_ERROR_TYPE_DEBUG, "destroying connection %s\n",
1839        GNUNET_h2s (&c->id));
1840
1841   /* Cancel all traffic */
1842   connection_cancel_queues (c, GNUNET_YES);
1843   connection_cancel_queues (c, GNUNET_NO);
1844
1845   /* Cancel maintainance task (keepalive/timeout) */
1846   if (GNUNET_SCHEDULER_NO_TASK != c->fwd_maintenance_task)
1847     GNUNET_SCHEDULER_cancel (c->fwd_maintenance_task);
1848   if (GNUNET_SCHEDULER_NO_TASK != c->bck_maintenance_task)
1849     GNUNET_SCHEDULER_cancel (c->bck_maintenance_task);
1850
1851   /* Unregister from neighbors */
1852   unregister_neighbors (c);
1853
1854   /* Delete */
1855   GNUNET_STATISTICS_update (stats, "# connections", -1, GNUNET_NO);
1856   if (NULL != c->t)
1857     GMT_remove_connection (c->t, c);
1858   GNUNET_free (c);
1859 }
1860
1861 /**
1862  * Get the connection ID.
1863  *
1864  * @param c Connection to get the ID from.
1865  *
1866  * @return ID of the connection.
1867  */
1868 const struct GNUNET_HashCode *
1869 GMC_get_id (const struct MeshConnection *c)
1870 {
1871   return &c->id;
1872 }
1873
1874
1875 /**
1876  * Get the connection path.
1877  *
1878  * @param c Connection to get the path from.
1879  *
1880  * @return path used by the connection.
1881  */
1882 const struct MeshPeerPath *
1883 GMC_get_path (const struct MeshConnection *c)
1884 {
1885   return c->path;
1886 }
1887
1888
1889 /**
1890  * Get the connection state.
1891  *
1892  * @param c Connection to get the state from.
1893  *
1894  * @return state of the connection.
1895  */
1896 enum MeshConnectionState
1897 GMC_get_state (const struct MeshConnection *c)
1898 {
1899   return c->state;
1900 }
1901
1902 /**
1903  * Get the connection tunnel.
1904  *
1905  * @param c Connection to get the tunnel from.
1906  *
1907  * @return tunnel of the connection.
1908  */
1909 struct MeshTunnel3 *
1910 GMC_get_tunnel (const struct MeshConnection *c)
1911 {
1912   return c->t;
1913 }
1914
1915
1916 /**
1917  * Get free buffer space in a connection.
1918  *
1919  * @param c Connection.
1920  * @param fwd Is query about FWD traffic?
1921  *
1922  * @return Free buffer space [0 - max_msgs_queue/max_connections]
1923  */
1924 unsigned int
1925 GMC_get_buffer (struct MeshConnection *c, int fwd)
1926 {
1927   struct MeshFlowControl *fc;
1928
1929   fc = fwd ? &c->fwd_fc : &c->bck_fc;
1930
1931   return (fc->queue_max - fc->queue_n);
1932 }
1933
1934 /**
1935  * Get how many messages have we allowed to send to us from a direction..
1936  *
1937  * @param c Connection.
1938  * @param fwd Are we asking about traffic from FWD (BCK messages)?
1939  *
1940  * @return last_ack_sent - last_pid_recv
1941  */
1942 unsigned int
1943 GMC_get_allowed (struct MeshConnection *c, int fwd)
1944 {
1945   struct MeshFlowControl *fc;
1946
1947   fc = fwd ? &c->fwd_fc : &c->bck_fc;
1948   if (GMC_is_pid_bigger(fc->last_pid_recv, fc->last_ack_sent))
1949   {
1950     return 0;
1951   }
1952   return (fc->last_ack_sent - fc->last_pid_recv);
1953 }
1954
1955 /**
1956  * Get messages queued in a connection.
1957  *
1958  * @param c Connection.
1959  * @param fwd Is query about FWD traffic?
1960  *
1961  * @return Number of messages queued.
1962  */
1963 unsigned int
1964 GMC_get_qn (struct MeshConnection *c, int fwd)
1965 {
1966   struct MeshFlowControl *fc;
1967
1968   fc = fwd ? &c->fwd_fc : &c->bck_fc;
1969
1970   return fc->queue_n;
1971 }
1972
1973
1974 /**
1975  * Allow the connection to advertise a buffer of the given size.
1976  *
1977  * The connection will send an @c fwd ACK message (so: in direction !fwd)
1978  * allowing up to last_pid_recv + buffer.
1979  *
1980  * @param c Connection.
1981  * @param buffer How many more messages the connection can accept.
1982  * @param fwd Is this about FWD traffic? (The ack will go dest->root).
1983  */
1984 void
1985 GMC_allow (struct MeshConnection *c, unsigned int buffer, int fwd)
1986 {
1987   send_ack (c, buffer, fwd);
1988 }
1989
1990
1991 /**
1992  * Notify other peers on a connection of a broken link. Mark connections
1993  * to destroy after all traffic has been sent.
1994  *
1995  * @param c Connection on which there has been a disconnection.
1996  * @param peer Peer that disconnected.
1997  */
1998 void
1999 GMC_notify_broken (struct MeshConnection *c,
2000                    struct MeshPeer *peer)
2001 {
2002   int fwd;
2003
2004   fwd = peer == get_prev_hop (c);
2005
2006   connection_cancel_queues (c, !fwd);
2007   if (GMC_is_terminal (c, fwd))
2008   {
2009     /* Local shutdown, no one to notify about this. */
2010     GMC_destroy (c);
2011     return;
2012   }
2013
2014   send_broken (c, &my_full_id, GMP_get_id (peer), fwd);
2015
2016   /* Connection will have at least one pending message
2017    * (the one we just scheduled), so no point in checking whether to
2018    * destroy immediately. */
2019   c->destroy = GNUNET_YES;
2020
2021   return;
2022 }
2023
2024
2025 /**
2026  * Is this peer the first one on the connection?
2027  *
2028  * @param c Connection.
2029  * @param fwd Is this about fwd traffic?
2030  *
2031  * @return GNUNET_YES if origin, GNUNET_NO if relay/terminal.
2032  */
2033 int
2034 GMC_is_origin (struct MeshConnection *c, int fwd)
2035 {
2036   if (!fwd && c->path->length - 1 == c->own_pos )
2037     return GNUNET_YES;
2038   if (fwd && 0 == c->own_pos)
2039     return GNUNET_YES;
2040   return GNUNET_NO;
2041 }
2042
2043
2044 /**
2045  * Is this peer the last one on the connection?
2046  *
2047  * @param c Connection.
2048  * @param fwd Is this about fwd traffic?
2049  *            Note that the ROOT is the terminal for BCK traffic!
2050  *
2051  * @return GNUNET_YES if terminal, GNUNET_NO if relay/origin.
2052  */
2053 int
2054 GMC_is_terminal (struct MeshConnection *c, int fwd)
2055 {
2056   return GMC_is_origin (c, !fwd);
2057 }
2058
2059
2060 /**
2061  * See if we are allowed to send by the next hop in the given direction.
2062  *
2063  * @param c Connection.
2064  * @param fwd Is this about fwd traffic?
2065  *
2066  * @return GNUNET_YES in case it's OK.
2067  */
2068 int
2069 GMC_is_sendable (struct MeshConnection *c, int fwd)
2070 {
2071   struct MeshFlowControl *fc;
2072
2073   fc = fwd ? &c->fwd_fc : &c->bck_fc;
2074   if (GMC_is_pid_bigger (fc->last_ack_recv, fc->last_pid_sent))
2075     return GNUNET_YES;
2076   return GNUNET_NO;
2077 }
2078
2079 /**
2080  * Sends an already built message on a connection, properly registering
2081  * all used resources.
2082  *
2083  * @param message Message to send. Function makes a copy of it.
2084  *                If message is not hop-by-hop, decrements TTL of copy.
2085  * @param c Connection on which this message is transmitted.
2086  * @param ch Channel on which this message is transmitted, or NULL.
2087  * @param fwd Is this a fwd message?
2088  */
2089 void
2090 GMC_send_prebuilt_message (const struct GNUNET_MessageHeader *message,
2091                            struct MeshConnection *c,
2092                            struct MeshChannel *ch,
2093                            int fwd)
2094 {
2095   struct MeshFlowControl *fc;
2096   void *data;
2097   size_t size;
2098   uint16_t type;
2099   int droppable;
2100
2101   size = ntohs (message->size);
2102   data = GNUNET_malloc (size);
2103   memcpy (data, message, size);
2104   type = ntohs (message->type);
2105   LOG (GNUNET_ERROR_TYPE_DEBUG, "Send %s (%u) on connection %s\n",
2106               GNUNET_MESH_DEBUG_M2S (type), size, GNUNET_h2s (&c->id));
2107
2108   droppable = GNUNET_YES;
2109   switch (type)
2110   {
2111     struct GNUNET_MESH_Encrypted *emsg;
2112     struct GNUNET_MESH_ACK       *amsg;
2113     struct GNUNET_MESH_Poll      *pmsg;
2114     struct GNUNET_MESH_ConnectionDestroy *dmsg;
2115     struct GNUNET_MESH_ConnectionBroken  *bmsg;
2116     uint32_t ttl;
2117
2118     case GNUNET_MESSAGE_TYPE_MESH_ENCRYPTED:
2119       emsg = (struct GNUNET_MESH_Encrypted *) data;
2120       ttl = ntohl (emsg->ttl);
2121       if (0 == ttl)
2122       {
2123         GNUNET_break_op (0);
2124         return;
2125       }
2126       emsg->cid = c->id;
2127       emsg->ttl = htonl (ttl - 1);
2128       emsg->pid = htonl (fwd ? c->fwd_fc.next_pid++ : c->bck_fc.next_pid++);
2129       LOG (GNUNET_ERROR_TYPE_DEBUG, " pid %u\n", ntohl (emsg->pid));
2130       break;
2131
2132     case GNUNET_MESSAGE_TYPE_MESH_ACK:
2133       amsg = (struct GNUNET_MESH_ACK *) data;
2134       amsg->cid = c->id;
2135       LOG (GNUNET_ERROR_TYPE_DEBUG, " ack %u\n", ntohl (amsg->ack));
2136       droppable = GNUNET_NO;
2137       break;
2138
2139     case GNUNET_MESSAGE_TYPE_MESH_POLL:
2140       pmsg = (struct GNUNET_MESH_Poll *) data;
2141       pmsg->cid = c->id;
2142       pmsg->pid = htonl (fwd ? c->fwd_fc.last_pid_sent : c->bck_fc.last_pid_sent);
2143       LOG (GNUNET_ERROR_TYPE_DEBUG, " poll %u\n", ntohl (pmsg->pid));
2144       droppable = GNUNET_NO;
2145       break;
2146
2147     case GNUNET_MESSAGE_TYPE_MESH_TUNNEL_DESTROY:
2148       dmsg = (struct GNUNET_MESH_ConnectionDestroy *) data;
2149       dmsg->cid = c->id;
2150       dmsg->reserved = 0;
2151       break;
2152
2153     case GNUNET_MESSAGE_TYPE_MESH_CONNECTION_BROKEN:
2154       bmsg = (struct GNUNET_MESH_ConnectionBroken *) data;
2155       bmsg->cid = c->id;
2156       bmsg->reserved = 0;
2157       break;
2158
2159     case GNUNET_MESSAGE_TYPE_MESH_CONNECTION_CREATE:
2160     case GNUNET_MESSAGE_TYPE_MESH_CONNECTION_ACK:
2161       break;
2162
2163     default:
2164       GNUNET_break (0);
2165   }
2166
2167   fc = fwd ? &c->fwd_fc : &c->bck_fc;
2168   if (fc->queue_n >= fc->queue_max && droppable)
2169   {
2170     GNUNET_STATISTICS_update (stats, "# messages dropped (buffer full)",
2171                               1, GNUNET_NO);
2172     GNUNET_break (0);
2173     LOG (GNUNET_ERROR_TYPE_DEBUG,
2174                 "queue full: %u/%u\n",
2175                 fc->queue_n, fc->queue_max);
2176     return; /* Drop this message */
2177   }
2178
2179   LOG (GNUNET_ERROR_TYPE_DEBUG, "last pid %u\n", fc->last_pid_sent);
2180   LOG (GNUNET_ERROR_TYPE_DEBUG, "     ack %u\n", fc->last_ack_recv);
2181   LOG (GNUNET_ERROR_TYPE_DEBUG, "  Q_N+ %p %u\n", fc, fc->queue_n);
2182   if (GMC_is_pid_bigger (fc->last_pid_sent + 1, fc->last_ack_recv))
2183   {
2184     GMC_start_poll (c, fwd);
2185   }
2186   fc->queue_n++;
2187   c->pending_messages++;
2188
2189   GMP_queue_add (get_hop (c, fwd), data, type, size, c, ch, fwd,
2190                  &message_sent, NULL);
2191 }
2192
2193
2194 /**
2195  * Sends a CREATE CONNECTION message for a path to a peer.
2196  * Changes the connection and tunnel states if necessary.
2197  *
2198  * @param connection Connection to create.
2199  */
2200 void
2201 GMC_send_create (struct MeshConnection *connection)
2202 {
2203   enum MeshTunnel3State state;
2204   size_t size;
2205
2206   size = sizeof (struct GNUNET_MESH_ConnectionCreate);
2207   size += connection->path->length * sizeof (struct GNUNET_PeerIdentity);
2208   LOG (GNUNET_ERROR_TYPE_DEBUG, "Send connection create\n");
2209   GMP_queue_add (get_next_hop (connection), NULL,
2210                  GNUNET_MESSAGE_TYPE_MESH_CONNECTION_CREATE,
2211                  size, connection, NULL,
2212                  GNUNET_YES, &message_sent, NULL);
2213   state = GMT_get_state (connection->t);
2214   if (MESH_TUNNEL3_SEARCHING == state || MESH_TUNNEL3_NEW == state)
2215     GMT_change_state (connection->t, MESH_TUNNEL3_WAITING);
2216   if (MESH_CONNECTION_NEW == connection->state)
2217     connection_change_state (connection, MESH_CONNECTION_SENT);
2218   connection->fwd_fc.queue_n++;
2219 }
2220
2221
2222 /**
2223  * Send a message to all peers in this connection that the connection
2224  * is no longer valid.
2225  *
2226  * If some peer should not receive the message, it should be zero'ed out
2227  * before calling this function.
2228  *
2229  * @param c The connection whose peers to notify.
2230  */
2231 void
2232 GMC_send_destroy (struct MeshConnection *c)
2233 {
2234   struct GNUNET_MESH_ConnectionDestroy msg;
2235
2236   if (GNUNET_YES == c->destroy)
2237     return;
2238
2239   msg.header.size = htons (sizeof (msg));
2240   msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_TUNNEL_DESTROY);;
2241   msg.cid = c->id;
2242   LOG (GNUNET_ERROR_TYPE_DEBUG,
2243               "  sending connection destroy for connection %s\n",
2244               GNUNET_h2s (&c->id));
2245
2246   if (GNUNET_NO == GMC_is_terminal (c, GNUNET_YES))
2247     GMC_send_prebuilt_message (&msg.header, c, NULL, GNUNET_YES);
2248   if (GNUNET_NO == GMC_is_terminal (c, GNUNET_NO))
2249     GMC_send_prebuilt_message (&msg.header, c, NULL, GNUNET_NO);
2250   c->destroy = GNUNET_YES;
2251 }
2252
2253
2254 /**
2255  * @brief Start a polling timer for the connection.
2256  *
2257  * When a neighbor does not accept more traffic on the connection it could be
2258  * caused by a simple congestion or by a lost ACK. Polling enables to check
2259  * for the lastest ACK status for a connection.
2260  *
2261  * @param c Connection.
2262  * @param fwd Should we poll in the FWD direction?
2263  */
2264 void
2265 GMC_start_poll (struct MeshConnection *c, int fwd)
2266 {
2267   struct MeshFlowControl *fc;
2268
2269   fc = fwd ? &c->fwd_fc : &c->bck_fc;
2270   if (GNUNET_SCHEDULER_NO_TASK != fc->poll_task)
2271   {
2272     return;
2273   }
2274   fc->poll_task = GNUNET_SCHEDULER_add_delayed (fc->poll_time,
2275                                                 &connection_poll,
2276                                                 fc);
2277 }
2278
2279
2280 /**
2281  * @brief Stop polling a connection for ACKs.
2282  *
2283  * Once we have enough ACKs for future traffic, polls are no longer necessary.
2284  *
2285  * @param c Connection.
2286  * @param fwd Should we stop the poll in the FWD direction?
2287  */
2288 void
2289 GMC_stop_poll (struct MeshConnection *c, int fwd)
2290 {
2291   struct MeshFlowControl *fc;
2292
2293   fc = fwd ? &c->fwd_fc : &c->bck_fc;
2294   if (GNUNET_SCHEDULER_NO_TASK != fc->poll_task)
2295   {
2296     GNUNET_SCHEDULER_cancel (fc->poll_task);
2297     fc->poll_task = GNUNET_SCHEDULER_NO_TASK;
2298   }
2299 }