introduce buffering options on the route level
[oweals/gnunet.git] / src / cadet / gnunet-service-cadet-new_core.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2017 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file cadet/gnunet-service-cadet_core.c
23  * @brief cadet service; interaction with CORE service
24  * @author Bartlomiej Polot
25  * @author Christian Grothoff
26  *
27  * All functions in this file should use the prefix GCO (Gnunet Cadet cOre (bottom))
28  *
29  * TODO:
30  * - properly implement GLOBAL message buffer, instead of per-route buffers
31  * - do NOT use buffering if the route options say no buffer!
32  * - Optimization: given BROKEN messages, destroy paths (?)
33  */
34 #include "platform.h"
35 #include "gnunet-service-cadet-new_core.h"
36 #include "gnunet-service-cadet-new_paths.h"
37 #include "gnunet-service-cadet-new_peer.h"
38 #include "gnunet-service-cadet-new_connection.h"
39 #include "gnunet-service-cadet-new_tunnels.h"
40 #include "gnunet_core_service.h"
41 #include "gnunet_statistics_service.h"
42 #include "cadet_protocol.h"
43
44
45 #define LOG(level, ...) GNUNET_log_from(level,"cadet-cor",__VA_ARGS__)
46
47
48 /**
49  * Number of messages we are willing to buffer per route.
50  */
51 #define ROUTE_BUFFER_SIZE 8
52
53
54 /**
55  * Information we keep per direction for a route.
56  */
57 struct RouteDirection
58 {
59   /**
60    * Target peer.
61    */
62   struct CadetPeer *hop;
63
64   /**
65    * Route this direction is part of.
66    */
67   struct CadetRoute *my_route;
68
69   /**
70    * Message queue manager for @e hop.
71    */
72   struct GCP_MessageQueueManager *mqm;
73
74   /**
75    * Cyclic message buffer to @e hop.
76    */
77   struct GNUNET_MQ_Envelope *out_buffer[ROUTE_BUFFER_SIZE];
78
79   /**
80    * Next write offset to use to append messages to @e out_buffer.
81    */
82   unsigned int out_wpos;
83
84   /**
85    * Next read offset to use to retrieve messages from @e out_buffer.
86    */
87   unsigned int out_rpos;
88
89   /**
90    * Is @e mqm currently ready for transmission?
91    */
92   int is_ready;
93
94 };
95
96
97 /**
98  * Description of a segment of a `struct CadetConnection` at the
99  * intermediate peers.  Routes are basically entries in a peer's
100  * routing table for forwarding traffic.  At both endpoints, the
101  * routes are terminated by a `struct CadetConnection`, which knows
102  * the complete `struct CadetPath` that is formed by the individual
103  * routes.
104  */
105 struct CadetRoute
106 {
107
108   /**
109    * Information about the next hop on this route.
110    */
111   struct RouteDirection next;
112
113   /**
114    * Information about the previous hop on this route.
115    */
116   struct RouteDirection prev;
117
118   /**
119    * Unique identifier for the connection that uses this route.
120    */
121   struct GNUNET_CADET_ConnectionTunnelIdentifier cid;
122
123   /**
124    * When was this route last in use?
125    */
126   struct GNUNET_TIME_Absolute last_use;
127
128   /**
129    * Position of this route in the #route_heap.
130    */
131   struct GNUNET_CONTAINER_HeapNode *hn;
132
133   /**
134    * Options for the route, control buffering.
135    */
136   enum GNUNET_CADET_ChannelOption options;
137 };
138
139
140 /**
141  * Handle to the CORE service.
142  */
143 static struct GNUNET_CORE_Handle *core;
144
145 /**
146  * Routes on which this peer is an intermediate.
147  */
148 static struct GNUNET_CONTAINER_MultiShortmap *routes;
149
150 /**
151  * Heap of routes, MIN-sorted by last activity.
152  */
153 static struct GNUNET_CONTAINER_Heap *route_heap;
154
155 /**
156  * Maximum number of concurrent routes this peer will support.
157  */
158 static unsigned long long max_routes;
159
160 /**
161  * Task to timeout routes.
162  */
163 static struct GNUNET_SCHEDULER_Task *timeout_task;
164
165
166 /**
167  * Get the route corresponding to a hash.
168  *
169  * @param cid hash generated from the connection identifier
170  */
171 static struct CadetRoute *
172 get_route (const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid)
173 {
174   return GNUNET_CONTAINER_multishortmap_get (routes,
175                                              &cid->connection_of_tunnel);
176 }
177
178
179 /**
180  * We message @a msg from @a prev.  Find its route by @a cid and
181  * forward to the next hop.  Drop and signal broken route if we do not
182  * have a route.
183  *
184  * @param prev previous hop (sender)
185  * @param cid connection identifier, tells us which route to use
186  * @param msg the message to forward
187  */
188 static void
189 route_message (struct CadetPeer *prev,
190                const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid,
191                const struct GNUNET_MessageHeader *msg)
192 {
193   struct CadetRoute *route;
194   struct RouteDirection *dir;
195   struct GNUNET_MQ_Envelope *env;
196
197   route = get_route (cid);
198   if (NULL == route)
199   {
200     struct GNUNET_MQ_Envelope *env;
201     struct GNUNET_CADET_ConnectionBrokenMessage *bm;
202
203     LOG (GNUNET_ERROR_TYPE_DEBUG,
204          "Failed to route message of type %u from %s on connection %s: no route\n",
205          ntohs (msg->type),
206          GCP_2s (prev),
207          GNUNET_sh2s (&cid->connection_of_tunnel));
208     env = GNUNET_MQ_msg (bm,
209                          GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN);
210     bm->cid = *cid;
211     bm->peer1 = my_full_id;
212     GCP_send_ooo (prev,
213                   env);
214     return;
215   }
216   route->last_use = GNUNET_TIME_absolute_get ();
217   GNUNET_CONTAINER_heap_update_cost (route->hn,
218                                      route->last_use.abs_value_us);
219   dir = (prev == route->prev.hop) ? &route->next : &route->prev;
220   if (GNUNET_YES == dir->is_ready)
221   {
222     LOG (GNUNET_ERROR_TYPE_DEBUG,
223          "Routing message of type %u from %s to %s on connection %s\n",
224          ntohs (msg->type),
225          GCP_2s (prev),
226          GNUNET_i2s (GCP_get_id (dir->hop)),
227          GNUNET_sh2s (&cid->connection_of_tunnel));
228     dir->is_ready = GNUNET_NO;
229     GCP_send (dir->mqm,
230               GNUNET_MQ_msg_copy (msg));
231     return;
232   }
233   env = dir->out_buffer[dir->out_wpos];
234   if (NULL != env)
235   {
236     /* Queue full, drop earliest message in queue */
237     LOG (GNUNET_ERROR_TYPE_DEBUG,
238          "Queue full due to new message of type %u from %s to %s on connection %s, dropping old message\n",
239          ntohs (msg->type),
240          GCP_2s (prev),
241          GNUNET_i2s (GCP_get_id (dir->hop)),
242          GNUNET_sh2s (&cid->connection_of_tunnel));
243     GNUNET_STATISTICS_update (stats,
244                               "# messages dropped due to full buffer",
245                               1,
246                               GNUNET_NO);
247   GNUNET_assert (dir->out_rpos == dir->out_wpos);
248     GNUNET_MQ_discard (env);
249     dir->out_rpos++;
250     if (ROUTE_BUFFER_SIZE == dir->out_rpos)
251       dir->out_rpos = 0;
252   }
253   LOG (GNUNET_ERROR_TYPE_DEBUG,
254        "Queueing new message of type %u from %s to %s on connection %s\n",
255        ntohs (msg->type),
256        GCP_2s (prev),
257        GNUNET_i2s (GCP_get_id (dir->hop)),
258        GNUNET_sh2s (&cid->connection_of_tunnel));
259   env = GNUNET_MQ_msg_copy (msg);
260   dir->out_buffer[dir->out_wpos] = env;
261   dir->out_wpos++;
262   if (ROUTE_BUFFER_SIZE == dir->out_wpos)
263     dir->out_wpos = 0;
264 }
265
266
267 /**
268  * Check if the create_connection message has the appropriate size.
269  *
270  * @param cls Closure (unused).
271  * @param msg Message to check.
272  *
273  * @return #GNUNET_YES if size is correct, #GNUNET_NO otherwise.
274  */
275 static int
276 check_connection_create (void *cls,
277                          const struct GNUNET_CADET_ConnectionCreateMessage *msg)
278 {
279   uint16_t size = ntohs (msg->header.size) - sizeof (*msg);
280
281   if (0 != (size % sizeof (struct GNUNET_PeerIdentity)))
282   {
283     GNUNET_break_op (0);
284     return GNUNET_NO;
285   }
286   return GNUNET_YES;
287 }
288
289
290 /**
291  * Free internal data of a route direction.
292  *
293  * @param dir direction to destroy (do NOT free memory of 'dir' itself)
294  */
295 static void
296 destroy_direction (struct RouteDirection *dir)
297 {
298   for (unsigned int i=0;i<ROUTE_BUFFER_SIZE;i++)
299     if (NULL != dir->out_buffer[i])
300     {
301       GNUNET_MQ_discard (dir->out_buffer[i]);
302       dir->out_buffer[i] = NULL;
303     }
304   if (NULL != dir->mqm)
305   {
306     GCP_request_mq_cancel (dir->mqm,
307                            NULL);
308     dir->mqm = NULL;
309   }
310 }
311
312
313 /**
314  * Destroy our state for @a route.
315  *
316  * @param route route to destroy
317  */
318 static void
319 destroy_route (struct CadetRoute *route)
320 {
321   LOG (GNUNET_ERROR_TYPE_DEBUG,
322        "Destroying route from %s to %s of connection %s\n",
323        GNUNET_i2s  (GCP_get_id (route->prev.hop)),
324        GNUNET_i2s2 (GCP_get_id (route->next.hop)),
325        GNUNET_sh2s (&route->cid.connection_of_tunnel));
326   GNUNET_assert (route ==
327                  GNUNET_CONTAINER_heap_remove_node (route->hn));
328   destroy_direction (&route->prev);
329   destroy_direction (&route->next);
330   GNUNET_free (route);
331 }
332
333
334 /**
335  * Send message that a route is broken between @a peer1 and @a peer2.
336  *
337  * @param target where to send the message
338  * @param cid connection identifier to use
339  * @param peer1 one of the peers where a link is broken
340  * @param peer2 another one of the peers where a link is broken
341  */
342 static void
343 send_broken (struct RouteDirection *target,
344              const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid,
345              const struct GNUNET_PeerIdentity *peer1,
346              const struct GNUNET_PeerIdentity *peer2)
347 {
348   struct GNUNET_MQ_Envelope *env;
349   struct GNUNET_CADET_ConnectionBrokenMessage *bm;
350
351   if (NULL == target->mqm)
352     return; /* Can't send notification, connection is down! */
353   LOG (GNUNET_ERROR_TYPE_DEBUG,
354        "Notifying %s about BROKEN route at %s-%s of connection %s\n",
355        GCP_2s (target->hop),
356        GNUNET_i2s (peer1),
357        GNUNET_i2s2 (peer2),
358        GNUNET_sh2s (&cid->connection_of_tunnel));
359
360   env = GNUNET_MQ_msg (bm,
361                        GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN);
362   bm->cid = *cid;
363   if (NULL != peer1)
364     bm->peer1 = *peer1;
365   if (NULL != peer2)
366     bm->peer2 = *peer2;
367
368   GCP_request_mq_cancel (target->mqm,
369                          env);
370   target->mqm = NULL;
371 }
372
373
374 /**
375  * Function called to check if any routes have timed out, and if
376  * so, to clean them up.  Finally, schedules itself again at the
377  * earliest time where there might be more work.
378  *
379  * @param cls NULL
380  */
381 static void
382 timeout_cb (void *cls)
383 {
384   struct CadetRoute *r;
385   struct GNUNET_TIME_Relative linger;
386   struct GNUNET_TIME_Absolute exp;
387
388   timeout_task = NULL;
389   linger = GNUNET_TIME_relative_multiply (keepalive_period,
390                                           3);
391   while (NULL != (r = GNUNET_CONTAINER_heap_peek (route_heap)))
392   {
393     exp = GNUNET_TIME_absolute_add (r->last_use,
394                                     linger);
395     if (0 != GNUNET_TIME_absolute_get_duration (exp).rel_value_us)
396     {
397       /* Route not yet timed out, wait until it does. */
398       timeout_task = GNUNET_SCHEDULER_add_at (exp,
399                                               &timeout_cb,
400                                               NULL);
401       return;
402     }
403     send_broken (&r->prev,
404                  &r->cid,
405                  NULL,
406                  NULL);
407     send_broken (&r->next,
408                  &r->cid,
409                  NULL,
410                  NULL);
411     destroy_route (r);
412   }
413   /* No more routes left, so no need for a #timeout_task */
414 }
415
416
417 /**
418  * Function called when the message queue to the previous hop
419  * becomes available/unavailable.  We expect this function to
420  * be called immediately when we register, and then again
421  * later if the connection ever goes down.
422  *
423  * @param cls the `struct RouteDirection`
424  * @param available #GNUNET_YES if sending is now possible,
425  *                  #GNUNET_NO if sending is no longer possible
426  *                  #GNUNET_SYSERR if sending is no longer possible
427  *                                 and the last envelope was discarded
428  */
429 static void
430 dir_ready_cb (void *cls,
431               int ready)
432 {
433   struct RouteDirection *dir = cls;
434   struct CadetRoute *route = dir->my_route;
435   struct RouteDirection *odir;
436
437   if (GNUNET_YES == ready)
438   {
439     struct GNUNET_MQ_Envelope *env;
440
441     dir->is_ready = GNUNET_YES;
442     if (NULL != (env = dir->out_buffer[dir->out_rpos]))
443     {
444       dir->out_buffer[dir->out_rpos] = NULL;
445       dir->out_rpos++;
446       if (ROUTE_BUFFER_SIZE == dir->out_rpos)
447         dir->out_rpos = 0;
448       dir->is_ready = GNUNET_NO;
449       GCP_send (dir->mqm,
450                 env);
451     }
452     return;
453   }
454   odir = (dir == &route->next) ? &route->prev : &route->next;
455   send_broken (&route->next,
456                &route->cid,
457                GCP_get_id (odir->hop),
458                &my_full_id);
459   destroy_route (route);
460 }
461
462
463 /**
464  * Initialize one of the directions of a route.
465  *
466  * @param route route the direction belongs to
467  * @param dir direction to initialize
468  * @param hop next hop on in the @a dir
469  */
470 static void
471 dir_init (struct RouteDirection *dir,
472           struct CadetRoute *route,
473           struct CadetPeer *hop)
474 {
475   dir->hop = hop;
476   dir->my_route = route;
477   dir->mqm = GCP_request_mq (hop,
478                              &dir_ready_cb,
479                              dir);
480   GNUNET_assert (GNUNET_YES == dir->is_ready);
481 }
482
483
484 /**
485  * We could not create the desired route.  Send a
486  * #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN
487  * message to @a target.
488  *
489  * @param target who should receive the message
490  * @param cid identifier of the connection/route that failed
491  * @param failure_at neighbour with which we failed to route,
492  *        or NULL.
493  */
494 static void
495 send_broken_without_mqm (struct CadetPeer *target,
496                          const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid,
497                          const struct GNUNET_PeerIdentity *failure_at)
498 {
499   struct GNUNET_MQ_Envelope *env;
500   struct GNUNET_CADET_ConnectionBrokenMessage *bm;
501
502   env = GNUNET_MQ_msg (bm,
503                        GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN);
504   bm->cid = *cid;
505   bm->peer1 = my_full_id;
506   if (NULL != failure_at)
507     bm->peer2 = *failure_at;
508   GCP_send_ooo (target,
509                 env);
510 }
511
512
513 /**
514  * Handle for #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE
515  *
516  * @param cls Closure (CadetPeer for neighbor that sent the message).
517  * @param msg Message itself.
518  */
519 static void
520 handle_connection_create (void *cls,
521                           const struct GNUNET_CADET_ConnectionCreateMessage *msg)
522 {
523   struct CadetPeer *sender = cls;
524   struct CadetPeer *next;
525   const struct GNUNET_PeerIdentity *pids = (const struct GNUNET_PeerIdentity *) &msg[1];
526   struct CadetRoute *route;
527   uint16_t size = ntohs (msg->header.size) - sizeof (*msg);
528   unsigned int path_length;
529   unsigned int off;
530   enum GNUNET_CADET_ChannelOption options;
531
532   options = (enum GNUNET_CADET_ChannelOption) ntohl (msg->options);
533   path_length = size / sizeof (struct GNUNET_PeerIdentity);
534   /* Initiator is at offset 0. */
535   for (off=1;off<path_length;off++)
536     if (0 == memcmp (&my_full_id,
537                      &pids[off],
538                      sizeof (struct GNUNET_PeerIdentity)))
539       break;
540   if (off == path_length)
541   {
542     /* We are not on the path, bogus request */
543     GNUNET_break_op (0);
544     return;
545   }
546   /* Check previous hop */
547   if (sender != GCP_get (&pids[off - 1],
548                          GNUNET_NO))
549   {
550     /* sender is not on the path, not allowed */
551     GNUNET_break_op (0);
552     return;
553   }
554   if (NULL !=
555       get_route (&msg->cid))
556   {
557     /* Duplicate CREATE, pass it on, previous one might have been lost! */
558     LOG (GNUNET_ERROR_TYPE_DEBUG,
559          "Passing on duplicate CADET_CONNECTION_CREATE message on connection %s\n",
560          GNUNET_sh2s (&msg->cid.connection_of_tunnel));
561     route_message (sender,
562                    &msg->cid,
563                    &msg->header);
564     return;
565   }
566   if (off == path_length - 1)
567   {
568     /* We are the destination, create connection */
569     struct CadetConnection *cc;
570     struct CadetPeerPath *path;
571     struct CadetPeer *origin;
572
573     cc = GNUNET_CONTAINER_multishortmap_get (connections,
574                                              &msg->cid.connection_of_tunnel);
575     if (NULL != cc)
576     {
577       LOG (GNUNET_ERROR_TYPE_DEBUG,
578            "Received duplicate CADET_CONNECTION_CREATE message on connection %s\n",
579            GNUNET_sh2s (&msg->cid.connection_of_tunnel));
580       GCC_handle_duplicate_create (cc);
581       return;
582     }
583
584     origin = GCP_get (&pids[0],
585                       GNUNET_YES);
586     LOG (GNUNET_ERROR_TYPE_DEBUG,
587          "Received CADET_CONNECTION_CREATE message from %s for connection %s, building inverse path\n",
588          GCP_2s (origin),
589          GNUNET_sh2s (&msg->cid.connection_of_tunnel));
590     path = GCPP_get_path_from_route (path_length - 1,
591                                      pids);
592     if (GNUNET_OK !=
593         GCT_add_inbound_connection (GCP_get_tunnel (origin,
594                                                     GNUNET_YES),
595                                     &msg->cid,
596                                     (enum GNUNET_CADET_ChannelOption) ntohl (msg->options),
597                                     path))
598     {
599       /* Send back BROKEN: duplicate connection on the same path,
600          we will use the other one. */
601       LOG (GNUNET_ERROR_TYPE_DEBUG,
602            "Received CADET_CONNECTION_CREATE from %s for %s, but %s already has a connection. Sending BROKEN\n",
603            GCP_2s (sender),
604            GNUNET_sh2s (&msg->cid.connection_of_tunnel),
605            GCPP_2s (path));
606       send_broken_without_mqm (sender,
607                                &msg->cid,
608                                NULL);
609       return;
610     }
611     return;
612   }
613   /* We are merely a hop on the way, check if we can support the route */
614   next = GCP_get (&pids[off + 1],
615                   GNUNET_NO);
616   if ( (NULL == next) ||
617        (GNUNET_NO == GCP_has_core_connection (next)) )
618   {
619     /* unworkable, send back BROKEN notification */
620     LOG (GNUNET_ERROR_TYPE_DEBUG,
621          "Received CADET_CONNECTION_CREATE from %s for %s. Next hop %s:%u is down. Sending BROKEN\n",
622          GCP_2s (sender),
623          GNUNET_sh2s (&msg->cid.connection_of_tunnel),
624          GNUNET_i2s (&pids[off + 1]),
625          off + 1);
626     send_broken_without_mqm (sender,
627                              &msg->cid,
628                              &pids[off + 1]);
629     return;
630   }
631   if (max_routes <= GNUNET_CONTAINER_multishortmap_size (routes))
632   {
633     LOG (GNUNET_ERROR_TYPE_DEBUG,
634          "Received CADET_CONNECTION_CREATE from %s for %s. We have reached our route limit. Sending BROKEN\n",
635          GCP_2s (sender),
636          GNUNET_sh2s (&msg->cid.connection_of_tunnel));
637     send_broken_without_mqm (sender,
638                              &msg->cid,
639                              &pids[off - 1]);
640     return;
641   }
642
643   /* Workable route, create routing entry */
644   LOG (GNUNET_ERROR_TYPE_DEBUG,
645        "Received CADET_CONNECTION_CREATE from %s for %s. Next hop %s:%u is up. Creating route\n",
646        GCP_2s (sender),
647        GNUNET_sh2s (&msg->cid.connection_of_tunnel),
648        GNUNET_i2s (&pids[off + 1]),
649        off + 1);
650   route = GNUNET_new (struct CadetRoute);
651   route->options = options;
652   route->cid = msg->cid;
653   route->last_use = GNUNET_TIME_absolute_get ();
654   dir_init (&route->prev,
655             route,
656             sender);
657   dir_init (&route->next,
658             route,
659             next);
660   GNUNET_assert (GNUNET_OK ==
661                  GNUNET_CONTAINER_multishortmap_put (routes,
662                                                      &route->cid.connection_of_tunnel,
663                                                      route,
664                                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
665   route->hn = GNUNET_CONTAINER_heap_insert (route_heap,
666                                             route,
667                                             route->last_use.abs_value_us);
668   if (NULL == timeout_task)
669     timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (keepalive_period,
670                                                                                 3),
671                                                  &timeout_cb,
672                                                  NULL);
673 }
674
675
676 /**
677  * Handle for #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE_ACK
678  *
679  * @param cls Closure (CadetPeer for neighbor that sent the message).
680  * @param msg Message itself.
681  */
682 static void
683 handle_connection_create_ack (void *cls,
684                               const struct GNUNET_CADET_ConnectionCreateAckMessage *msg)
685 {
686   struct CadetPeer *peer = cls;
687   struct CadetConnection *cc;
688
689   /* First, check if ACK belongs to a connection that ends here. */
690   cc = GNUNET_CONTAINER_multishortmap_get (connections,
691                                            &msg->cid.connection_of_tunnel);
692   if (NULL != cc)
693   {
694     /* verify ACK came from the right direction */
695     struct CadetPeerPath *path = GCC_get_path (cc);
696
697     if (peer !=
698         GCPP_get_peer_at_offset (path,
699                                  0))
700     {
701       /* received ACK from unexpected direction, ignore! */
702       GNUNET_break_op (0);
703       return;
704     }
705     LOG (GNUNET_ERROR_TYPE_DEBUG,
706          "Received CONNECTION_CREATE_ACK for connection %s.\n",
707          GNUNET_sh2s (&msg->cid.connection_of_tunnel));
708     GCC_handle_connection_create_ack (cc);
709     return;
710   }
711
712   /* We're just an intermediary peer, route the message along its path */
713   route_message (peer,
714                  &msg->cid,
715                  &msg->header);
716 }
717
718
719 /**
720  * Handle for #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN
721  *
722  * @param cls Closure (CadetPeer for neighbor that sent the message).
723  * @param msg Message itself.
724  * @deprecated duplicate logic with #handle_destroy(); dedup!
725  */
726 static void
727 handle_connection_broken (void *cls,
728                           const struct GNUNET_CADET_ConnectionBrokenMessage *msg)
729 {
730   struct CadetPeer *peer = cls;
731   struct CadetConnection *cc;
732   struct CadetRoute *route;
733
734   /* First, check if message belongs to a connection that ends here. */
735   cc = GNUNET_CONTAINER_multishortmap_get (connections,
736                                            &msg->cid.connection_of_tunnel);
737   if (NULL != cc)
738   {
739     /* verify message came from the right direction */
740     struct CadetPeerPath *path = GCC_get_path (cc);
741
742     if (peer !=
743         GCPP_get_peer_at_offset (path,
744                                  0))
745     {
746       /* received message from unexpected direction, ignore! */
747       GNUNET_break_op (0);
748       return;
749     }
750     LOG (GNUNET_ERROR_TYPE_DEBUG,
751          "Received CONNECTION_BROKEN for connection %s. Destroying it.\n",
752          GNUNET_sh2s (&msg->cid.connection_of_tunnel));
753     GCC_destroy_without_core (cc);
754
755     /* FIXME: also destroy the path up to the specified link! */
756     return;
757   }
758
759   /* We're just an intermediary peer, route the message along its path */
760   route = get_route (&msg->cid);
761   route_message (peer,
762                  &msg->cid,
763                  &msg->header);
764   destroy_route (route);
765   /* FIXME: also destroy paths we MAY have up to the specified link! */
766 }
767
768
769 /**
770  * Handle for #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY
771  *
772  * @param cls Closure (CadetPeer for neighbor that sent the message).
773  * @param msg Message itself.
774  */
775 static void
776 handle_connection_destroy (void *cls,
777                            const struct GNUNET_CADET_ConnectionDestroyMessage *msg)
778 {
779   struct CadetPeer *peer = cls;
780   struct CadetConnection *cc;
781   struct CadetRoute *route;
782
783   /* First, check if message belongs to a connection that ends here. */
784   cc = GNUNET_CONTAINER_multishortmap_get (connections,
785                                            &msg->cid.connection_of_tunnel);
786   if (NULL != cc)
787   {
788     /* verify message came from the right direction */
789     struct CadetPeerPath *path = GCC_get_path (cc);
790
791     if (peer !=
792         GCPP_get_peer_at_offset (path,
793                                  0))
794     {
795       /* received message from unexpected direction, ignore! */
796       GNUNET_break_op (0);
797       return;
798     }
799     LOG (GNUNET_ERROR_TYPE_DEBUG,
800          "Received CONNECTION_DESTROY for connection %s. Destroying connection.\n",
801          GNUNET_sh2s (&msg->cid.connection_of_tunnel));
802
803     GCC_destroy_without_core (cc);
804     return;
805   }
806
807   /* We're just an intermediary peer, route the message along its path */
808   LOG (GNUNET_ERROR_TYPE_DEBUG,
809        "Received CONNECTION_DESTROY for connection %s. Destroying route.\n",
810        GNUNET_sh2s (&msg->cid.connection_of_tunnel));
811   route = get_route (&msg->cid);
812   route_message (peer,
813                  &msg->cid,
814                  &msg->header);
815   destroy_route (route);
816 }
817
818
819 /**
820  * Handle for #GNUNET_MESSAGE_TYPE_CADET_TUNNEL_KX
821  *
822  * @param cls Closure (CadetPeer for neighbor that sent the message).
823  * @param msg Message itself.
824  */
825 static void
826 handle_tunnel_kx (void *cls,
827                   const struct GNUNET_CADET_TunnelKeyExchangeMessage *msg)
828 {
829   struct CadetPeer *peer = cls;
830   struct CadetConnection *cc;
831
832   /* First, check if message belongs to a connection that ends here. */
833   cc = GNUNET_CONTAINER_multishortmap_get (connections,
834                                            &msg->cid.connection_of_tunnel);
835   if (NULL != cc)
836   {
837     /* verify message came from the right direction */
838     struct CadetPeerPath *path = GCC_get_path (cc);
839
840     if (peer !=
841         GCPP_get_peer_at_offset (path,
842                                  0))
843     {
844       /* received message from unexpected direction, ignore! */
845       GNUNET_break_op (0);
846       return;
847     }
848     GCC_handle_kx (cc,
849                    msg);
850     return;
851   }
852
853   /* We're just an intermediary peer, route the message along its path */
854   route_message (peer,
855                  &msg->cid,
856                  &msg->header);
857 }
858
859
860 /**
861  * Handle for #GNUNET_MESSAGE_TYPE_CADET_TUNNEL_KX_AUTH
862  *
863  * @param cls Closure (CadetPeer for neighbor that sent the message).
864  * @param msg Message itself.
865  */
866 static void
867 handle_tunnel_kx_auth (void *cls,
868                        const struct GNUNET_CADET_TunnelKeyExchangeAuthMessage *msg)
869 {
870   struct CadetPeer *peer = cls;
871   struct CadetConnection *cc;
872
873   /* First, check if message belongs to a connection that ends here. */
874   cc = GNUNET_CONTAINER_multishortmap_get (connections,
875                                            &msg->kx.cid.connection_of_tunnel);
876   if (NULL != cc)
877   {
878     /* verify message came from the right direction */
879     struct CadetPeerPath *path = GCC_get_path (cc);
880
881     if (peer !=
882         GCPP_get_peer_at_offset (path,
883                                  0))
884     {
885       /* received message from unexpected direction, ignore! */
886       GNUNET_break_op (0);
887       return;
888     }
889     GCC_handle_kx_auth (cc,
890                         msg);
891     return;
892   }
893
894   /* We're just an intermediary peer, route the message along its path */
895   route_message (peer,
896                  &msg->kx.cid,
897                  &msg->kx.header);
898 }
899
900
901 /**
902  * Check if the encrypted message has the appropriate size.
903  *
904  * @param cls Closure (unused).
905  * @param msg Message to check.
906  *
907  * @return #GNUNET_YES if size is correct, #GNUNET_NO otherwise.
908  */
909 static int
910 check_tunnel_encrypted (void *cls,
911                         const struct GNUNET_CADET_TunnelEncryptedMessage *msg)
912 {
913   return GNUNET_YES;
914 }
915
916
917 /**
918  * Handle for #GNUNET_MESSAGE_TYPE_CADET_TUNNEL_ENCRYPTED.
919  *
920  * @param cls Closure (CadetPeer for neighbor that sent the message).
921  * @param msg Message itself.
922  */
923 static void
924 handle_tunnel_encrypted (void *cls,
925                          const struct GNUNET_CADET_TunnelEncryptedMessage *msg)
926 {
927   struct CadetPeer *peer = cls;
928   struct CadetConnection *cc;
929
930   /* First, check if message belongs to a connection that ends here. */
931   cc = GNUNET_CONTAINER_multishortmap_get (connections,
932                                            &msg->cid.connection_of_tunnel);
933   if (NULL != cc)
934   {
935     /* verify message came from the right direction */
936     struct CadetPeerPath *path = GCC_get_path (cc);
937
938     if (peer !=
939         GCPP_get_peer_at_offset (path,
940                                  0))
941     {
942       /* received message from unexpected direction, ignore! */
943       GNUNET_break_op (0);
944       return;
945     }
946     GCC_handle_encrypted (cc,
947                           msg);
948     return;
949   }
950   /* We're just an intermediary peer, route the message along its path */
951   route_message (peer,
952                  &msg->cid,
953                  &msg->header);
954 }
955
956
957 /**
958  * Function called after #GNUNET_CORE_connect has succeeded (or failed
959  * for good).  Note that the private key of the peer is intentionally
960  * not exposed here; if you need it, your process should try to read
961  * the private key file directly (which should work if you are
962  * authorized...).  Implementations of this function must not call
963  * #GNUNET_CORE_disconnect (other than by scheduling a new task to
964  * do this later).
965  *
966  * @param cls closure
967  * @param my_identity ID of this peer, NULL if we failed
968  */
969 static void
970 core_init_cb (void *cls,
971               const struct GNUNET_PeerIdentity *my_identity)
972 {
973   if (NULL == my_identity)
974   {
975     GNUNET_break (0);
976     return;
977   }
978   GNUNET_break (0 ==
979                 memcmp (my_identity,
980                         &my_full_id,
981                         sizeof (struct GNUNET_PeerIdentity)));
982 }
983
984
985 /**
986  * Method called whenever a given peer connects.
987  *
988  * @param cls closure
989  * @param peer peer identity this notification is about
990  */
991 static void *
992 core_connect_cb (void *cls,
993                  const struct GNUNET_PeerIdentity *peer,
994                  struct GNUNET_MQ_Handle *mq)
995 {
996   struct CadetPeer *cp;
997
998   LOG (GNUNET_ERROR_TYPE_DEBUG,
999        "CORE connection to peer %s was established.\n",
1000        GNUNET_i2s (peer));
1001   cp = GCP_get (peer,
1002                 GNUNET_YES);
1003   GCP_set_mq (cp,
1004               mq);
1005   return cp;
1006 }
1007
1008
1009 /**
1010  * Method called whenever a peer disconnects.
1011  *
1012  * @param cls closure
1013  * @param peer peer identity this notification is about
1014  */
1015 static void
1016 core_disconnect_cb (void *cls,
1017                     const struct GNUNET_PeerIdentity *peer,
1018                     void *peer_cls)
1019 {
1020   struct CadetPeer *cp = peer_cls;
1021
1022   LOG (GNUNET_ERROR_TYPE_DEBUG,
1023        "CORE connection to peer %s went down.\n",
1024        GNUNET_i2s (peer));
1025   GCP_set_mq (cp,
1026               NULL);
1027 }
1028
1029
1030 /**
1031  * Initialize the CORE subsystem.
1032  *
1033  * @param c Configuration.
1034  */
1035 void
1036 GCO_init (const struct GNUNET_CONFIGURATION_Handle *c)
1037 {
1038   struct GNUNET_MQ_MessageHandler handlers[] = {
1039     GNUNET_MQ_hd_var_size (connection_create,
1040                            GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE,
1041                            struct GNUNET_CADET_ConnectionCreateMessage,
1042                            NULL),
1043     GNUNET_MQ_hd_fixed_size (connection_create_ack,
1044                              GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE_ACK,
1045                              struct GNUNET_CADET_ConnectionCreateAckMessage,
1046                              NULL),
1047     GNUNET_MQ_hd_fixed_size (connection_broken,
1048                              GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN,
1049                              struct GNUNET_CADET_ConnectionBrokenMessage,
1050                              NULL),
1051     GNUNET_MQ_hd_fixed_size (connection_destroy,
1052                              GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY,
1053                              struct GNUNET_CADET_ConnectionDestroyMessage,
1054                              NULL),
1055     GNUNET_MQ_hd_fixed_size (tunnel_kx,
1056                              GNUNET_MESSAGE_TYPE_CADET_TUNNEL_KX,
1057                              struct GNUNET_CADET_TunnelKeyExchangeMessage,
1058                              NULL),
1059     GNUNET_MQ_hd_fixed_size (tunnel_kx_auth,
1060                              GNUNET_MESSAGE_TYPE_CADET_TUNNEL_KX_AUTH,
1061                              struct GNUNET_CADET_TunnelKeyExchangeAuthMessage,
1062                              NULL),
1063     GNUNET_MQ_hd_var_size (tunnel_encrypted,
1064                            GNUNET_MESSAGE_TYPE_CADET_TUNNEL_ENCRYPTED,
1065                            struct GNUNET_CADET_TunnelEncryptedMessage,
1066                            NULL),
1067     GNUNET_MQ_handler_end ()
1068   };
1069
1070   if (GNUNET_OK !=
1071       GNUNET_CONFIGURATION_get_value_number (c,
1072                                              "CADET",
1073                                              "MAX_ROUTES",
1074                                              &max_routes))
1075     max_routes = 10000;
1076   routes = GNUNET_CONTAINER_multishortmap_create (1024,
1077                                                   GNUNET_NO);
1078   route_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1079   core = GNUNET_CORE_connect (c,
1080                               NULL,
1081                               &core_init_cb,
1082                               &core_connect_cb,
1083                               &core_disconnect_cb,
1084                               handlers);
1085 }
1086
1087
1088 /**
1089  * Shut down the CORE subsystem.
1090  */
1091 void
1092 GCO_shutdown ()
1093 {
1094   if (NULL != core)
1095   {
1096     GNUNET_CORE_disconnect (core);
1097     core = NULL;
1098   }
1099   GNUNET_assert (0 == GNUNET_CONTAINER_multishortmap_size (routes));
1100   GNUNET_CONTAINER_multishortmap_destroy (routes);
1101   routes = NULL;
1102   GNUNET_CONTAINER_heap_destroy (route_heap);
1103   route_heap = NULL;
1104   if (NULL != timeout_task)
1105   {
1106     GNUNET_SCHEDULER_cancel (timeout_task);
1107     timeout_task = NULL;
1108   }
1109 }
1110
1111 /* end of gnunet-cadet-service_core.c */