fair, global message buffer implemented
[oweals/gnunet.git] / src / cadet / gnunet-service-cadet-new_core.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2017 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file cadet/gnunet-service-cadet_core.c
23  * @brief cadet service; interaction with CORE service
24  * @author Bartlomiej Polot
25  * @author Christian Grothoff
26  *
27  * All functions in this file should use the prefix GCO (Gnunet Cadet cOre (bottom))
28  *
29  * TODO:
30  * - do NOT use buffering if the route options say no buffer!
31  * - Optimization: given BROKEN messages, destroy paths (?)
32  */
33 #include "platform.h"
34 #include "gnunet-service-cadet-new_core.h"
35 #include "gnunet-service-cadet-new_paths.h"
36 #include "gnunet-service-cadet-new_peer.h"
37 #include "gnunet-service-cadet-new_connection.h"
38 #include "gnunet-service-cadet-new_tunnels.h"
39 #include "gnunet_core_service.h"
40 #include "gnunet_statistics_service.h"
41 #include "cadet_protocol.h"
42
43
44 #define LOG(level, ...) GNUNET_log_from(level,"cadet-cor",__VA_ARGS__)
45
46 /**
47  * Information we keep per direction for a route.
48  */
49 struct RouteDirection;
50
51
52 /**
53  * Set of CadetRoutes that have exactly the same number of messages
54  * in their buffer.  Used so we can efficiently find all of those
55  * routes that have the current maximum of messages in the buffer (in
56  * case we have to purge).
57  */
58 struct Rung
59 {
60
61   /**
62    * Rung of RouteDirections with one more buffer entry each.
63    */
64   struct Rung *next;
65
66   /**
67    * Rung of RouteDirections with one less buffer entry each.
68    */
69   struct Rung *prev;
70
71   /**
72    * DLL of route directions with a number of buffer entries matching this rung.
73    */
74   struct RouteDirection *rd_head;
75
76   /**
77    * DLL of route directions with a number of buffer entries matching this rung.
78    */
79   struct RouteDirection *rd_tail;
80
81   /**
82    * Total number of route directions in this rung.
83    */
84   unsigned int num_routes;
85
86   /**
87    * Number of messages route directions at this rung have
88    * in their buffer.
89    */
90   unsigned int rung_off;
91 };
92
93
94 /**
95  * Information we keep per direction for a route.
96  */
97 struct RouteDirection
98 {
99
100   /**
101    * DLL of other route directions within the same `struct Rung`.
102    */
103   struct RouteDirection *prev;
104
105   /**
106    * DLL of other route directions within the same `struct Rung`.
107    */
108   struct RouteDirection *next;
109
110   /**
111    * Rung of this route direction (matches length of the buffer DLL).
112    */
113   struct Rung *rung;
114
115   /**
116    * Head of DLL of envelopes we have in the buffer for this direction.
117    */
118   struct GNUNET_MQ_Envelope *env_head;
119
120   /**
121    * Tail of DLL of envelopes we have in the buffer for this direction.
122    */
123   struct GNUNET_MQ_Envelope *env_tail;
124
125   /**
126    * Target peer.
127    */
128   struct CadetPeer *hop;
129
130   /**
131    * Route this direction is part of.
132    */
133   struct CadetRoute *my_route;
134
135   /**
136    * Message queue manager for @e hop.
137    */
138   struct GCP_MessageQueueManager *mqm;
139
140   /**
141    * Is @e mqm currently ready for transmission?
142    */
143   int is_ready;
144
145 };
146
147
148 /**
149  * Description of a segment of a `struct CadetConnection` at the
150  * intermediate peers.  Routes are basically entries in a peer's
151  * routing table for forwarding traffic.  At both endpoints, the
152  * routes are terminated by a `struct CadetConnection`, which knows
153  * the complete `struct CadetPath` that is formed by the individual
154  * routes.
155  */
156 struct CadetRoute
157 {
158
159   /**
160    * Information about the next hop on this route.
161    */
162   struct RouteDirection next;
163
164   /**
165    * Information about the previous hop on this route.
166    */
167   struct RouteDirection prev;
168
169   /**
170    * Unique identifier for the connection that uses this route.
171    */
172   struct GNUNET_CADET_ConnectionTunnelIdentifier cid;
173
174   /**
175    * When was this route last in use?
176    */
177   struct GNUNET_TIME_Absolute last_use;
178
179   /**
180    * Position of this route in the #route_heap.
181    */
182   struct GNUNET_CONTAINER_HeapNode *hn;
183
184   /**
185    * Options for the route, control buffering.
186    */
187   enum GNUNET_CADET_ChannelOption options;
188 };
189
190
191 /**
192  * Handle to the CORE service.
193  */
194 static struct GNUNET_CORE_Handle *core;
195
196 /**
197  * Routes on which this peer is an intermediate.
198  */
199 static struct GNUNET_CONTAINER_MultiShortmap *routes;
200
201 /**
202  * Heap of routes, MIN-sorted by last activity.
203  */
204 static struct GNUNET_CONTAINER_Heap *route_heap;
205
206 /**
207  * Rung zero (always pointed to by #rung_head).
208  */
209 static struct Rung rung_zero;
210
211 /**
212  * DLL of rungs, with the head always point to a rung of
213  * route directions with no messages in the queue.
214  */
215 static struct Rung *rung_head = &rung_zero;
216
217 /**
218  * Tail of the #rung_head DLL.
219  */
220 static struct Rung *rung_tail = &rung_zero;
221
222 /**
223  * Maximum number of concurrent routes this peer will support.
224  */
225 static unsigned long long max_routes;
226
227 /**
228  * Maximum number of envelopes we will buffer at this peer.
229  */
230 static unsigned long long max_buffers;
231
232 /**
233  * Current number of envelopes we have buffered at this peer.
234  */
235 static unsigned long long cur_buffers;
236
237 /**
238  * Task to timeout routes.
239  */
240 static struct GNUNET_SCHEDULER_Task *timeout_task;
241
242
243 /**
244  * Get the route corresponding to a hash.
245  *
246  * @param cid hash generated from the connection identifier
247  */
248 static struct CadetRoute *
249 get_route (const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid)
250 {
251   return GNUNET_CONTAINER_multishortmap_get (routes,
252                                              &cid->connection_of_tunnel);
253 }
254
255
256 /**
257  * Lower the rung in which @a dir is by 1.
258  *
259  * @param dir direction to lower in rung.
260  */
261 static void
262 lower_rung (struct RouteDirection *dir)
263 {
264   struct Rung *rung = dir->rung;
265   struct Rung *prev;
266
267   GNUNET_CONTAINER_DLL_remove (rung->rd_head,
268                                rung->rd_tail,
269                                dir);
270   prev = rung->prev;
271   GNUNET_assert (NULL != prev);
272   if (prev->rung_off != rung->rung_off - 1)
273   {
274     prev = GNUNET_new (struct Rung);
275     prev->rung_off = rung->rung_off - 1;
276     GNUNET_CONTAINER_DLL_insert_after (rung_head,
277                                        rung_tail,
278                                        prev,
279                                        rung);
280   }
281   else
282   {
283     rung = prev;
284   }
285   GNUNET_assert (NULL != rung);
286   GNUNET_CONTAINER_DLL_insert (rung->rd_head,
287                                rung->rd_tail,
288                                dir);
289
290 }
291
292
293 /**
294  * Discard the buffer @a env from the route direction @a dir and
295  * move @a dir down a rung.
296  *
297  * @param dir direction that contains the @a env in the buffer
298  * @param env envelope to discard
299  */
300 static void
301 discard_buffer (struct RouteDirection *dir,
302                 struct GNUNET_MQ_Envelope *env)
303 {
304   GNUNET_MQ_dll_remove (&dir->env_head,
305                         &dir->env_tail,
306                         env);
307   cur_buffers--;
308   GNUNET_MQ_discard (env);
309   lower_rung (dir);
310 }
311
312
313 /**
314  * Discard all messages from the highest rung, to make space.
315  */
316 static void
317 discard_all_from_rung_tail ()
318 {
319   struct Rung *tail = rung_tail;
320   struct RouteDirection *dir;
321
322   while (NULL != (dir = tail->rd_head))
323   {
324     LOG (GNUNET_ERROR_TYPE_DEBUG,
325          "Queue full due new message %s on connection %s, dropping old message\n",
326          GNUNET_sh2s (&dir->my_route->cid.connection_of_tunnel));
327     GNUNET_STATISTICS_update (stats,
328                               "# messages dropped due to full buffer",
329                               1,
330                               GNUNET_NO);
331     discard_buffer (dir,
332                     dir->env_head);
333   }
334   GNUNET_CONTAINER_DLL_remove (rung_head,
335                                rung_tail,
336                                tail);
337   GNUNET_free (tail);
338 }
339
340
341 /**
342  * We message @a msg from @a prev.  Find its route by @a cid and
343  * forward to the next hop.  Drop and signal broken route if we do not
344  * have a route.
345  *
346  * @param prev previous hop (sender)
347  * @param cid connection identifier, tells us which route to use
348  * @param msg the message to forward
349  */
350 static void
351 route_message (struct CadetPeer *prev,
352                const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid,
353                const struct GNUNET_MessageHeader *msg)
354 {
355   struct CadetRoute *route;
356   struct RouteDirection *dir;
357   struct Rung *rung;
358   struct Rung *nxt;
359   struct GNUNET_MQ_Envelope *env;
360
361   route = get_route (cid);
362   if (NULL == route)
363   {
364     struct GNUNET_MQ_Envelope *env;
365     struct GNUNET_CADET_ConnectionBrokenMessage *bm;
366
367     LOG (GNUNET_ERROR_TYPE_DEBUG,
368          "Failed to route message of type %u from %s on connection %s: no route\n",
369          ntohs (msg->type),
370          GCP_2s (prev),
371          GNUNET_sh2s (&cid->connection_of_tunnel));
372     env = GNUNET_MQ_msg (bm,
373                          GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN);
374     bm->cid = *cid;
375     bm->peer1 = my_full_id;
376     GCP_send_ooo (prev,
377                   env);
378     return;
379   }
380   route->last_use = GNUNET_TIME_absolute_get ();
381   GNUNET_CONTAINER_heap_update_cost (route->hn,
382                                      route->last_use.abs_value_us);
383   dir = (prev == route->prev.hop) ? &route->next : &route->prev;
384   if (GNUNET_YES == dir->is_ready)
385   {
386     LOG (GNUNET_ERROR_TYPE_DEBUG,
387          "Routing message of type %u from %s to %s on connection %s\n",
388          ntohs (msg->type),
389          GCP_2s (prev),
390          GNUNET_i2s (GCP_get_id (dir->hop)),
391          GNUNET_sh2s (&cid->connection_of_tunnel));
392     dir->is_ready = GNUNET_NO;
393     GCP_send (dir->mqm,
394               GNUNET_MQ_msg_copy (msg));
395     return;
396   }
397   rung = dir->rung;
398   if (cur_buffers == max_buffers)
399   {
400     /* Need to make room. */
401     if (NULL != rung->next)
402     {
403       /* Easy case, drop messages from route directions in highest rung */
404       discard_all_from_rung_tail ();
405     }
406     else
407     {
408       /* We are in the highest rung, drop our own! */
409       LOG (GNUNET_ERROR_TYPE_DEBUG,
410            "Queue full due new message %s on connection %s, dropping old message\n",
411            GNUNET_sh2s (&dir->my_route->cid.connection_of_tunnel));
412       GNUNET_STATISTICS_update (stats,
413                                 "# messages dropped due to full buffer",
414                                 1,
415                                 GNUNET_NO);
416       discard_buffer (dir,
417                       dir->env_head);
418       rung = dir->rung;
419     }
420   }
421   /* remove 'dir' from current rung */
422   GNUNET_CONTAINER_DLL_remove (rung->rd_head,
423                                rung->rd_tail,
424                                dir);
425   /* make 'nxt' point to the next higher rung, creat if necessary */
426   nxt = rung->next;
427   if ( (NULL == nxt) ||
428        (rung->rung_off + 1 != nxt->rung_off) )
429   {
430     nxt = GNUNET_new (struct Rung);
431     nxt->rung_off = rung->rung_off + 1;
432     GNUNET_CONTAINER_DLL_insert_after (rung_head,
433                                        rung_tail,
434                                        rung,
435                                        nxt);
436   }
437   /* insert 'dir' into next higher rung */
438   GNUNET_CONTAINER_DLL_insert (nxt->rd_head,
439                                nxt->rd_tail,
440                                dir);
441   /* add message into 'dir' buffer */
442   LOG (GNUNET_ERROR_TYPE_DEBUG,
443        "Queueing new message of type %u from %s to %s on connection %s\n",
444        ntohs (msg->type),
445        GCP_2s (prev),
446        GNUNET_i2s (GCP_get_id (dir->hop)),
447        GNUNET_sh2s (&cid->connection_of_tunnel));
448   env = GNUNET_MQ_msg_copy (msg);
449   GNUNET_MQ_dll_insert_tail (&dir->env_head,
450                              &dir->env_tail,
451                              env);
452   cur_buffers++;
453   /* Clean up 'rung' if now empty (and not head) */
454   if ( (NULL == rung->rd_head) &&
455        (rung != rung_head) )
456   {
457     GNUNET_CONTAINER_DLL_remove (rung_head,
458                                  rung_tail,
459                                  rung);
460     GNUNET_free (rung);
461   }
462 }
463
464
465 /**
466  * Check if the create_connection message has the appropriate size.
467  *
468  * @param cls Closure (unused).
469  * @param msg Message to check.
470  *
471  * @return #GNUNET_YES if size is correct, #GNUNET_NO otherwise.
472  */
473 static int
474 check_connection_create (void *cls,
475                          const struct GNUNET_CADET_ConnectionCreateMessage *msg)
476 {
477   uint16_t size = ntohs (msg->header.size) - sizeof (*msg);
478
479   if (0 != (size % sizeof (struct GNUNET_PeerIdentity)))
480   {
481     GNUNET_break_op (0);
482     return GNUNET_NO;
483   }
484   return GNUNET_YES;
485 }
486
487
488 /**
489  * Free internal data of a route direction.
490  *
491  * @param dir direction to destroy (do NOT free memory of 'dir' itself)
492  */
493 static void
494 destroy_direction (struct RouteDirection *dir)
495 {
496   struct GNUNET_MQ_Envelope *env;
497
498   while (NULL != (env = dir->env_head))
499   {
500     GNUNET_STATISTICS_update (stats,
501                               "# messages dropped due to route destruction",
502                               1,
503                               GNUNET_NO);
504     discard_buffer (dir,
505                     env);
506   }
507   if (NULL != dir->mqm)
508   {
509     GCP_request_mq_cancel (dir->mqm,
510                            NULL);
511     dir->mqm = NULL;
512   }
513   GNUNET_CONTAINER_DLL_remove (rung_head->rd_head,
514                                rung_head->rd_tail,
515                                dir);
516 }
517
518
519 /**
520  * Destroy our state for @a route.
521  *
522  * @param route route to destroy
523  */
524 static void
525 destroy_route (struct CadetRoute *route)
526 {
527   LOG (GNUNET_ERROR_TYPE_DEBUG,
528        "Destroying route from %s to %s of connection %s\n",
529        GNUNET_i2s  (GCP_get_id (route->prev.hop)),
530        GNUNET_i2s2 (GCP_get_id (route->next.hop)),
531        GNUNET_sh2s (&route->cid.connection_of_tunnel));
532   GNUNET_assert (route ==
533                  GNUNET_CONTAINER_heap_remove_node (route->hn));
534   GNUNET_assert (GNUNET_YES ==
535                  GNUNET_CONTAINER_multishortmap_remove (routes,
536                                                         &route->cid.connection_of_tunnel,
537                                                         route));
538   destroy_direction (&route->prev);
539   destroy_direction (&route->next);
540   GNUNET_free (route);
541 }
542
543
544 /**
545  * Send message that a route is broken between @a peer1 and @a peer2.
546  *
547  * @param target where to send the message
548  * @param cid connection identifier to use
549  * @param peer1 one of the peers where a link is broken
550  * @param peer2 another one of the peers where a link is broken
551  */
552 static void
553 send_broken (struct RouteDirection *target,
554              const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid,
555              const struct GNUNET_PeerIdentity *peer1,
556              const struct GNUNET_PeerIdentity *peer2)
557 {
558   struct GNUNET_MQ_Envelope *env;
559   struct GNUNET_CADET_ConnectionBrokenMessage *bm;
560
561   if (NULL == target->mqm)
562     return; /* Can't send notification, connection is down! */
563   LOG (GNUNET_ERROR_TYPE_DEBUG,
564        "Notifying %s about BROKEN route at %s-%s of connection %s\n",
565        GCP_2s (target->hop),
566        GNUNET_i2s (peer1),
567        GNUNET_i2s2 (peer2),
568        GNUNET_sh2s (&cid->connection_of_tunnel));
569
570   env = GNUNET_MQ_msg (bm,
571                        GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN);
572   bm->cid = *cid;
573   if (NULL != peer1)
574     bm->peer1 = *peer1;
575   if (NULL != peer2)
576     bm->peer2 = *peer2;
577
578   GCP_request_mq_cancel (target->mqm,
579                          env);
580   target->mqm = NULL;
581 }
582
583
584 /**
585  * Function called to check if any routes have timed out, and if
586  * so, to clean them up.  Finally, schedules itself again at the
587  * earliest time where there might be more work.
588  *
589  * @param cls NULL
590  */
591 static void
592 timeout_cb (void *cls)
593 {
594   struct CadetRoute *r;
595   struct GNUNET_TIME_Relative linger;
596   struct GNUNET_TIME_Absolute exp;
597
598   timeout_task = NULL;
599   linger = GNUNET_TIME_relative_multiply (keepalive_period,
600                                           3);
601   while (NULL != (r = GNUNET_CONTAINER_heap_peek (route_heap)))
602   {
603     exp = GNUNET_TIME_absolute_add (r->last_use,
604                                     linger);
605     if (0 != GNUNET_TIME_absolute_get_duration (exp).rel_value_us)
606     {
607       /* Route not yet timed out, wait until it does. */
608       timeout_task = GNUNET_SCHEDULER_add_at (exp,
609                                               &timeout_cb,
610                                               NULL);
611       return;
612     }
613     send_broken (&r->prev,
614                  &r->cid,
615                  NULL,
616                  NULL);
617     send_broken (&r->next,
618                  &r->cid,
619                  NULL,
620                  NULL);
621     destroy_route (r);
622   }
623   /* No more routes left, so no need for a #timeout_task */
624 }
625
626
627 /**
628  * Function called when the message queue to the previous hop
629  * becomes available/unavailable.  We expect this function to
630  * be called immediately when we register, and then again
631  * later if the connection ever goes down.
632  *
633  * @param cls the `struct RouteDirection`
634  * @param available #GNUNET_YES if sending is now possible,
635  *                  #GNUNET_NO if sending is no longer possible
636  *                  #GNUNET_SYSERR if sending is no longer possible
637  *                                 and the last envelope was discarded
638  */
639 static void
640 dir_ready_cb (void *cls,
641               int ready)
642 {
643   struct RouteDirection *dir = cls;
644   struct CadetRoute *route = dir->my_route;
645   struct RouteDirection *odir;
646
647   if (GNUNET_YES == ready)
648   {
649     struct GNUNET_MQ_Envelope *env;
650
651     dir->is_ready = GNUNET_YES;
652     if (NULL != (env = dir->env_head))
653     {
654       GNUNET_MQ_dll_remove (&dir->env_head,
655                             &dir->env_tail,
656                             env);
657       cur_buffers--;
658       lower_rung (dir);
659       dir->is_ready = GNUNET_NO;
660       GCP_send (dir->mqm,
661                 env);
662     }
663     return;
664   }
665   odir = (dir == &route->next) ? &route->prev : &route->next;
666   send_broken (&route->next,
667                &route->cid,
668                GCP_get_id (odir->hop),
669                &my_full_id);
670   destroy_route (route);
671 }
672
673
674 /**
675  * Initialize one of the directions of a route.
676  *
677  * @param route route the direction belongs to
678  * @param dir direction to initialize
679  * @param hop next hop on in the @a dir
680  */
681 static void
682 dir_init (struct RouteDirection *dir,
683           struct CadetRoute *route,
684           struct CadetPeer *hop)
685 {
686   dir->hop = hop;
687   dir->my_route = route;
688   dir->mqm = GCP_request_mq (hop,
689                              &dir_ready_cb,
690                              dir);
691   GNUNET_assert (GNUNET_YES == dir->is_ready);
692 }
693
694
695 /**
696  * We could not create the desired route.  Send a
697  * #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN
698  * message to @a target.
699  *
700  * @param target who should receive the message
701  * @param cid identifier of the connection/route that failed
702  * @param failure_at neighbour with which we failed to route,
703  *        or NULL.
704  */
705 static void
706 send_broken_without_mqm (struct CadetPeer *target,
707                          const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid,
708                          const struct GNUNET_PeerIdentity *failure_at)
709 {
710   struct GNUNET_MQ_Envelope *env;
711   struct GNUNET_CADET_ConnectionBrokenMessage *bm;
712
713   env = GNUNET_MQ_msg (bm,
714                        GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN);
715   bm->cid = *cid;
716   bm->peer1 = my_full_id;
717   if (NULL != failure_at)
718     bm->peer2 = *failure_at;
719   GCP_send_ooo (target,
720                 env);
721 }
722
723
724 /**
725  * Handle for #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE
726  *
727  * @param cls Closure (CadetPeer for neighbor that sent the message).
728  * @param msg Message itself.
729  */
730 static void
731 handle_connection_create (void *cls,
732                           const struct GNUNET_CADET_ConnectionCreateMessage *msg)
733 {
734   struct CadetPeer *sender = cls;
735   struct CadetPeer *next;
736   const struct GNUNET_PeerIdentity *pids = (const struct GNUNET_PeerIdentity *) &msg[1];
737   struct CadetRoute *route;
738   uint16_t size = ntohs (msg->header.size) - sizeof (*msg);
739   unsigned int path_length;
740   unsigned int off;
741   enum GNUNET_CADET_ChannelOption options;
742
743   options = (enum GNUNET_CADET_ChannelOption) ntohl (msg->options);
744   path_length = size / sizeof (struct GNUNET_PeerIdentity);
745   /* Initiator is at offset 0. */
746   for (off=1;off<path_length;off++)
747     if (0 == memcmp (&my_full_id,
748                      &pids[off],
749                      sizeof (struct GNUNET_PeerIdentity)))
750       break;
751   if (off == path_length)
752   {
753     /* We are not on the path, bogus request */
754     GNUNET_break_op (0);
755     return;
756   }
757   /* Check previous hop */
758   if (sender != GCP_get (&pids[off - 1],
759                          GNUNET_NO))
760   {
761     /* sender is not on the path, not allowed */
762     GNUNET_break_op (0);
763     return;
764   }
765   if (NULL !=
766       get_route (&msg->cid))
767   {
768     /* Duplicate CREATE, pass it on, previous one might have been lost! */
769     LOG (GNUNET_ERROR_TYPE_DEBUG,
770          "Passing on duplicate CADET_CONNECTION_CREATE message on connection %s\n",
771          GNUNET_sh2s (&msg->cid.connection_of_tunnel));
772     route_message (sender,
773                    &msg->cid,
774                    &msg->header);
775     return;
776   }
777   if (off == path_length - 1)
778   {
779     /* We are the destination, create connection */
780     struct CadetConnection *cc;
781     struct CadetPeerPath *path;
782     struct CadetPeer *origin;
783
784     cc = GNUNET_CONTAINER_multishortmap_get (connections,
785                                              &msg->cid.connection_of_tunnel);
786     if (NULL != cc)
787     {
788       LOG (GNUNET_ERROR_TYPE_DEBUG,
789            "Received duplicate CADET_CONNECTION_CREATE message on connection %s\n",
790            GNUNET_sh2s (&msg->cid.connection_of_tunnel));
791       GCC_handle_duplicate_create (cc);
792       return;
793     }
794
795     origin = GCP_get (&pids[0],
796                       GNUNET_YES);
797     LOG (GNUNET_ERROR_TYPE_DEBUG,
798          "Received CADET_CONNECTION_CREATE message from %s for connection %s, building inverse path\n",
799          GCP_2s (origin),
800          GNUNET_sh2s (&msg->cid.connection_of_tunnel));
801     path = GCPP_get_path_from_route (path_length - 1,
802                                      pids);
803     if (GNUNET_OK !=
804         GCT_add_inbound_connection (GCP_get_tunnel (origin,
805                                                     GNUNET_YES),
806                                     &msg->cid,
807                                     (enum GNUNET_CADET_ChannelOption) ntohl (msg->options),
808                                     path))
809     {
810       /* Send back BROKEN: duplicate connection on the same path,
811          we will use the other one. */
812       LOG (GNUNET_ERROR_TYPE_DEBUG,
813            "Received CADET_CONNECTION_CREATE from %s for %s, but %s already has a connection. Sending BROKEN\n",
814            GCP_2s (sender),
815            GNUNET_sh2s (&msg->cid.connection_of_tunnel),
816            GCPP_2s (path));
817       send_broken_without_mqm (sender,
818                                &msg->cid,
819                                NULL);
820       return;
821     }
822     return;
823   }
824   /* We are merely a hop on the way, check if we can support the route */
825   next = GCP_get (&pids[off + 1],
826                   GNUNET_NO);
827   if ( (NULL == next) ||
828        (GNUNET_NO == GCP_has_core_connection (next)) )
829   {
830     /* unworkable, send back BROKEN notification */
831     LOG (GNUNET_ERROR_TYPE_DEBUG,
832          "Received CADET_CONNECTION_CREATE from %s for %s. Next hop %s:%u is down. Sending BROKEN\n",
833          GCP_2s (sender),
834          GNUNET_sh2s (&msg->cid.connection_of_tunnel),
835          GNUNET_i2s (&pids[off + 1]),
836          off + 1);
837     send_broken_without_mqm (sender,
838                              &msg->cid,
839                              &pids[off + 1]);
840     return;
841   }
842   if (max_routes <= GNUNET_CONTAINER_multishortmap_size (routes))
843   {
844     LOG (GNUNET_ERROR_TYPE_DEBUG,
845          "Received CADET_CONNECTION_CREATE from %s for %s. We have reached our route limit. Sending BROKEN\n",
846          GCP_2s (sender),
847          GNUNET_sh2s (&msg->cid.connection_of_tunnel));
848     send_broken_without_mqm (sender,
849                              &msg->cid,
850                              &pids[off - 1]);
851     return;
852   }
853
854   /* Workable route, create routing entry */
855   LOG (GNUNET_ERROR_TYPE_DEBUG,
856        "Received CADET_CONNECTION_CREATE from %s for %s. Next hop %s:%u is up. Creating route\n",
857        GCP_2s (sender),
858        GNUNET_sh2s (&msg->cid.connection_of_tunnel),
859        GNUNET_i2s (&pids[off + 1]),
860        off + 1);
861   route = GNUNET_new (struct CadetRoute);
862   route->options = options;
863   route->cid = msg->cid;
864   route->last_use = GNUNET_TIME_absolute_get ();
865   dir_init (&route->prev,
866             route,
867             sender);
868   dir_init (&route->next,
869             route,
870             next);
871   GNUNET_assert (GNUNET_OK ==
872                  GNUNET_CONTAINER_multishortmap_put (routes,
873                                                      &route->cid.connection_of_tunnel,
874                                                      route,
875                                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
876   route->hn = GNUNET_CONTAINER_heap_insert (route_heap,
877                                             route,
878                                             route->last_use.abs_value_us);
879   if (NULL == timeout_task)
880     timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (keepalive_period,
881                                                                                 3),
882                                                  &timeout_cb,
883                                                  NULL);
884 }
885
886
887 /**
888  * Handle for #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE_ACK
889  *
890  * @param cls Closure (CadetPeer for neighbor that sent the message).
891  * @param msg Message itself.
892  */
893 static void
894 handle_connection_create_ack (void *cls,
895                               const struct GNUNET_CADET_ConnectionCreateAckMessage *msg)
896 {
897   struct CadetPeer *peer = cls;
898   struct CadetConnection *cc;
899
900   /* First, check if ACK belongs to a connection that ends here. */
901   cc = GNUNET_CONTAINER_multishortmap_get (connections,
902                                            &msg->cid.connection_of_tunnel);
903   if (NULL != cc)
904   {
905     /* verify ACK came from the right direction */
906     struct CadetPeerPath *path = GCC_get_path (cc);
907
908     if (peer !=
909         GCPP_get_peer_at_offset (path,
910                                  0))
911     {
912       /* received ACK from unexpected direction, ignore! */
913       GNUNET_break_op (0);
914       return;
915     }
916     LOG (GNUNET_ERROR_TYPE_DEBUG,
917          "Received CONNECTION_CREATE_ACK for connection %s.\n",
918          GNUNET_sh2s (&msg->cid.connection_of_tunnel));
919     GCC_handle_connection_create_ack (cc);
920     return;
921   }
922
923   /* We're just an intermediary peer, route the message along its path */
924   route_message (peer,
925                  &msg->cid,
926                  &msg->header);
927 }
928
929
930 /**
931  * Handle for #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN
932  *
933  * @param cls Closure (CadetPeer for neighbor that sent the message).
934  * @param msg Message itself.
935  * @deprecated duplicate logic with #handle_destroy(); dedup!
936  */
937 static void
938 handle_connection_broken (void *cls,
939                           const struct GNUNET_CADET_ConnectionBrokenMessage *msg)
940 {
941   struct CadetPeer *peer = cls;
942   struct CadetConnection *cc;
943   struct CadetRoute *route;
944
945   /* First, check if message belongs to a connection that ends here. */
946   cc = GNUNET_CONTAINER_multishortmap_get (connections,
947                                            &msg->cid.connection_of_tunnel);
948   if (NULL != cc)
949   {
950     /* verify message came from the right direction */
951     struct CadetPeerPath *path = GCC_get_path (cc);
952
953     if (peer !=
954         GCPP_get_peer_at_offset (path,
955                                  0))
956     {
957       /* received message from unexpected direction, ignore! */
958       GNUNET_break_op (0);
959       return;
960     }
961     LOG (GNUNET_ERROR_TYPE_DEBUG,
962          "Received CONNECTION_BROKEN for connection %s. Destroying it.\n",
963          GNUNET_sh2s (&msg->cid.connection_of_tunnel));
964     GCC_destroy_without_core (cc);
965
966     /* FIXME: also destroy the path up to the specified link! */
967     return;
968   }
969
970   /* We're just an intermediary peer, route the message along its path */
971   route_message (peer,
972                  &msg->cid,
973                  &msg->header);
974   route = get_route (&msg->cid);
975   if (NULL != route)
976     destroy_route (route);
977   /* FIXME: also destroy paths we MAY have up to the specified link! */
978 }
979
980
981 /**
982  * Handle for #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY
983  *
984  * @param cls Closure (CadetPeer for neighbor that sent the message).
985  * @param msg Message itself.
986  */
987 static void
988 handle_connection_destroy (void *cls,
989                            const struct GNUNET_CADET_ConnectionDestroyMessage *msg)
990 {
991   struct CadetPeer *peer = cls;
992   struct CadetConnection *cc;
993   struct CadetRoute *route;
994
995   /* First, check if message belongs to a connection that ends here. */
996   cc = GNUNET_CONTAINER_multishortmap_get (connections,
997                                            &msg->cid.connection_of_tunnel);
998   if (NULL != cc)
999   {
1000     /* verify message came from the right direction */
1001     struct CadetPeerPath *path = GCC_get_path (cc);
1002
1003     if (peer !=
1004         GCPP_get_peer_at_offset (path,
1005                                  0))
1006     {
1007       /* received message from unexpected direction, ignore! */
1008       GNUNET_break_op (0);
1009       return;
1010     }
1011     LOG (GNUNET_ERROR_TYPE_DEBUG,
1012          "Received CONNECTION_DESTROY for connection %s. Destroying connection.\n",
1013          GNUNET_sh2s (&msg->cid.connection_of_tunnel));
1014
1015     GCC_destroy_without_core (cc);
1016     return;
1017   }
1018
1019   /* We're just an intermediary peer, route the message along its path */
1020   LOG (GNUNET_ERROR_TYPE_DEBUG,
1021        "Received CONNECTION_DESTROY for connection %s. Destroying route.\n",
1022        GNUNET_sh2s (&msg->cid.connection_of_tunnel));
1023   route_message (peer,
1024                  &msg->cid,
1025                  &msg->header);
1026   route = get_route (&msg->cid);
1027   if (NULL != route)
1028     destroy_route (route);
1029 }
1030
1031
1032 /**
1033  * Handle for #GNUNET_MESSAGE_TYPE_CADET_TUNNEL_KX
1034  *
1035  * @param cls Closure (CadetPeer for neighbor that sent the message).
1036  * @param msg Message itself.
1037  */
1038 static void
1039 handle_tunnel_kx (void *cls,
1040                   const struct GNUNET_CADET_TunnelKeyExchangeMessage *msg)
1041 {
1042   struct CadetPeer *peer = cls;
1043   struct CadetConnection *cc;
1044
1045   /* First, check if message belongs to a connection that ends here. */
1046   cc = GNUNET_CONTAINER_multishortmap_get (connections,
1047                                            &msg->cid.connection_of_tunnel);
1048   if (NULL != cc)
1049   {
1050     /* verify message came from the right direction */
1051     struct CadetPeerPath *path = GCC_get_path (cc);
1052
1053     if (peer !=
1054         GCPP_get_peer_at_offset (path,
1055                                  0))
1056     {
1057       /* received message from unexpected direction, ignore! */
1058       GNUNET_break_op (0);
1059       return;
1060     }
1061     GCC_handle_kx (cc,
1062                    msg);
1063     return;
1064   }
1065
1066   /* We're just an intermediary peer, route the message along its path */
1067   route_message (peer,
1068                  &msg->cid,
1069                  &msg->header);
1070 }
1071
1072
1073 /**
1074  * Handle for #GNUNET_MESSAGE_TYPE_CADET_TUNNEL_KX_AUTH
1075  *
1076  * @param cls Closure (CadetPeer for neighbor that sent the message).
1077  * @param msg Message itself.
1078  */
1079 static void
1080 handle_tunnel_kx_auth (void *cls,
1081                        const struct GNUNET_CADET_TunnelKeyExchangeAuthMessage *msg)
1082 {
1083   struct CadetPeer *peer = cls;
1084   struct CadetConnection *cc;
1085
1086   /* First, check if message belongs to a connection that ends here. */
1087   cc = GNUNET_CONTAINER_multishortmap_get (connections,
1088                                            &msg->kx.cid.connection_of_tunnel);
1089   if (NULL != cc)
1090   {
1091     /* verify message came from the right direction */
1092     struct CadetPeerPath *path = GCC_get_path (cc);
1093
1094     if (peer !=
1095         GCPP_get_peer_at_offset (path,
1096                                  0))
1097     {
1098       /* received message from unexpected direction, ignore! */
1099       GNUNET_break_op (0);
1100       return;
1101     }
1102     GCC_handle_kx_auth (cc,
1103                         msg);
1104     return;
1105   }
1106
1107   /* We're just an intermediary peer, route the message along its path */
1108   route_message (peer,
1109                  &msg->kx.cid,
1110                  &msg->kx.header);
1111 }
1112
1113
1114 /**
1115  * Check if the encrypted message has the appropriate size.
1116  *
1117  * @param cls Closure (unused).
1118  * @param msg Message to check.
1119  *
1120  * @return #GNUNET_YES if size is correct, #GNUNET_NO otherwise.
1121  */
1122 static int
1123 check_tunnel_encrypted (void *cls,
1124                         const struct GNUNET_CADET_TunnelEncryptedMessage *msg)
1125 {
1126   return GNUNET_YES;
1127 }
1128
1129
1130 /**
1131  * Handle for #GNUNET_MESSAGE_TYPE_CADET_TUNNEL_ENCRYPTED.
1132  *
1133  * @param cls Closure (CadetPeer for neighbor that sent the message).
1134  * @param msg Message itself.
1135  */
1136 static void
1137 handle_tunnel_encrypted (void *cls,
1138                          const struct GNUNET_CADET_TunnelEncryptedMessage *msg)
1139 {
1140   struct CadetPeer *peer = cls;
1141   struct CadetConnection *cc;
1142
1143   /* First, check if message belongs to a connection that ends here. */
1144   cc = GNUNET_CONTAINER_multishortmap_get (connections,
1145                                            &msg->cid.connection_of_tunnel);
1146   if (NULL != cc)
1147   {
1148     /* verify message came from the right direction */
1149     struct CadetPeerPath *path = GCC_get_path (cc);
1150
1151     if (peer !=
1152         GCPP_get_peer_at_offset (path,
1153                                  0))
1154     {
1155       /* received message from unexpected direction, ignore! */
1156       GNUNET_break_op (0);
1157       return;
1158     }
1159     GCC_handle_encrypted (cc,
1160                           msg);
1161     return;
1162   }
1163   /* We're just an intermediary peer, route the message along its path */
1164   route_message (peer,
1165                  &msg->cid,
1166                  &msg->header);
1167 }
1168
1169
1170 /**
1171  * Function called after #GNUNET_CORE_connect has succeeded (or failed
1172  * for good).  Note that the private key of the peer is intentionally
1173  * not exposed here; if you need it, your process should try to read
1174  * the private key file directly (which should work if you are
1175  * authorized...).  Implementations of this function must not call
1176  * #GNUNET_CORE_disconnect (other than by scheduling a new task to
1177  * do this later).
1178  *
1179  * @param cls closure
1180  * @param my_identity ID of this peer, NULL if we failed
1181  */
1182 static void
1183 core_init_cb (void *cls,
1184               const struct GNUNET_PeerIdentity *my_identity)
1185 {
1186   if (NULL == my_identity)
1187   {
1188     GNUNET_break (0);
1189     return;
1190   }
1191   GNUNET_break (0 ==
1192                 memcmp (my_identity,
1193                         &my_full_id,
1194                         sizeof (struct GNUNET_PeerIdentity)));
1195 }
1196
1197
1198 /**
1199  * Method called whenever a given peer connects.
1200  *
1201  * @param cls closure
1202  * @param peer peer identity this notification is about
1203  */
1204 static void *
1205 core_connect_cb (void *cls,
1206                  const struct GNUNET_PeerIdentity *peer,
1207                  struct GNUNET_MQ_Handle *mq)
1208 {
1209   struct CadetPeer *cp;
1210
1211   LOG (GNUNET_ERROR_TYPE_DEBUG,
1212        "CORE connection to peer %s was established.\n",
1213        GNUNET_i2s (peer));
1214   cp = GCP_get (peer,
1215                 GNUNET_YES);
1216   GCP_set_mq (cp,
1217               mq);
1218   return cp;
1219 }
1220
1221
1222 /**
1223  * Method called whenever a peer disconnects.
1224  *
1225  * @param cls closure
1226  * @param peer peer identity this notification is about
1227  */
1228 static void
1229 core_disconnect_cb (void *cls,
1230                     const struct GNUNET_PeerIdentity *peer,
1231                     void *peer_cls)
1232 {
1233   struct CadetPeer *cp = peer_cls;
1234
1235   LOG (GNUNET_ERROR_TYPE_DEBUG,
1236        "CORE connection to peer %s went down.\n",
1237        GNUNET_i2s (peer));
1238   GCP_set_mq (cp,
1239               NULL);
1240 }
1241
1242
1243 /**
1244  * Initialize the CORE subsystem.
1245  *
1246  * @param c Configuration.
1247  */
1248 void
1249 GCO_init (const struct GNUNET_CONFIGURATION_Handle *c)
1250 {
1251   struct GNUNET_MQ_MessageHandler handlers[] = {
1252     GNUNET_MQ_hd_var_size (connection_create,
1253                            GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE,
1254                            struct GNUNET_CADET_ConnectionCreateMessage,
1255                            NULL),
1256     GNUNET_MQ_hd_fixed_size (connection_create_ack,
1257                              GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE_ACK,
1258                              struct GNUNET_CADET_ConnectionCreateAckMessage,
1259                              NULL),
1260     GNUNET_MQ_hd_fixed_size (connection_broken,
1261                              GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN,
1262                              struct GNUNET_CADET_ConnectionBrokenMessage,
1263                              NULL),
1264     GNUNET_MQ_hd_fixed_size (connection_destroy,
1265                              GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY,
1266                              struct GNUNET_CADET_ConnectionDestroyMessage,
1267                              NULL),
1268     GNUNET_MQ_hd_fixed_size (tunnel_kx,
1269                              GNUNET_MESSAGE_TYPE_CADET_TUNNEL_KX,
1270                              struct GNUNET_CADET_TunnelKeyExchangeMessage,
1271                              NULL),
1272     GNUNET_MQ_hd_fixed_size (tunnel_kx_auth,
1273                              GNUNET_MESSAGE_TYPE_CADET_TUNNEL_KX_AUTH,
1274                              struct GNUNET_CADET_TunnelKeyExchangeAuthMessage,
1275                              NULL),
1276     GNUNET_MQ_hd_var_size (tunnel_encrypted,
1277                            GNUNET_MESSAGE_TYPE_CADET_TUNNEL_ENCRYPTED,
1278                            struct GNUNET_CADET_TunnelEncryptedMessage,
1279                            NULL),
1280     GNUNET_MQ_handler_end ()
1281   };
1282
1283   if (GNUNET_OK !=
1284       GNUNET_CONFIGURATION_get_value_number (c,
1285                                              "CADET",
1286                                              "MAX_ROUTES",
1287                                              &max_routes))
1288     max_routes = 5000;
1289   if (GNUNET_OK !=
1290       GNUNET_CONFIGURATION_get_value_number (c,
1291                                              "CADET",
1292                                              "MAX_MSGS_QUEUE",
1293                                              &max_buffers))
1294     max_buffers = 10000;
1295   routes = GNUNET_CONTAINER_multishortmap_create (1024,
1296                                                   GNUNET_NO);
1297   route_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1298   core = GNUNET_CORE_connect (c,
1299                               NULL,
1300                               &core_init_cb,
1301                               &core_connect_cb,
1302                               &core_disconnect_cb,
1303                               handlers);
1304 }
1305
1306
1307 /**
1308  * Shut down the CORE subsystem.
1309  */
1310 void
1311 GCO_shutdown ()
1312 {
1313   if (NULL != core)
1314   {
1315     GNUNET_CORE_disconnect (core);
1316     core = NULL;
1317   }
1318   GNUNET_assert (0 == GNUNET_CONTAINER_multishortmap_size (routes));
1319   GNUNET_CONTAINER_multishortmap_destroy (routes);
1320   routes = NULL;
1321   GNUNET_CONTAINER_heap_destroy (route_heap);
1322   route_heap = NULL;
1323   if (NULL != timeout_task)
1324   {
1325     GNUNET_SCHEDULER_cancel (timeout_task);
1326     timeout_task = NULL;
1327   }
1328 }
1329
1330 /* end of gnunet-cadet-service_core.c */