fix issue with sending BROKEN timeouts early due to wrong timeout calculation
[oweals/gnunet.git] / src / cadet / gnunet-service-cadet_core.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2017 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file cadet/gnunet-service-cadet_core.c
23  * @brief cadet service; interaction with CORE service
24  * @author Bartlomiej Polot
25  * @author Christian Grothoff
26  *
27  * All functions in this file should use the prefix GCO (Gnunet Cadet cOre (bottom))
28  *
29  * TODO:
30  * - Optimization: given BROKEN messages, destroy paths (?)
31  */
32 #include "platform.h"
33 #include "gnunet-service-cadet_core.h"
34 #include "gnunet-service-cadet_paths.h"
35 #include "gnunet-service-cadet_peer.h"
36 #include "gnunet-service-cadet_connection.h"
37 #include "gnunet-service-cadet_tunnels.h"
38 #include "gnunet_core_service.h"
39 #include "gnunet_statistics_service.h"
40 #include "cadet_protocol.h"
41
42
43 #define LOG(level, ...) GNUNET_log_from(level,"cadet-cor",__VA_ARGS__)
44
45 /**
46  * Information we keep per direction for a route.
47  */
48 struct RouteDirection;
49
50
51 /**
52  * Set of CadetRoutes that have exactly the same number of messages
53  * in their buffer.  Used so we can efficiently find all of those
54  * routes that have the current maximum of messages in the buffer (in
55  * case we have to purge).
56  */
57 struct Rung
58 {
59
60   /**
61    * Rung of RouteDirections with one more buffer entry each.
62    */
63   struct Rung *next;
64
65   /**
66    * Rung of RouteDirections with one less buffer entry each.
67    */
68   struct Rung *prev;
69
70   /**
71    * DLL of route directions with a number of buffer entries matching this rung.
72    */
73   struct RouteDirection *rd_head;
74
75   /**
76    * DLL of route directions with a number of buffer entries matching this rung.
77    */
78   struct RouteDirection *rd_tail;
79
80   /**
81    * Total number of route directions in this rung.
82    */
83   unsigned int num_routes;
84
85   /**
86    * Number of messages route directions at this rung have
87    * in their buffer.
88    */
89   unsigned int rung_off;
90 };
91
92
93 /**
94  * Information we keep per direction for a route.
95  */
96 struct RouteDirection
97 {
98
99   /**
100    * DLL of other route directions within the same `struct Rung`.
101    */
102   struct RouteDirection *prev;
103
104   /**
105    * DLL of other route directions within the same `struct Rung`.
106    */
107   struct RouteDirection *next;
108
109   /**
110    * Rung of this route direction (matches length of the buffer DLL).
111    */
112   struct Rung *rung;
113
114   /**
115    * Head of DLL of envelopes we have in the buffer for this direction.
116    */
117   struct GNUNET_MQ_Envelope *env_head;
118
119   /**
120    * Tail of DLL of envelopes we have in the buffer for this direction.
121    */
122   struct GNUNET_MQ_Envelope *env_tail;
123
124   /**
125    * Target peer.
126    */
127   struct CadetPeer *hop;
128
129   /**
130    * Route this direction is part of.
131    */
132   struct CadetRoute *my_route;
133
134   /**
135    * Message queue manager for @e hop.
136    */
137   struct GCP_MessageQueueManager *mqm;
138
139   /**
140    * Is @e mqm currently ready for transmission?
141    */
142   int is_ready;
143
144 };
145
146
147 /**
148  * Description of a segment of a `struct CadetConnection` at the
149  * intermediate peers.  Routes are basically entries in a peer's
150  * routing table for forwarding traffic.  At both endpoints, the
151  * routes are terminated by a `struct CadetConnection`, which knows
152  * the complete `struct CadetPath` that is formed by the individual
153  * routes.
154  */
155 struct CadetRoute
156 {
157
158   /**
159    * Information about the next hop on this route.
160    */
161   struct RouteDirection next;
162
163   /**
164    * Information about the previous hop on this route.
165    */
166   struct RouteDirection prev;
167
168   /**
169    * Unique identifier for the connection that uses this route.
170    */
171   struct GNUNET_CADET_ConnectionTunnelIdentifier cid;
172
173   /**
174    * When was this route last in use?
175    */
176   struct GNUNET_TIME_Absolute last_use;
177
178   /**
179    * Position of this route in the #route_heap.
180    */
181   struct GNUNET_CONTAINER_HeapNode *hn;
182
183   /**
184    * Options for the route, control buffering.
185    */
186   enum GNUNET_CADET_ChannelOption options;
187 };
188
189
190 /**
191  * Handle to the CORE service.
192  */
193 static struct GNUNET_CORE_Handle *core;
194
195 /**
196  * Routes on which this peer is an intermediate.
197  */
198 static struct GNUNET_CONTAINER_MultiShortmap *routes;
199
200 /**
201  * Heap of routes, MIN-sorted by last activity.
202  */
203 static struct GNUNET_CONTAINER_Heap *route_heap;
204
205 /**
206  * Rung zero (always pointed to by #rung_head).
207  */
208 static struct Rung rung_zero;
209
210 /**
211  * DLL of rungs, with the head always point to a rung of
212  * route directions with no messages in the queue.
213  */
214 static struct Rung *rung_head = &rung_zero;
215
216 /**
217  * Tail of the #rung_head DLL.
218  */
219 static struct Rung *rung_tail = &rung_zero;
220
221 /**
222  * Maximum number of concurrent routes this peer will support.
223  */
224 static unsigned long long max_routes;
225
226 /**
227  * Maximum number of envelopes we will buffer at this peer.
228  */
229 static unsigned long long max_buffers;
230
231 /**
232  * Current number of envelopes we have buffered at this peer.
233  */
234 static unsigned long long cur_buffers;
235
236 /**
237  * Task to timeout routes.
238  */
239 static struct GNUNET_SCHEDULER_Task *timeout_task;
240
241
242 /**
243  * Get the route corresponding to a hash.
244  *
245  * @param cid hash generated from the connection identifier
246  */
247 static struct CadetRoute *
248 get_route (const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid)
249 {
250   return GNUNET_CONTAINER_multishortmap_get (routes,
251                                              &cid->connection_of_tunnel);
252 }
253
254
255 /**
256  * Lower the rung in which @a dir is by 1.
257  *
258  * @param dir direction to lower in rung.
259  */
260 static void
261 lower_rung (struct RouteDirection *dir)
262 {
263   struct Rung *rung = dir->rung;
264   struct Rung *prev;
265
266   GNUNET_CONTAINER_DLL_remove (rung->rd_head,
267                                rung->rd_tail,
268                                dir);
269   prev = rung->prev;
270   GNUNET_assert (NULL != prev);
271   if (prev->rung_off != rung->rung_off - 1)
272   {
273     prev = GNUNET_new (struct Rung);
274     prev->rung_off = rung->rung_off - 1;
275     GNUNET_CONTAINER_DLL_insert_after (rung_head,
276                                        rung_tail,
277                                        rung->prev,
278                                        prev);
279   }
280   GNUNET_assert (NULL != prev);
281   GNUNET_CONTAINER_DLL_insert (prev->rd_head,
282                                prev->rd_tail,
283                                dir);
284   dir->rung = prev;
285 }
286
287
288 /**
289  * Discard the buffer @a env from the route direction @a dir and
290  * move @a dir down a rung.
291  *
292  * @param dir direction that contains the @a env in the buffer
293  * @param env envelope to discard
294  */
295 static void
296 discard_buffer (struct RouteDirection *dir,
297                 struct GNUNET_MQ_Envelope *env)
298 {
299   GNUNET_MQ_dll_remove (&dir->env_head,
300                         &dir->env_tail,
301                         env);
302   cur_buffers--;
303   GNUNET_MQ_discard (env);
304   lower_rung (dir);
305   GNUNET_STATISTICS_set (stats,
306                          "# buffer use",
307                          cur_buffers,
308                          GNUNET_NO);
309 }
310
311
312 /**
313  * Discard all messages from the highest rung, to make space.
314  */
315 static void
316 discard_all_from_rung_tail ()
317 {
318   struct Rung *tail = rung_tail;
319   struct RouteDirection *dir;
320
321   while (NULL != (dir = tail->rd_head))
322   {
323     LOG (GNUNET_ERROR_TYPE_DEBUG,
324          "Queue full due new message %s on connection %s, dropping old message\n",
325          GNUNET_sh2s (&dir->my_route->cid.connection_of_tunnel));
326     GNUNET_STATISTICS_update (stats,
327                               "# messages dropped due to full buffer",
328                               1,
329                               GNUNET_NO);
330     discard_buffer (dir,
331                     dir->env_head);
332   }
333   GNUNET_CONTAINER_DLL_remove (rung_head,
334                                rung_tail,
335                                tail);
336   GNUNET_free (tail);
337 }
338
339
340 /**
341  * We message @a msg from @a prev.  Find its route by @a cid and
342  * forward to the next hop.  Drop and signal broken route if we do not
343  * have a route.
344  *
345  * @param prev previous hop (sender)
346  * @param cid connection identifier, tells us which route to use
347  * @param msg the message to forward
348  */
349 static void
350 route_message (struct CadetPeer *prev,
351                const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid,
352                const struct GNUNET_MessageHeader *msg)
353 {
354   struct CadetRoute *route;
355   struct RouteDirection *dir;
356   struct Rung *rung;
357   struct Rung *nxt;
358   struct GNUNET_MQ_Envelope *env;
359
360   route = get_route (cid);
361   if (NULL == route)
362   {
363     struct GNUNET_MQ_Envelope *env;
364     struct GNUNET_CADET_ConnectionBrokenMessage *bm;
365
366     LOG (GNUNET_ERROR_TYPE_DEBUG,
367          "Failed to route message of type %u from %s on connection %s: no route\n",
368          ntohs (msg->type),
369          GCP_2s (prev),
370          GNUNET_sh2s (&cid->connection_of_tunnel));
371     switch (ntohs (msg->type))
372     {
373     case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY:
374     case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN:
375       /* No need to respond to these! */
376       return;
377     }
378     env = GNUNET_MQ_msg (bm,
379                          GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN);
380     bm->cid = *cid;
381     bm->peer1 = my_full_id;
382     GCP_send_ooo (prev,
383                   env);
384     return;
385   }
386   route->last_use = GNUNET_TIME_absolute_get ();
387   GNUNET_CONTAINER_heap_update_cost (route->hn,
388                                      route->last_use.abs_value_us);
389   dir = (prev == route->prev.hop) ? &route->next : &route->prev;
390   if (GNUNET_YES == dir->is_ready)
391   {
392     LOG (GNUNET_ERROR_TYPE_DEBUG,
393          "Routing message of type %u from %s to %s on connection %s\n",
394          ntohs (msg->type),
395          GCP_2s (prev),
396          GNUNET_i2s (GCP_get_id (dir->hop)),
397          GNUNET_sh2s (&cid->connection_of_tunnel));
398     dir->is_ready = GNUNET_NO;
399     GCP_send (dir->mqm,
400               GNUNET_MQ_msg_copy (msg));
401     return;
402   }
403   /* Check if buffering is disallowed, and if so, make sure we only queue
404      one message per direction. */
405   if ( (0 != (route->options & GNUNET_CADET_OPTION_NOBUFFER)) &&
406        (NULL != dir->env_head) )
407     discard_buffer (dir,
408                     dir->env_head);
409   rung = dir->rung;
410   if (cur_buffers == max_buffers)
411   {
412     /* Need to make room. */
413     if (NULL != rung->next)
414     {
415       /* Easy case, drop messages from route directions in highest rung */
416       discard_all_from_rung_tail ();
417     }
418     else
419     {
420       /* We are in the highest rung, drop our own! */
421       LOG (GNUNET_ERROR_TYPE_DEBUG,
422            "Queue full due new message %s on connection %s, dropping old message\n",
423            GNUNET_sh2s (&dir->my_route->cid.connection_of_tunnel));
424       GNUNET_STATISTICS_update (stats,
425                                 "# messages dropped due to full buffer",
426                                 1,
427                                 GNUNET_NO);
428       discard_buffer (dir,
429                       dir->env_head);
430       rung = dir->rung;
431     }
432   }
433   /* remove 'dir' from current rung */
434   GNUNET_CONTAINER_DLL_remove (rung->rd_head,
435                                rung->rd_tail,
436                                dir);
437   /* make 'nxt' point to the next higher rung, creat if necessary */
438   nxt = rung->next;
439   if ( (NULL == nxt) ||
440        (rung->rung_off + 1 != nxt->rung_off) )
441   {
442     nxt = GNUNET_new (struct Rung);
443     nxt->rung_off = rung->rung_off + 1;
444     GNUNET_CONTAINER_DLL_insert_after (rung_head,
445                                        rung_tail,
446                                        rung,
447                                        nxt);
448   }
449   /* insert 'dir' into next higher rung */
450   GNUNET_CONTAINER_DLL_insert (nxt->rd_head,
451                                nxt->rd_tail,
452                                dir);
453   dir->rung = nxt;
454
455   /* add message into 'dir' buffer */
456   LOG (GNUNET_ERROR_TYPE_DEBUG,
457        "Queueing new message of type %u from %s to %s on connection %s\n",
458        ntohs (msg->type),
459        GCP_2s (prev),
460        GNUNET_i2s (GCP_get_id (dir->hop)),
461        GNUNET_sh2s (&cid->connection_of_tunnel));
462   env = GNUNET_MQ_msg_copy (msg);
463   GNUNET_MQ_dll_insert_tail (&dir->env_head,
464                              &dir->env_tail,
465                              env);
466   cur_buffers++;
467   GNUNET_STATISTICS_set (stats,
468                          "# buffer use",
469                          cur_buffers,
470                          GNUNET_NO);
471   /* Clean up 'rung' if now empty (and not head) */
472   if ( (NULL == rung->rd_head) &&
473        (rung != rung_head) )
474   {
475     GNUNET_CONTAINER_DLL_remove (rung_head,
476                                  rung_tail,
477                                  rung);
478     GNUNET_free (rung);
479   }
480 }
481
482
483 /**
484  * Check if the create_connection message has the appropriate size.
485  *
486  * @param cls Closure (unused).
487  * @param msg Message to check.
488  *
489  * @return #GNUNET_YES if size is correct, #GNUNET_NO otherwise.
490  */
491 static int
492 check_connection_create (void *cls,
493                          const struct GNUNET_CADET_ConnectionCreateMessage *msg)
494 {
495   uint16_t size = ntohs (msg->header.size) - sizeof (*msg);
496
497   if (0 != (size % sizeof (struct GNUNET_PeerIdentity)))
498   {
499     GNUNET_break_op (0);
500     return GNUNET_NO;
501   }
502   return GNUNET_YES;
503 }
504
505
506 /**
507  * Free internal data of a route direction.
508  *
509  * @param dir direction to destroy (do NOT free memory of 'dir' itself)
510  */
511 static void
512 destroy_direction (struct RouteDirection *dir)
513 {
514   struct GNUNET_MQ_Envelope *env;
515
516   while (NULL != (env = dir->env_head))
517   {
518     GNUNET_STATISTICS_update (stats,
519                               "# messages dropped due to route destruction",
520                               1,
521                               GNUNET_NO);
522     discard_buffer (dir,
523                     env);
524   }
525   if (NULL != dir->mqm)
526   {
527     GCP_request_mq_cancel (dir->mqm,
528                            NULL);
529     dir->mqm = NULL;
530   }
531   GNUNET_CONTAINER_DLL_remove (rung_head->rd_head,
532                                rung_head->rd_tail,
533                                dir);
534 }
535
536
537 /**
538  * Destroy our state for @a route.
539  *
540  * @param route route to destroy
541  */
542 static void
543 destroy_route (struct CadetRoute *route)
544 {
545   LOG (GNUNET_ERROR_TYPE_DEBUG,
546        "Destroying route from %s to %s of connection %s\n",
547        GNUNET_i2s  (GCP_get_id (route->prev.hop)),
548        GNUNET_i2s2 (GCP_get_id (route->next.hop)),
549        GNUNET_sh2s (&route->cid.connection_of_tunnel));
550   GNUNET_assert (route ==
551                  GNUNET_CONTAINER_heap_remove_node (route->hn));
552   GNUNET_assert (GNUNET_YES ==
553                  GNUNET_CONTAINER_multishortmap_remove (routes,
554                                                         &route->cid.connection_of_tunnel,
555                                                         route));
556   GNUNET_STATISTICS_set (stats,
557                          "# routes",
558                          GNUNET_CONTAINER_multishortmap_size (routes),
559                          GNUNET_NO);
560   destroy_direction (&route->prev);
561   destroy_direction (&route->next);
562   GNUNET_free (route);
563 }
564
565
566 /**
567  * Send message that a route is broken between @a peer1 and @a peer2.
568  *
569  * @param target where to send the message
570  * @param cid connection identifier to use
571  * @param peer1 one of the peers where a link is broken
572  * @param peer2 another one of the peers where a link is broken
573  */
574 static void
575 send_broken (struct RouteDirection *target,
576              const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid,
577              const struct GNUNET_PeerIdentity *peer1,
578              const struct GNUNET_PeerIdentity *peer2)
579 {
580   struct GNUNET_MQ_Envelope *env;
581   struct GNUNET_CADET_ConnectionBrokenMessage *bm;
582
583   if (NULL == target->mqm)
584     return; /* Can't send notification, connection is down! */
585   LOG (GNUNET_ERROR_TYPE_DEBUG,
586        "Notifying %s about BROKEN route at %s-%s of connection %s\n",
587        GCP_2s (target->hop),
588        GNUNET_i2s (peer1),
589        GNUNET_i2s2 (peer2),
590        GNUNET_sh2s (&cid->connection_of_tunnel));
591
592   env = GNUNET_MQ_msg (bm,
593                        GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN);
594   bm->cid = *cid;
595   if (NULL != peer1)
596     bm->peer1 = *peer1;
597   if (NULL != peer2)
598     bm->peer2 = *peer2;
599   GCP_request_mq_cancel (target->mqm,
600                          env);
601   target->mqm = NULL;
602 }
603
604
605 /**
606  * Function called to check if any routes have timed out, and if
607  * so, to clean them up.  Finally, schedules itself again at the
608  * earliest time where there might be more work.
609  *
610  * @param cls NULL
611  */
612 static void
613 timeout_cb (void *cls)
614 {
615   struct CadetRoute *r;
616   struct GNUNET_TIME_Relative linger;
617   struct GNUNET_TIME_Absolute exp;
618
619   timeout_task = NULL;
620   linger = GNUNET_TIME_relative_multiply (keepalive_period,
621                                           3);
622   while (NULL != (r = GNUNET_CONTAINER_heap_peek (route_heap)))
623   {
624     exp = GNUNET_TIME_absolute_add (r->last_use,
625                                     linger);
626     if (0 != GNUNET_TIME_absolute_get_remaining (exp).rel_value_us)
627     {
628       /* Route not yet timed out, wait until it does. */
629       timeout_task = GNUNET_SCHEDULER_add_at (exp,
630                                               &timeout_cb,
631                                               NULL);
632       return;
633     }
634     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
635                 "Sending BROKEN due to timeout (%s was last use, %s linger)\n",
636                 GNUNET_STRINGS_absolute_time_to_string (r->last_use),
637                 GNUNET_STRINGS_relative_time_to_string (linger,
638                                                         GNUNET_YES));
639     send_broken (&r->prev,
640                  &r->cid,
641                  NULL,
642                  NULL);
643     send_broken (&r->next,
644                  &r->cid,
645                  NULL,
646                  NULL);
647     destroy_route (r);
648   }
649   /* No more routes left, so no need for a #timeout_task */
650 }
651
652
653 /**
654  * Function called when the message queue to the previous hop
655  * becomes available/unavailable.  We expect this function to
656  * be called immediately when we register, and then again
657  * later if the connection ever goes down.
658  *
659  * @param cls the `struct RouteDirection`
660  * @param available #GNUNET_YES if sending is now possible,
661  *                  #GNUNET_NO if sending is no longer possible
662  *                  #GNUNET_SYSERR if sending is no longer possible
663  *                                 and the last envelope was discarded
664  */
665 static void
666 dir_ready_cb (void *cls,
667               int ready)
668 {
669   struct RouteDirection *dir = cls;
670   struct CadetRoute *route = dir->my_route;
671   struct RouteDirection *odir;
672
673   if (GNUNET_YES == ready)
674   {
675     struct GNUNET_MQ_Envelope *env;
676
677     dir->is_ready = GNUNET_YES;
678     if (NULL != (env = dir->env_head))
679     {
680       GNUNET_MQ_dll_remove (&dir->env_head,
681                             &dir->env_tail,
682                             env);
683       cur_buffers--;
684       GNUNET_STATISTICS_set (stats,
685                              "# buffer use",
686                              cur_buffers,
687                              GNUNET_NO);
688       lower_rung (dir);
689       dir->is_ready = GNUNET_NO;
690       GCP_send (dir->mqm,
691                 env);
692     }
693     return;
694   }
695   odir = (dir == &route->next) ? &route->prev : &route->next;
696   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
697               "Sending BROKEN due to MQ going down\n");
698   send_broken (&route->next,
699                &route->cid,
700                GCP_get_id (odir->hop),
701                &my_full_id);
702   destroy_route (route);
703 }
704
705
706 /**
707  * Initialize one of the directions of a route.
708  *
709  * @param route route the direction belongs to
710  * @param dir direction to initialize
711  * @param hop next hop on in the @a dir
712  */
713 static void
714 dir_init (struct RouteDirection *dir,
715           struct CadetRoute *route,
716           struct CadetPeer *hop)
717 {
718   dir->hop = hop;
719   dir->my_route = route;
720   dir->mqm = GCP_request_mq (hop,
721                              &dir_ready_cb,
722                              dir);
723   GNUNET_CONTAINER_DLL_insert (rung_head->rd_head,
724                                rung_head->rd_tail,
725                                dir);
726   dir->rung = rung_head;
727   GNUNET_assert (GNUNET_YES == dir->is_ready);
728 }
729
730
731 /**
732  * We could not create the desired route.  Send a
733  * #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN
734  * message to @a target.
735  *
736  * @param target who should receive the message
737  * @param cid identifier of the connection/route that failed
738  * @param failure_at neighbour with which we failed to route,
739  *        or NULL.
740  */
741 static void
742 send_broken_without_mqm (struct CadetPeer *target,
743                          const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid,
744                          const struct GNUNET_PeerIdentity *failure_at)
745 {
746   struct GNUNET_MQ_Envelope *env;
747   struct GNUNET_CADET_ConnectionBrokenMessage *bm;
748
749   env = GNUNET_MQ_msg (bm,
750                        GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN);
751   bm->cid = *cid;
752   bm->peer1 = my_full_id;
753   if (NULL != failure_at)
754     bm->peer2 = *failure_at;
755   GCP_send_ooo (target,
756                 env);
757 }
758
759
760 /**
761  * Handle for #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE
762  *
763  * @param cls Closure (CadetPeer for neighbor that sent the message).
764  * @param msg Message itself.
765  */
766 static void
767 handle_connection_create (void *cls,
768                           const struct GNUNET_CADET_ConnectionCreateMessage *msg)
769 {
770   struct CadetPeer *sender = cls;
771   struct CadetPeer *next;
772   const struct GNUNET_PeerIdentity *pids = (const struct GNUNET_PeerIdentity *) &msg[1];
773   struct CadetRoute *route;
774   uint16_t size = ntohs (msg->header.size) - sizeof (*msg);
775   unsigned int path_length;
776   unsigned int off;
777   enum GNUNET_CADET_ChannelOption options;
778
779   options = (enum GNUNET_CADET_ChannelOption) ntohl (msg->options);
780   path_length = size / sizeof (struct GNUNET_PeerIdentity);
781   if (0 == path_length)
782   {
783     LOG (GNUNET_ERROR_TYPE_DEBUG,
784       "Dropping CADET_CONNECTION_CREATE with empty path\n");
785     GNUNET_break_op (0);
786     return;
787   }
788   /* Check for loops */
789   struct GNUNET_CONTAINER_MultiPeerMap *map;
790   map = GNUNET_CONTAINER_multipeermap_create (path_length * 2,
791                                               GNUNET_YES);
792   GNUNET_assert (NULL != map);
793   for (off = 0; off < path_length; off++) {
794     if (GNUNET_SYSERR ==
795         GNUNET_CONTAINER_multipeermap_put (map,
796                                            &pids[off],
797                                            NULL,
798                                            GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)) {
799       /* bogus request */
800       GNUNET_CONTAINER_multipeermap_destroy (map);
801       LOG (GNUNET_ERROR_TYPE_DEBUG,
802         "Dropping CADET_CONNECTION_CREATE with cyclic path\n");
803       GNUNET_break_op (0);
804       return;
805     }
806   }
807   GNUNET_CONTAINER_multipeermap_destroy (map);
808   /* Initiator is at offset 0. */
809   for (off=1;off<path_length;off++)
810     if (0 == memcmp (&my_full_id,
811                      &pids[off],
812                      sizeof (struct GNUNET_PeerIdentity)))
813       break;
814   if (off == path_length)
815   {
816     LOG (GNUNET_ERROR_TYPE_DEBUG,
817       "Dropping CADET_CONNECTION_CREATE without us in the path\n");
818     GNUNET_break_op (0);
819     return;
820   }
821   /* Check previous hop */
822   if (sender != GCP_get (&pids[off - 1],
823                          GNUNET_NO))
824   {
825     LOG (GNUNET_ERROR_TYPE_DEBUG,
826       "Dropping CADET_CONNECTION_CREATE without sender in the path\n");
827     GNUNET_break_op (0);
828     return;
829   }
830   if (NULL !=
831       get_route (&msg->cid))
832   {
833     /* Duplicate CREATE, pass it on, previous one might have been lost! */
834     LOG (GNUNET_ERROR_TYPE_DEBUG,
835          "Passing on duplicate CADET_CONNECTION_CREATE message on connection %s\n",
836          GNUNET_sh2s (&msg->cid.connection_of_tunnel));
837     route_message (sender,
838                    &msg->cid,
839                    &msg->header);
840     return;
841   }
842   if (off == path_length - 1)
843   {
844     /* We are the destination, create connection */
845     struct CadetConnection *cc;
846     struct CadetPeerPath *path;
847     struct CadetPeer *origin;
848
849     cc = GCC_lookup (&msg->cid);
850     if (NULL != cc)
851     {
852       LOG (GNUNET_ERROR_TYPE_DEBUG,
853            "Received duplicate CADET_CONNECTION_CREATE message on connection %s\n",
854            GNUNET_sh2s (&msg->cid.connection_of_tunnel));
855       GCC_handle_duplicate_create (cc);
856       return;
857     }
858
859     origin = GCP_get (&pids[0],
860                       GNUNET_YES);
861     LOG (GNUNET_ERROR_TYPE_DEBUG,
862          "Received CADET_CONNECTION_CREATE message from %s for connection %s, building inverse path\n",
863          GCP_2s (origin),
864          GNUNET_sh2s (&msg->cid.connection_of_tunnel));
865     path = GCPP_get_path_from_route (path_length - 1,
866                                      pids);
867     if (GNUNET_OK !=
868         GCT_add_inbound_connection (GCP_get_tunnel (origin,
869                                                     GNUNET_YES),
870                                     &msg->cid,
871                                     (enum GNUNET_CADET_ChannelOption) ntohl (msg->options),
872                                     path))
873     {
874       /* Send back BROKEN: duplicate connection on the same path,
875          we will use the other one. */
876       LOG (GNUNET_ERROR_TYPE_DEBUG,
877            "Received CADET_CONNECTION_CREATE from %s for %s, but %s already has a connection. Sending BROKEN\n",
878            GCP_2s (sender),
879            GNUNET_sh2s (&msg->cid.connection_of_tunnel),
880            GCPP_2s (path));
881       send_broken_without_mqm (sender,
882                                &msg->cid,
883                                NULL);
884       return;
885     }
886     return;
887   }
888   /* We are merely a hop on the way, check if we can support the route */
889   next = GCP_get (&pids[off + 1],
890                   GNUNET_NO);
891   if ( (NULL == next) ||
892        (GNUNET_NO == GCP_has_core_connection (next)) )
893   {
894     /* unworkable, send back BROKEN notification */
895     LOG (GNUNET_ERROR_TYPE_DEBUG,
896          "Received CADET_CONNECTION_CREATE from %s for %s. Next hop %s:%u is down. Sending BROKEN\n",
897          GCP_2s (sender),
898          GNUNET_sh2s (&msg->cid.connection_of_tunnel),
899          GNUNET_i2s (&pids[off + 1]),
900          off + 1);
901     send_broken_without_mqm (sender,
902                              &msg->cid,
903                              &pids[off + 1]);
904     return;
905   }
906   if (max_routes <= GNUNET_CONTAINER_multishortmap_size (routes))
907   {
908     LOG (GNUNET_ERROR_TYPE_DEBUG,
909          "Received CADET_CONNECTION_CREATE from %s for %s. We have reached our route limit. Sending BROKEN\n",
910          GCP_2s (sender),
911          GNUNET_sh2s (&msg->cid.connection_of_tunnel));
912     send_broken_without_mqm (sender,
913                              &msg->cid,
914                              &pids[off - 1]);
915     return;
916   }
917
918   /* Workable route, create routing entry */
919   LOG (GNUNET_ERROR_TYPE_DEBUG,
920        "Received CADET_CONNECTION_CREATE from %s for %s. Next hop %s:%u is up. Creating route\n",
921        GCP_2s (sender),
922        GNUNET_sh2s (&msg->cid.connection_of_tunnel),
923        GNUNET_i2s (&pids[off + 1]),
924        off + 1);
925   route = GNUNET_new (struct CadetRoute);
926   route->options = options;
927   route->cid = msg->cid;
928   route->last_use = GNUNET_TIME_absolute_get ();
929   dir_init (&route->prev,
930             route,
931             sender);
932   dir_init (&route->next,
933             route,
934             next);
935   GNUNET_assert (GNUNET_OK ==
936                  GNUNET_CONTAINER_multishortmap_put (routes,
937                                                      &route->cid.connection_of_tunnel,
938                                                      route,
939                                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
940   GNUNET_STATISTICS_set (stats,
941                          "# routes",
942                          GNUNET_CONTAINER_multishortmap_size (routes),
943                          GNUNET_NO);
944   route->hn = GNUNET_CONTAINER_heap_insert (route_heap,
945                                             route,
946                                             route->last_use.abs_value_us);
947   if (NULL == timeout_task)
948     timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (keepalive_period,
949                                                                                 3),
950                                                  &timeout_cb,
951                                                  NULL);
952 }
953
954
955 /**
956  * Handle for #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE_ACK
957  *
958  * @param cls Closure (CadetPeer for neighbor that sent the message).
959  * @param msg Message itself.
960  */
961 static void
962 handle_connection_create_ack (void *cls,
963                               const struct GNUNET_CADET_ConnectionCreateAckMessage *msg)
964 {
965   struct CadetPeer *peer = cls;
966   struct CadetConnection *cc;
967
968   /* First, check if ACK belongs to a connection that ends here. */
969   cc = GCC_lookup (&msg->cid);
970   if (NULL != cc)
971   {
972     /* verify ACK came from the right direction */
973     struct CadetPeerPath *path = GCC_get_path (cc);
974
975     if (peer !=
976         GCPP_get_peer_at_offset (path,
977                                  0))
978     {
979       /* received ACK from unexpected direction, ignore! */
980       GNUNET_break_op (0);
981       return;
982     }
983     LOG (GNUNET_ERROR_TYPE_DEBUG,
984          "Received CONNECTION_CREATE_ACK for connection %s.\n",
985          GNUNET_sh2s (&msg->cid.connection_of_tunnel));
986     GCC_handle_connection_create_ack (cc);
987     return;
988   }
989
990   /* We're just an intermediary peer, route the message along its path */
991   route_message (peer,
992                  &msg->cid,
993                  &msg->header);
994 }
995
996
997 /**
998  * Handle for #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN
999  *
1000  * @param cls Closure (CadetPeer for neighbor that sent the message).
1001  * @param msg Message itself.
1002  * @deprecated duplicate logic with #handle_destroy(); dedup!
1003  */
1004 static void
1005 handle_connection_broken (void *cls,
1006                           const struct GNUNET_CADET_ConnectionBrokenMessage *msg)
1007 {
1008   struct CadetPeer *peer = cls;
1009   struct CadetConnection *cc;
1010   struct CadetRoute *route;
1011
1012   /* First, check if message belongs to a connection that ends here. */
1013   cc = GCC_lookup (&msg->cid);
1014   if (NULL != cc)
1015   {
1016     /* verify message came from the right direction */
1017     struct CadetPeerPath *path = GCC_get_path (cc);
1018
1019     if (peer !=
1020         GCPP_get_peer_at_offset (path,
1021                                  0))
1022     {
1023       /* received message from unexpected direction, ignore! */
1024       GNUNET_break_op (0);
1025       return;
1026     }
1027     LOG (GNUNET_ERROR_TYPE_DEBUG,
1028          "Received CONNECTION_BROKEN for connection %s. Destroying it.\n",
1029          GNUNET_sh2s (&msg->cid.connection_of_tunnel));
1030     GCC_destroy_without_core (cc);
1031
1032     /* FIXME: also destroy the path up to the specified link! */
1033     return;
1034   }
1035
1036   /* We're just an intermediary peer, route the message along its path */
1037   route_message (peer,
1038                  &msg->cid,
1039                  &msg->header);
1040   route = get_route (&msg->cid);
1041   if (NULL != route)
1042     destroy_route (route);
1043   /* FIXME: also destroy paths we MAY have up to the specified link! */
1044 }
1045
1046
1047 /**
1048  * Handle for #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY
1049  *
1050  * @param cls Closure (CadetPeer for neighbor that sent the message).
1051  * @param msg Message itself.
1052  */
1053 static void
1054 handle_connection_destroy (void *cls,
1055                            const struct GNUNET_CADET_ConnectionDestroyMessage *msg)
1056 {
1057   struct CadetPeer *peer = cls;
1058   struct CadetConnection *cc;
1059   struct CadetRoute *route;
1060
1061   /* First, check if message belongs to a connection that ends here. */
1062   cc = GCC_lookup (&msg->cid);
1063   if (NULL != cc)
1064   {
1065     /* verify message came from the right direction */
1066     struct CadetPeerPath *path = GCC_get_path (cc);
1067
1068     if (peer !=
1069         GCPP_get_peer_at_offset (path,
1070                                  0))
1071     {
1072       /* received message from unexpected direction, ignore! */
1073       GNUNET_break_op (0);
1074       return;
1075     }
1076     LOG (GNUNET_ERROR_TYPE_DEBUG,
1077          "Received CONNECTION_DESTROY for connection %s. Destroying connection.\n",
1078          GNUNET_sh2s (&msg->cid.connection_of_tunnel));
1079
1080     GCC_destroy_without_core (cc);
1081     return;
1082   }
1083
1084   /* We're just an intermediary peer, route the message along its path */
1085   LOG (GNUNET_ERROR_TYPE_DEBUG,
1086        "Received CONNECTION_DESTROY for connection %s. Destroying route.\n",
1087        GNUNET_sh2s (&msg->cid.connection_of_tunnel));
1088   route_message (peer,
1089                  &msg->cid,
1090                  &msg->header);
1091   route = get_route (&msg->cid);
1092   if (NULL != route)
1093     destroy_route (route);
1094 }
1095
1096
1097 /**
1098  * Handle for #GNUNET_MESSAGE_TYPE_CADET_TUNNEL_KX
1099  *
1100  * @param cls Closure (CadetPeer for neighbor that sent the message).
1101  * @param msg Message itself.
1102  */
1103 static void
1104 handle_tunnel_kx (void *cls,
1105                   const struct GNUNET_CADET_TunnelKeyExchangeMessage *msg)
1106 {
1107   struct CadetPeer *peer = cls;
1108   struct CadetConnection *cc;
1109
1110   /* First, check if message belongs to a connection that ends here. */
1111   cc = GCC_lookup (&msg->cid);
1112   if (NULL != cc)
1113   {
1114     /* verify message came from the right direction */
1115     struct CadetPeerPath *path = GCC_get_path (cc);
1116
1117     if (peer !=
1118         GCPP_get_peer_at_offset (path,
1119                                  0))
1120     {
1121       /* received message from unexpected direction, ignore! */
1122       GNUNET_break_op (0);
1123       return;
1124     }
1125     GCC_handle_kx (cc,
1126                    msg);
1127     return;
1128   }
1129
1130   /* We're just an intermediary peer, route the message along its path */
1131   route_message (peer,
1132                  &msg->cid,
1133                  &msg->header);
1134 }
1135
1136
1137 /**
1138  * Handle for #GNUNET_MESSAGE_TYPE_CADET_TUNNEL_KX_AUTH
1139  *
1140  * @param cls Closure (CadetPeer for neighbor that sent the message).
1141  * @param msg Message itself.
1142  */
1143 static void
1144 handle_tunnel_kx_auth (void *cls,
1145                        const struct GNUNET_CADET_TunnelKeyExchangeAuthMessage *msg)
1146 {
1147   struct CadetPeer *peer = cls;
1148   struct CadetConnection *cc;
1149
1150   /* First, check if message belongs to a connection that ends here. */
1151   cc = GCC_lookup (&msg->kx.cid);
1152   if (NULL != cc)
1153   {
1154     /* verify message came from the right direction */
1155     struct CadetPeerPath *path = GCC_get_path (cc);
1156
1157     if (peer !=
1158         GCPP_get_peer_at_offset (path,
1159                                  0))
1160     {
1161       /* received message from unexpected direction, ignore! */
1162       GNUNET_break_op (0);
1163       return;
1164     }
1165     GCC_handle_kx_auth (cc,
1166                         msg);
1167     return;
1168   }
1169
1170   /* We're just an intermediary peer, route the message along its path */
1171   route_message (peer,
1172                  &msg->kx.cid,
1173                  &msg->kx.header);
1174 }
1175
1176
1177 /**
1178  * Check if the encrypted message has the appropriate size.
1179  *
1180  * @param cls Closure (unused).
1181  * @param msg Message to check.
1182  *
1183  * @return #GNUNET_YES if size is correct, #GNUNET_NO otherwise.
1184  */
1185 static int
1186 check_tunnel_encrypted (void *cls,
1187                         const struct GNUNET_CADET_TunnelEncryptedMessage *msg)
1188 {
1189   return GNUNET_YES;
1190 }
1191
1192
1193 /**
1194  * Handle for #GNUNET_MESSAGE_TYPE_CADET_TUNNEL_ENCRYPTED.
1195  *
1196  * @param cls Closure (CadetPeer for neighbor that sent the message).
1197  * @param msg Message itself.
1198  */
1199 static void
1200 handle_tunnel_encrypted (void *cls,
1201                          const struct GNUNET_CADET_TunnelEncryptedMessage *msg)
1202 {
1203   struct CadetPeer *peer = cls;
1204   struct CadetConnection *cc;
1205
1206   /* First, check if message belongs to a connection that ends here. */
1207   cc = GCC_lookup (&msg->cid);
1208   if (NULL != cc)
1209   {
1210     /* verify message came from the right direction */
1211     struct CadetPeerPath *path = GCC_get_path (cc);
1212
1213     if (peer !=
1214         GCPP_get_peer_at_offset (path,
1215                                  0))
1216     {
1217       /* received message from unexpected direction, ignore! */
1218       GNUNET_break_op (0);
1219       return;
1220     }
1221     GCC_handle_encrypted (cc,
1222                           msg);
1223     return;
1224   }
1225   /* We're just an intermediary peer, route the message along its path */
1226   route_message (peer,
1227                  &msg->cid,
1228                  &msg->header);
1229 }
1230
1231
1232 /**
1233  * Function called after #GNUNET_CORE_connect has succeeded (or failed
1234  * for good).  Note that the private key of the peer is intentionally
1235  * not exposed here; if you need it, your process should try to read
1236  * the private key file directly (which should work if you are
1237  * authorized...).  Implementations of this function must not call
1238  * #GNUNET_CORE_disconnect (other than by scheduling a new task to
1239  * do this later).
1240  *
1241  * @param cls closure
1242  * @param my_identity ID of this peer, NULL if we failed
1243  */
1244 static void
1245 core_init_cb (void *cls,
1246               const struct GNUNET_PeerIdentity *my_identity)
1247 {
1248   if (NULL == my_identity)
1249   {
1250     GNUNET_break (0);
1251     return;
1252   }
1253   GNUNET_break (0 ==
1254                 memcmp (my_identity,
1255                         &my_full_id,
1256                         sizeof (struct GNUNET_PeerIdentity)));
1257 }
1258
1259
1260 /**
1261  * Method called whenever a given peer connects.
1262  *
1263  * @param cls closure
1264  * @param peer peer identity this notification is about
1265  */
1266 static void *
1267 core_connect_cb (void *cls,
1268                  const struct GNUNET_PeerIdentity *peer,
1269                  struct GNUNET_MQ_Handle *mq)
1270 {
1271   struct CadetPeer *cp;
1272
1273   LOG (GNUNET_ERROR_TYPE_DEBUG,
1274        "CORE connection to peer %s was established.\n",
1275        GNUNET_i2s (peer));
1276   cp = GCP_get (peer,
1277                 GNUNET_YES);
1278   GCP_set_mq (cp,
1279               mq);
1280   return cp;
1281 }
1282
1283
1284 /**
1285  * Method called whenever a peer disconnects.
1286  *
1287  * @param cls closure
1288  * @param peer peer identity this notification is about
1289  */
1290 static void
1291 core_disconnect_cb (void *cls,
1292                     const struct GNUNET_PeerIdentity *peer,
1293                     void *peer_cls)
1294 {
1295   struct CadetPeer *cp = peer_cls;
1296
1297   LOG (GNUNET_ERROR_TYPE_DEBUG,
1298        "CORE connection to peer %s went down.\n",
1299        GNUNET_i2s (peer));
1300   GCP_set_mq (cp,
1301               NULL);
1302 }
1303
1304
1305 /**
1306  * Initialize the CORE subsystem.
1307  *
1308  * @param c Configuration.
1309  */
1310 void
1311 GCO_init (const struct GNUNET_CONFIGURATION_Handle *c)
1312 {
1313   struct GNUNET_MQ_MessageHandler handlers[] = {
1314     GNUNET_MQ_hd_var_size (connection_create,
1315                            GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE,
1316                            struct GNUNET_CADET_ConnectionCreateMessage,
1317                            NULL),
1318     GNUNET_MQ_hd_fixed_size (connection_create_ack,
1319                              GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE_ACK,
1320                              struct GNUNET_CADET_ConnectionCreateAckMessage,
1321                              NULL),
1322     GNUNET_MQ_hd_fixed_size (connection_broken,
1323                              GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN,
1324                              struct GNUNET_CADET_ConnectionBrokenMessage,
1325                              NULL),
1326     GNUNET_MQ_hd_fixed_size (connection_destroy,
1327                              GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY,
1328                              struct GNUNET_CADET_ConnectionDestroyMessage,
1329                              NULL),
1330     GNUNET_MQ_hd_fixed_size (tunnel_kx,
1331                              GNUNET_MESSAGE_TYPE_CADET_TUNNEL_KX,
1332                              struct GNUNET_CADET_TunnelKeyExchangeMessage,
1333                              NULL),
1334     GNUNET_MQ_hd_fixed_size (tunnel_kx_auth,
1335                              GNUNET_MESSAGE_TYPE_CADET_TUNNEL_KX_AUTH,
1336                              struct GNUNET_CADET_TunnelKeyExchangeAuthMessage,
1337                              NULL),
1338     GNUNET_MQ_hd_var_size (tunnel_encrypted,
1339                            GNUNET_MESSAGE_TYPE_CADET_TUNNEL_ENCRYPTED,
1340                            struct GNUNET_CADET_TunnelEncryptedMessage,
1341                            NULL),
1342     GNUNET_MQ_handler_end ()
1343   };
1344
1345   if (GNUNET_OK !=
1346       GNUNET_CONFIGURATION_get_value_number (c,
1347                                              "CADET",
1348                                              "MAX_ROUTES",
1349                                              &max_routes))
1350     max_routes = 5000;
1351   if (GNUNET_OK !=
1352       GNUNET_CONFIGURATION_get_value_number (c,
1353                                              "CADET",
1354                                              "MAX_MSGS_QUEUE",
1355                                              &max_buffers))
1356     max_buffers = 10000;
1357   routes = GNUNET_CONTAINER_multishortmap_create (1024,
1358                                                   GNUNET_NO);
1359   route_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1360   core = GNUNET_CORE_connect (c,
1361                               NULL,
1362                               &core_init_cb,
1363                               &core_connect_cb,
1364                               &core_disconnect_cb,
1365                               handlers);
1366 }
1367
1368
1369 /**
1370  * Shut down the CORE subsystem.
1371  */
1372 void
1373 GCO_shutdown ()
1374 {
1375   if (NULL != core)
1376   {
1377     GNUNET_CORE_disconnect (core);
1378     core = NULL;
1379   }
1380   GNUNET_assert (0 == GNUNET_CONTAINER_multishortmap_size (routes));
1381   GNUNET_CONTAINER_multishortmap_destroy (routes);
1382   routes = NULL;
1383   GNUNET_CONTAINER_heap_destroy (route_heap);
1384   route_heap = NULL;
1385   if (NULL != timeout_task)
1386   {
1387     GNUNET_SCHEDULER_cancel (timeout_task);
1388     timeout_task = NULL;
1389   }
1390 }
1391
1392 /* end of gnunet-cadet-service_core.c */