putting into place the data structures for a global buffer pool shared across routes
[oweals/gnunet.git] / src / cadet / gnunet-service-cadet-new_core.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2017 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file cadet/gnunet-service-cadet_core.c
23  * @brief cadet service; interaction with CORE service
24  * @author Bartlomiej Polot
25  * @author Christian Grothoff
26  *
27  * All functions in this file should use the prefix GCO (Gnunet Cadet cOre (bottom))
28  *
29  * TODO:
30  * - properly implement GLOBAL message buffer, instead of per-route buffers
31  * - do NOT use buffering if the route options say no buffer!
32  * - Optimization: given BROKEN messages, destroy paths (?)
33  */
34 #include "platform.h"
35 #include "gnunet-service-cadet-new_core.h"
36 #include "gnunet-service-cadet-new_paths.h"
37 #include "gnunet-service-cadet-new_peer.h"
38 #include "gnunet-service-cadet-new_connection.h"
39 #include "gnunet-service-cadet-new_tunnels.h"
40 #include "gnunet_core_service.h"
41 #include "gnunet_statistics_service.h"
42 #include "cadet_protocol.h"
43
44
45 #define LOG(level, ...) GNUNET_log_from(level,"cadet-cor",__VA_ARGS__)
46
47 /**
48  * Information we keep per direction for a route.
49  */
50 struct RouteDirection;
51
52
53 /**
54  * Set of CadetRoutes that have exactly the same number of messages
55  * in their buffer.  Used so we can efficiently find all of those
56  * routes that have the current maximum of messages in the buffer (in
57  * case we have to purge).
58  */
59 struct Rung
60 {
61
62   /**
63    * Rung of RouteDirections with one more buffer entry each.
64    */
65   struct Rung *next;
66
67   /**
68    * Rung of RouteDirections with one less buffer entry each.
69    */
70   struct Rung *prev;
71
72   /**
73    * DLL of route directions with a number of buffer entries matching this rung.
74    */
75   struct RouteDirection *rd_head;
76
77   /**
78    * DLL of route directions with a number of buffer entries matching this rung.
79    */
80   struct RouteDirection *rd_tail;
81
82   /**
83    * Total number of route directions in this rung.
84    */
85   unsigned int num_routes;
86 };
87
88
89 /**
90  * Number of messages we are willing to buffer per route.
91  * FIXME: have global buffer pool instead!
92  */
93 #define ROUTE_BUFFER_SIZE 8
94
95
96 /**
97  * Information we keep per direction for a route.
98  */
99 struct RouteDirection
100 {
101
102   /**
103    * DLL of other route directions within the same `struct Rung`.
104    */
105   struct RouteDirection *prev;
106
107   /**
108    * DLL of other route directions within the same `struct Rung`.
109    */
110   struct RouteDirection *next;
111
112   /**
113    * Rung of this route direction (matches length of the buffer DLL).
114    */
115   struct Rung *rung;
116
117   /**
118    * Head of DLL of envelopes we have in the buffer for this direction.
119    */
120   struct GNUNET_MQ_Envelope *env_head;
121
122   /**
123    * Tail of DLL of envelopes we have in the buffer for this direction.
124    */
125   struct GNUNET_MQ_Envelope *env_tail;
126
127   /**
128    * Target peer.
129    */
130   struct CadetPeer *hop;
131
132   /**
133    * Route this direction is part of.
134    */
135   struct CadetRoute *my_route;
136
137   /**
138    * Message queue manager for @e hop.
139    */
140   struct GCP_MessageQueueManager *mqm;
141
142   /**
143    * Cyclic message buffer to @e hop.
144    */
145   struct GNUNET_MQ_Envelope *out_buffer[ROUTE_BUFFER_SIZE];
146
147   /**
148    * Next write offset to use to append messages to @e out_buffer.
149    */
150   unsigned int out_wpos;
151
152   /**
153    * Next read offset to use to retrieve messages from @e out_buffer.
154    */
155   unsigned int out_rpos;
156
157   /**
158    * Is @e mqm currently ready for transmission?
159    */
160   int is_ready;
161
162 };
163
164
165 /**
166  * Description of a segment of a `struct CadetConnection` at the
167  * intermediate peers.  Routes are basically entries in a peer's
168  * routing table for forwarding traffic.  At both endpoints, the
169  * routes are terminated by a `struct CadetConnection`, which knows
170  * the complete `struct CadetPath` that is formed by the individual
171  * routes.
172  */
173 struct CadetRoute
174 {
175
176   /**
177    * Information about the next hop on this route.
178    */
179   struct RouteDirection next;
180
181   /**
182    * Information about the previous hop on this route.
183    */
184   struct RouteDirection prev;
185
186   /**
187    * Unique identifier for the connection that uses this route.
188    */
189   struct GNUNET_CADET_ConnectionTunnelIdentifier cid;
190
191   /**
192    * When was this route last in use?
193    */
194   struct GNUNET_TIME_Absolute last_use;
195
196   /**
197    * Position of this route in the #route_heap.
198    */
199   struct GNUNET_CONTAINER_HeapNode *hn;
200
201   /**
202    * Options for the route, control buffering.
203    */
204   enum GNUNET_CADET_ChannelOption options;
205 };
206
207
208 /**
209  * Handle to the CORE service.
210  */
211 static struct GNUNET_CORE_Handle *core;
212
213 /**
214  * Routes on which this peer is an intermediate.
215  */
216 static struct GNUNET_CONTAINER_MultiShortmap *routes;
217
218 /**
219  * Heap of routes, MIN-sorted by last activity.
220  */
221 static struct GNUNET_CONTAINER_Heap *route_heap;
222
223 /**
224  * Maximum number of concurrent routes this peer will support.
225  */
226 static unsigned long long max_routes;
227
228 /**
229  * Maximum number of envelopes we will buffer at this peer.
230  */
231 static unsigned long long max_buffers;
232
233 /**
234  * Current number of envelopes we have buffered at this peer.
235  */
236 static unsigned long long cur_buffers;
237
238 /**
239  * Task to timeout routes.
240  */
241 static struct GNUNET_SCHEDULER_Task *timeout_task;
242
243
244 /**
245  * Get the route corresponding to a hash.
246  *
247  * @param cid hash generated from the connection identifier
248  */
249 static struct CadetRoute *
250 get_route (const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid)
251 {
252   return GNUNET_CONTAINER_multishortmap_get (routes,
253                                              &cid->connection_of_tunnel);
254 }
255
256
257 /**
258  * We message @a msg from @a prev.  Find its route by @a cid and
259  * forward to the next hop.  Drop and signal broken route if we do not
260  * have a route.
261  *
262  * @param prev previous hop (sender)
263  * @param cid connection identifier, tells us which route to use
264  * @param msg the message to forward
265  */
266 static void
267 route_message (struct CadetPeer *prev,
268                const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid,
269                const struct GNUNET_MessageHeader *msg)
270 {
271   struct CadetRoute *route;
272   struct RouteDirection *dir;
273   struct GNUNET_MQ_Envelope *env;
274
275   route = get_route (cid);
276   if (NULL == route)
277   {
278     struct GNUNET_MQ_Envelope *env;
279     struct GNUNET_CADET_ConnectionBrokenMessage *bm;
280
281     LOG (GNUNET_ERROR_TYPE_DEBUG,
282          "Failed to route message of type %u from %s on connection %s: no route\n",
283          ntohs (msg->type),
284          GCP_2s (prev),
285          GNUNET_sh2s (&cid->connection_of_tunnel));
286     env = GNUNET_MQ_msg (bm,
287                          GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN);
288     bm->cid = *cid;
289     bm->peer1 = my_full_id;
290     GCP_send_ooo (prev,
291                   env);
292     return;
293   }
294   route->last_use = GNUNET_TIME_absolute_get ();
295   GNUNET_CONTAINER_heap_update_cost (route->hn,
296                                      route->last_use.abs_value_us);
297   dir = (prev == route->prev.hop) ? &route->next : &route->prev;
298   if (GNUNET_YES == dir->is_ready)
299   {
300     LOG (GNUNET_ERROR_TYPE_DEBUG,
301          "Routing message of type %u from %s to %s on connection %s\n",
302          ntohs (msg->type),
303          GCP_2s (prev),
304          GNUNET_i2s (GCP_get_id (dir->hop)),
305          GNUNET_sh2s (&cid->connection_of_tunnel));
306     dir->is_ready = GNUNET_NO;
307     GCP_send (dir->mqm,
308               GNUNET_MQ_msg_copy (msg));
309     return;
310   }
311   env = dir->out_buffer[dir->out_wpos];
312   if (NULL != env)
313   {
314     /* Queue full, drop earliest message in queue */
315     LOG (GNUNET_ERROR_TYPE_DEBUG,
316          "Queue full due to new message of type %u from %s to %s on connection %s, dropping old message\n",
317          ntohs (msg->type),
318          GCP_2s (prev),
319          GNUNET_i2s (GCP_get_id (dir->hop)),
320          GNUNET_sh2s (&cid->connection_of_tunnel));
321     GNUNET_STATISTICS_update (stats,
322                               "# messages dropped due to full buffer",
323                               1,
324                               GNUNET_NO);
325   GNUNET_assert (dir->out_rpos == dir->out_wpos);
326     GNUNET_MQ_discard (env);
327     dir->out_rpos++;
328     if (ROUTE_BUFFER_SIZE == dir->out_rpos)
329       dir->out_rpos = 0;
330   }
331   LOG (GNUNET_ERROR_TYPE_DEBUG,
332        "Queueing new message of type %u from %s to %s on connection %s\n",
333        ntohs (msg->type),
334        GCP_2s (prev),
335        GNUNET_i2s (GCP_get_id (dir->hop)),
336        GNUNET_sh2s (&cid->connection_of_tunnel));
337   env = GNUNET_MQ_msg_copy (msg);
338   dir->out_buffer[dir->out_wpos] = env;
339   dir->out_wpos++;
340   if (ROUTE_BUFFER_SIZE == dir->out_wpos)
341     dir->out_wpos = 0;
342 }
343
344
345 /**
346  * Check if the create_connection message has the appropriate size.
347  *
348  * @param cls Closure (unused).
349  * @param msg Message to check.
350  *
351  * @return #GNUNET_YES if size is correct, #GNUNET_NO otherwise.
352  */
353 static int
354 check_connection_create (void *cls,
355                          const struct GNUNET_CADET_ConnectionCreateMessage *msg)
356 {
357   uint16_t size = ntohs (msg->header.size) - sizeof (*msg);
358
359   if (0 != (size % sizeof (struct GNUNET_PeerIdentity)))
360   {
361     GNUNET_break_op (0);
362     return GNUNET_NO;
363   }
364   return GNUNET_YES;
365 }
366
367
368 /**
369  * Free internal data of a route direction.
370  *
371  * @param dir direction to destroy (do NOT free memory of 'dir' itself)
372  */
373 static void
374 destroy_direction (struct RouteDirection *dir)
375 {
376   for (unsigned int i=0;i<ROUTE_BUFFER_SIZE;i++)
377     if (NULL != dir->out_buffer[i])
378     {
379       GNUNET_MQ_discard (dir->out_buffer[i]);
380       dir->out_buffer[i] = NULL;
381     }
382   if (NULL != dir->mqm)
383   {
384     GCP_request_mq_cancel (dir->mqm,
385                            NULL);
386     dir->mqm = NULL;
387   }
388 }
389
390
391 /**
392  * Destroy our state for @a route.
393  *
394  * @param route route to destroy
395  */
396 static void
397 destroy_route (struct CadetRoute *route)
398 {
399   LOG (GNUNET_ERROR_TYPE_DEBUG,
400        "Destroying route from %s to %s of connection %s\n",
401        GNUNET_i2s  (GCP_get_id (route->prev.hop)),
402        GNUNET_i2s2 (GCP_get_id (route->next.hop)),
403        GNUNET_sh2s (&route->cid.connection_of_tunnel));
404   GNUNET_assert (route ==
405                  GNUNET_CONTAINER_heap_remove_node (route->hn));
406   GNUNET_assert (GNUNET_YES ==
407                  GNUNET_CONTAINER_multishortmap_remove (routes,
408                                                         &route->cid.connection_of_tunnel,
409                                                         route));
410   destroy_direction (&route->prev);
411   destroy_direction (&route->next);
412   GNUNET_free (route);
413 }
414
415
416 /**
417  * Send message that a route is broken between @a peer1 and @a peer2.
418  *
419  * @param target where to send the message
420  * @param cid connection identifier to use
421  * @param peer1 one of the peers where a link is broken
422  * @param peer2 another one of the peers where a link is broken
423  */
424 static void
425 send_broken (struct RouteDirection *target,
426              const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid,
427              const struct GNUNET_PeerIdentity *peer1,
428              const struct GNUNET_PeerIdentity *peer2)
429 {
430   struct GNUNET_MQ_Envelope *env;
431   struct GNUNET_CADET_ConnectionBrokenMessage *bm;
432
433   if (NULL == target->mqm)
434     return; /* Can't send notification, connection is down! */
435   LOG (GNUNET_ERROR_TYPE_DEBUG,
436        "Notifying %s about BROKEN route at %s-%s of connection %s\n",
437        GCP_2s (target->hop),
438        GNUNET_i2s (peer1),
439        GNUNET_i2s2 (peer2),
440        GNUNET_sh2s (&cid->connection_of_tunnel));
441
442   env = GNUNET_MQ_msg (bm,
443                        GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN);
444   bm->cid = *cid;
445   if (NULL != peer1)
446     bm->peer1 = *peer1;
447   if (NULL != peer2)
448     bm->peer2 = *peer2;
449
450   GCP_request_mq_cancel (target->mqm,
451                          env);
452   target->mqm = NULL;
453 }
454
455
456 /**
457  * Function called to check if any routes have timed out, and if
458  * so, to clean them up.  Finally, schedules itself again at the
459  * earliest time where there might be more work.
460  *
461  * @param cls NULL
462  */
463 static void
464 timeout_cb (void *cls)
465 {
466   struct CadetRoute *r;
467   struct GNUNET_TIME_Relative linger;
468   struct GNUNET_TIME_Absolute exp;
469
470   timeout_task = NULL;
471   linger = GNUNET_TIME_relative_multiply (keepalive_period,
472                                           3);
473   while (NULL != (r = GNUNET_CONTAINER_heap_peek (route_heap)))
474   {
475     exp = GNUNET_TIME_absolute_add (r->last_use,
476                                     linger);
477     if (0 != GNUNET_TIME_absolute_get_duration (exp).rel_value_us)
478     {
479       /* Route not yet timed out, wait until it does. */
480       timeout_task = GNUNET_SCHEDULER_add_at (exp,
481                                               &timeout_cb,
482                                               NULL);
483       return;
484     }
485     send_broken (&r->prev,
486                  &r->cid,
487                  NULL,
488                  NULL);
489     send_broken (&r->next,
490                  &r->cid,
491                  NULL,
492                  NULL);
493     destroy_route (r);
494   }
495   /* No more routes left, so no need for a #timeout_task */
496 }
497
498
499 /**
500  * Function called when the message queue to the previous hop
501  * becomes available/unavailable.  We expect this function to
502  * be called immediately when we register, and then again
503  * later if the connection ever goes down.
504  *
505  * @param cls the `struct RouteDirection`
506  * @param available #GNUNET_YES if sending is now possible,
507  *                  #GNUNET_NO if sending is no longer possible
508  *                  #GNUNET_SYSERR if sending is no longer possible
509  *                                 and the last envelope was discarded
510  */
511 static void
512 dir_ready_cb (void *cls,
513               int ready)
514 {
515   struct RouteDirection *dir = cls;
516   struct CadetRoute *route = dir->my_route;
517   struct RouteDirection *odir;
518
519   if (GNUNET_YES == ready)
520   {
521     struct GNUNET_MQ_Envelope *env;
522
523     dir->is_ready = GNUNET_YES;
524     if (NULL != (env = dir->out_buffer[dir->out_rpos]))
525     {
526       dir->out_buffer[dir->out_rpos] = NULL;
527       dir->out_rpos++;
528       if (ROUTE_BUFFER_SIZE == dir->out_rpos)
529         dir->out_rpos = 0;
530       dir->is_ready = GNUNET_NO;
531       GCP_send (dir->mqm,
532                 env);
533     }
534     return;
535   }
536   odir = (dir == &route->next) ? &route->prev : &route->next;
537   send_broken (&route->next,
538                &route->cid,
539                GCP_get_id (odir->hop),
540                &my_full_id);
541   destroy_route (route);
542 }
543
544
545 /**
546  * Initialize one of the directions of a route.
547  *
548  * @param route route the direction belongs to
549  * @param dir direction to initialize
550  * @param hop next hop on in the @a dir
551  */
552 static void
553 dir_init (struct RouteDirection *dir,
554           struct CadetRoute *route,
555           struct CadetPeer *hop)
556 {
557   dir->hop = hop;
558   dir->my_route = route;
559   dir->mqm = GCP_request_mq (hop,
560                              &dir_ready_cb,
561                              dir);
562   GNUNET_assert (GNUNET_YES == dir->is_ready);
563 }
564
565
566 /**
567  * We could not create the desired route.  Send a
568  * #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN
569  * message to @a target.
570  *
571  * @param target who should receive the message
572  * @param cid identifier of the connection/route that failed
573  * @param failure_at neighbour with which we failed to route,
574  *        or NULL.
575  */
576 static void
577 send_broken_without_mqm (struct CadetPeer *target,
578                          const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid,
579                          const struct GNUNET_PeerIdentity *failure_at)
580 {
581   struct GNUNET_MQ_Envelope *env;
582   struct GNUNET_CADET_ConnectionBrokenMessage *bm;
583
584   env = GNUNET_MQ_msg (bm,
585                        GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN);
586   bm->cid = *cid;
587   bm->peer1 = my_full_id;
588   if (NULL != failure_at)
589     bm->peer2 = *failure_at;
590   GCP_send_ooo (target,
591                 env);
592 }
593
594
595 /**
596  * Handle for #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE
597  *
598  * @param cls Closure (CadetPeer for neighbor that sent the message).
599  * @param msg Message itself.
600  */
601 static void
602 handle_connection_create (void *cls,
603                           const struct GNUNET_CADET_ConnectionCreateMessage *msg)
604 {
605   struct CadetPeer *sender = cls;
606   struct CadetPeer *next;
607   const struct GNUNET_PeerIdentity *pids = (const struct GNUNET_PeerIdentity *) &msg[1];
608   struct CadetRoute *route;
609   uint16_t size = ntohs (msg->header.size) - sizeof (*msg);
610   unsigned int path_length;
611   unsigned int off;
612   enum GNUNET_CADET_ChannelOption options;
613
614   options = (enum GNUNET_CADET_ChannelOption) ntohl (msg->options);
615   path_length = size / sizeof (struct GNUNET_PeerIdentity);
616   /* Initiator is at offset 0. */
617   for (off=1;off<path_length;off++)
618     if (0 == memcmp (&my_full_id,
619                      &pids[off],
620                      sizeof (struct GNUNET_PeerIdentity)))
621       break;
622   if (off == path_length)
623   {
624     /* We are not on the path, bogus request */
625     GNUNET_break_op (0);
626     return;
627   }
628   /* Check previous hop */
629   if (sender != GCP_get (&pids[off - 1],
630                          GNUNET_NO))
631   {
632     /* sender is not on the path, not allowed */
633     GNUNET_break_op (0);
634     return;
635   }
636   if (NULL !=
637       get_route (&msg->cid))
638   {
639     /* Duplicate CREATE, pass it on, previous one might have been lost! */
640     LOG (GNUNET_ERROR_TYPE_DEBUG,
641          "Passing on duplicate CADET_CONNECTION_CREATE message on connection %s\n",
642          GNUNET_sh2s (&msg->cid.connection_of_tunnel));
643     route_message (sender,
644                    &msg->cid,
645                    &msg->header);
646     return;
647   }
648   if (off == path_length - 1)
649   {
650     /* We are the destination, create connection */
651     struct CadetConnection *cc;
652     struct CadetPeerPath *path;
653     struct CadetPeer *origin;
654
655     cc = GNUNET_CONTAINER_multishortmap_get (connections,
656                                              &msg->cid.connection_of_tunnel);
657     if (NULL != cc)
658     {
659       LOG (GNUNET_ERROR_TYPE_DEBUG,
660            "Received duplicate CADET_CONNECTION_CREATE message on connection %s\n",
661            GNUNET_sh2s (&msg->cid.connection_of_tunnel));
662       GCC_handle_duplicate_create (cc);
663       return;
664     }
665
666     origin = GCP_get (&pids[0],
667                       GNUNET_YES);
668     LOG (GNUNET_ERROR_TYPE_DEBUG,
669          "Received CADET_CONNECTION_CREATE message from %s for connection %s, building inverse path\n",
670          GCP_2s (origin),
671          GNUNET_sh2s (&msg->cid.connection_of_tunnel));
672     path = GCPP_get_path_from_route (path_length - 1,
673                                      pids);
674     if (GNUNET_OK !=
675         GCT_add_inbound_connection (GCP_get_tunnel (origin,
676                                                     GNUNET_YES),
677                                     &msg->cid,
678                                     (enum GNUNET_CADET_ChannelOption) ntohl (msg->options),
679                                     path))
680     {
681       /* Send back BROKEN: duplicate connection on the same path,
682          we will use the other one. */
683       LOG (GNUNET_ERROR_TYPE_DEBUG,
684            "Received CADET_CONNECTION_CREATE from %s for %s, but %s already has a connection. Sending BROKEN\n",
685            GCP_2s (sender),
686            GNUNET_sh2s (&msg->cid.connection_of_tunnel),
687            GCPP_2s (path));
688       send_broken_without_mqm (sender,
689                                &msg->cid,
690                                NULL);
691       return;
692     }
693     return;
694   }
695   /* We are merely a hop on the way, check if we can support the route */
696   next = GCP_get (&pids[off + 1],
697                   GNUNET_NO);
698   if ( (NULL == next) ||
699        (GNUNET_NO == GCP_has_core_connection (next)) )
700   {
701     /* unworkable, send back BROKEN notification */
702     LOG (GNUNET_ERROR_TYPE_DEBUG,
703          "Received CADET_CONNECTION_CREATE from %s for %s. Next hop %s:%u is down. Sending BROKEN\n",
704          GCP_2s (sender),
705          GNUNET_sh2s (&msg->cid.connection_of_tunnel),
706          GNUNET_i2s (&pids[off + 1]),
707          off + 1);
708     send_broken_without_mqm (sender,
709                              &msg->cid,
710                              &pids[off + 1]);
711     return;
712   }
713   if (max_routes <= GNUNET_CONTAINER_multishortmap_size (routes))
714   {
715     LOG (GNUNET_ERROR_TYPE_DEBUG,
716          "Received CADET_CONNECTION_CREATE from %s for %s. We have reached our route limit. Sending BROKEN\n",
717          GCP_2s (sender),
718          GNUNET_sh2s (&msg->cid.connection_of_tunnel));
719     send_broken_without_mqm (sender,
720                              &msg->cid,
721                              &pids[off - 1]);
722     return;
723   }
724
725   /* Workable route, create routing entry */
726   LOG (GNUNET_ERROR_TYPE_DEBUG,
727        "Received CADET_CONNECTION_CREATE from %s for %s. Next hop %s:%u is up. Creating route\n",
728        GCP_2s (sender),
729        GNUNET_sh2s (&msg->cid.connection_of_tunnel),
730        GNUNET_i2s (&pids[off + 1]),
731        off + 1);
732   route = GNUNET_new (struct CadetRoute);
733   route->options = options;
734   route->cid = msg->cid;
735   route->last_use = GNUNET_TIME_absolute_get ();
736   dir_init (&route->prev,
737             route,
738             sender);
739   dir_init (&route->next,
740             route,
741             next);
742   GNUNET_assert (GNUNET_OK ==
743                  GNUNET_CONTAINER_multishortmap_put (routes,
744                                                      &route->cid.connection_of_tunnel,
745                                                      route,
746                                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
747   route->hn = GNUNET_CONTAINER_heap_insert (route_heap,
748                                             route,
749                                             route->last_use.abs_value_us);
750   if (NULL == timeout_task)
751     timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (keepalive_period,
752                                                                                 3),
753                                                  &timeout_cb,
754                                                  NULL);
755 }
756
757
758 /**
759  * Handle for #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE_ACK
760  *
761  * @param cls Closure (CadetPeer for neighbor that sent the message).
762  * @param msg Message itself.
763  */
764 static void
765 handle_connection_create_ack (void *cls,
766                               const struct GNUNET_CADET_ConnectionCreateAckMessage *msg)
767 {
768   struct CadetPeer *peer = cls;
769   struct CadetConnection *cc;
770
771   /* First, check if ACK belongs to a connection that ends here. */
772   cc = GNUNET_CONTAINER_multishortmap_get (connections,
773                                            &msg->cid.connection_of_tunnel);
774   if (NULL != cc)
775   {
776     /* verify ACK came from the right direction */
777     struct CadetPeerPath *path = GCC_get_path (cc);
778
779     if (peer !=
780         GCPP_get_peer_at_offset (path,
781                                  0))
782     {
783       /* received ACK from unexpected direction, ignore! */
784       GNUNET_break_op (0);
785       return;
786     }
787     LOG (GNUNET_ERROR_TYPE_DEBUG,
788          "Received CONNECTION_CREATE_ACK for connection %s.\n",
789          GNUNET_sh2s (&msg->cid.connection_of_tunnel));
790     GCC_handle_connection_create_ack (cc);
791     return;
792   }
793
794   /* We're just an intermediary peer, route the message along its path */
795   route_message (peer,
796                  &msg->cid,
797                  &msg->header);
798 }
799
800
801 /**
802  * Handle for #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN
803  *
804  * @param cls Closure (CadetPeer for neighbor that sent the message).
805  * @param msg Message itself.
806  * @deprecated duplicate logic with #handle_destroy(); dedup!
807  */
808 static void
809 handle_connection_broken (void *cls,
810                           const struct GNUNET_CADET_ConnectionBrokenMessage *msg)
811 {
812   struct CadetPeer *peer = cls;
813   struct CadetConnection *cc;
814   struct CadetRoute *route;
815
816   /* First, check if message belongs to a connection that ends here. */
817   cc = GNUNET_CONTAINER_multishortmap_get (connections,
818                                            &msg->cid.connection_of_tunnel);
819   if (NULL != cc)
820   {
821     /* verify message came from the right direction */
822     struct CadetPeerPath *path = GCC_get_path (cc);
823
824     if (peer !=
825         GCPP_get_peer_at_offset (path,
826                                  0))
827     {
828       /* received message from unexpected direction, ignore! */
829       GNUNET_break_op (0);
830       return;
831     }
832     LOG (GNUNET_ERROR_TYPE_DEBUG,
833          "Received CONNECTION_BROKEN for connection %s. Destroying it.\n",
834          GNUNET_sh2s (&msg->cid.connection_of_tunnel));
835     GCC_destroy_without_core (cc);
836
837     /* FIXME: also destroy the path up to the specified link! */
838     return;
839   }
840
841   /* We're just an intermediary peer, route the message along its path */
842   route_message (peer,
843                  &msg->cid,
844                  &msg->header);
845   route = get_route (&msg->cid);
846   if (NULL != route)
847     destroy_route (route);
848   /* FIXME: also destroy paths we MAY have up to the specified link! */
849 }
850
851
852 /**
853  * Handle for #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY
854  *
855  * @param cls Closure (CadetPeer for neighbor that sent the message).
856  * @param msg Message itself.
857  */
858 static void
859 handle_connection_destroy (void *cls,
860                            const struct GNUNET_CADET_ConnectionDestroyMessage *msg)
861 {
862   struct CadetPeer *peer = cls;
863   struct CadetConnection *cc;
864   struct CadetRoute *route;
865
866   /* First, check if message belongs to a connection that ends here. */
867   cc = GNUNET_CONTAINER_multishortmap_get (connections,
868                                            &msg->cid.connection_of_tunnel);
869   if (NULL != cc)
870   {
871     /* verify message came from the right direction */
872     struct CadetPeerPath *path = GCC_get_path (cc);
873
874     if (peer !=
875         GCPP_get_peer_at_offset (path,
876                                  0))
877     {
878       /* received message from unexpected direction, ignore! */
879       GNUNET_break_op (0);
880       return;
881     }
882     LOG (GNUNET_ERROR_TYPE_DEBUG,
883          "Received CONNECTION_DESTROY for connection %s. Destroying connection.\n",
884          GNUNET_sh2s (&msg->cid.connection_of_tunnel));
885
886     GCC_destroy_without_core (cc);
887     return;
888   }
889
890   /* We're just an intermediary peer, route the message along its path */
891   LOG (GNUNET_ERROR_TYPE_DEBUG,
892        "Received CONNECTION_DESTROY for connection %s. Destroying route.\n",
893        GNUNET_sh2s (&msg->cid.connection_of_tunnel));
894   route_message (peer,
895                  &msg->cid,
896                  &msg->header);
897   route = get_route (&msg->cid);
898   if (NULL != route)
899     destroy_route (route);
900 }
901
902
903 /**
904  * Handle for #GNUNET_MESSAGE_TYPE_CADET_TUNNEL_KX
905  *
906  * @param cls Closure (CadetPeer for neighbor that sent the message).
907  * @param msg Message itself.
908  */
909 static void
910 handle_tunnel_kx (void *cls,
911                   const struct GNUNET_CADET_TunnelKeyExchangeMessage *msg)
912 {
913   struct CadetPeer *peer = cls;
914   struct CadetConnection *cc;
915
916   /* First, check if message belongs to a connection that ends here. */
917   cc = GNUNET_CONTAINER_multishortmap_get (connections,
918                                            &msg->cid.connection_of_tunnel);
919   if (NULL != cc)
920   {
921     /* verify message came from the right direction */
922     struct CadetPeerPath *path = GCC_get_path (cc);
923
924     if (peer !=
925         GCPP_get_peer_at_offset (path,
926                                  0))
927     {
928       /* received message from unexpected direction, ignore! */
929       GNUNET_break_op (0);
930       return;
931     }
932     GCC_handle_kx (cc,
933                    msg);
934     return;
935   }
936
937   /* We're just an intermediary peer, route the message along its path */
938   route_message (peer,
939                  &msg->cid,
940                  &msg->header);
941 }
942
943
944 /**
945  * Handle for #GNUNET_MESSAGE_TYPE_CADET_TUNNEL_KX_AUTH
946  *
947  * @param cls Closure (CadetPeer for neighbor that sent the message).
948  * @param msg Message itself.
949  */
950 static void
951 handle_tunnel_kx_auth (void *cls,
952                        const struct GNUNET_CADET_TunnelKeyExchangeAuthMessage *msg)
953 {
954   struct CadetPeer *peer = cls;
955   struct CadetConnection *cc;
956
957   /* First, check if message belongs to a connection that ends here. */
958   cc = GNUNET_CONTAINER_multishortmap_get (connections,
959                                            &msg->kx.cid.connection_of_tunnel);
960   if (NULL != cc)
961   {
962     /* verify message came from the right direction */
963     struct CadetPeerPath *path = GCC_get_path (cc);
964
965     if (peer !=
966         GCPP_get_peer_at_offset (path,
967                                  0))
968     {
969       /* received message from unexpected direction, ignore! */
970       GNUNET_break_op (0);
971       return;
972     }
973     GCC_handle_kx_auth (cc,
974                         msg);
975     return;
976   }
977
978   /* We're just an intermediary peer, route the message along its path */
979   route_message (peer,
980                  &msg->kx.cid,
981                  &msg->kx.header);
982 }
983
984
985 /**
986  * Check if the encrypted message has the appropriate size.
987  *
988  * @param cls Closure (unused).
989  * @param msg Message to check.
990  *
991  * @return #GNUNET_YES if size is correct, #GNUNET_NO otherwise.
992  */
993 static int
994 check_tunnel_encrypted (void *cls,
995                         const struct GNUNET_CADET_TunnelEncryptedMessage *msg)
996 {
997   return GNUNET_YES;
998 }
999
1000
1001 /**
1002  * Handle for #GNUNET_MESSAGE_TYPE_CADET_TUNNEL_ENCRYPTED.
1003  *
1004  * @param cls Closure (CadetPeer for neighbor that sent the message).
1005  * @param msg Message itself.
1006  */
1007 static void
1008 handle_tunnel_encrypted (void *cls,
1009                          const struct GNUNET_CADET_TunnelEncryptedMessage *msg)
1010 {
1011   struct CadetPeer *peer = cls;
1012   struct CadetConnection *cc;
1013
1014   /* First, check if message belongs to a connection that ends here. */
1015   cc = GNUNET_CONTAINER_multishortmap_get (connections,
1016                                            &msg->cid.connection_of_tunnel);
1017   if (NULL != cc)
1018   {
1019     /* verify message came from the right direction */
1020     struct CadetPeerPath *path = GCC_get_path (cc);
1021
1022     if (peer !=
1023         GCPP_get_peer_at_offset (path,
1024                                  0))
1025     {
1026       /* received message from unexpected direction, ignore! */
1027       GNUNET_break_op (0);
1028       return;
1029     }
1030     GCC_handle_encrypted (cc,
1031                           msg);
1032     return;
1033   }
1034   /* We're just an intermediary peer, route the message along its path */
1035   route_message (peer,
1036                  &msg->cid,
1037                  &msg->header);
1038 }
1039
1040
1041 /**
1042  * Function called after #GNUNET_CORE_connect has succeeded (or failed
1043  * for good).  Note that the private key of the peer is intentionally
1044  * not exposed here; if you need it, your process should try to read
1045  * the private key file directly (which should work if you are
1046  * authorized...).  Implementations of this function must not call
1047  * #GNUNET_CORE_disconnect (other than by scheduling a new task to
1048  * do this later).
1049  *
1050  * @param cls closure
1051  * @param my_identity ID of this peer, NULL if we failed
1052  */
1053 static void
1054 core_init_cb (void *cls,
1055               const struct GNUNET_PeerIdentity *my_identity)
1056 {
1057   if (NULL == my_identity)
1058   {
1059     GNUNET_break (0);
1060     return;
1061   }
1062   GNUNET_break (0 ==
1063                 memcmp (my_identity,
1064                         &my_full_id,
1065                         sizeof (struct GNUNET_PeerIdentity)));
1066 }
1067
1068
1069 /**
1070  * Method called whenever a given peer connects.
1071  *
1072  * @param cls closure
1073  * @param peer peer identity this notification is about
1074  */
1075 static void *
1076 core_connect_cb (void *cls,
1077                  const struct GNUNET_PeerIdentity *peer,
1078                  struct GNUNET_MQ_Handle *mq)
1079 {
1080   struct CadetPeer *cp;
1081
1082   LOG (GNUNET_ERROR_TYPE_DEBUG,
1083        "CORE connection to peer %s was established.\n",
1084        GNUNET_i2s (peer));
1085   cp = GCP_get (peer,
1086                 GNUNET_YES);
1087   GCP_set_mq (cp,
1088               mq);
1089   return cp;
1090 }
1091
1092
1093 /**
1094  * Method called whenever a peer disconnects.
1095  *
1096  * @param cls closure
1097  * @param peer peer identity this notification is about
1098  */
1099 static void
1100 core_disconnect_cb (void *cls,
1101                     const struct GNUNET_PeerIdentity *peer,
1102                     void *peer_cls)
1103 {
1104   struct CadetPeer *cp = peer_cls;
1105
1106   LOG (GNUNET_ERROR_TYPE_DEBUG,
1107        "CORE connection to peer %s went down.\n",
1108        GNUNET_i2s (peer));
1109   GCP_set_mq (cp,
1110               NULL);
1111 }
1112
1113
1114 /**
1115  * Initialize the CORE subsystem.
1116  *
1117  * @param c Configuration.
1118  */
1119 void
1120 GCO_init (const struct GNUNET_CONFIGURATION_Handle *c)
1121 {
1122   struct GNUNET_MQ_MessageHandler handlers[] = {
1123     GNUNET_MQ_hd_var_size (connection_create,
1124                            GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE,
1125                            struct GNUNET_CADET_ConnectionCreateMessage,
1126                            NULL),
1127     GNUNET_MQ_hd_fixed_size (connection_create_ack,
1128                              GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE_ACK,
1129                              struct GNUNET_CADET_ConnectionCreateAckMessage,
1130                              NULL),
1131     GNUNET_MQ_hd_fixed_size (connection_broken,
1132                              GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN,
1133                              struct GNUNET_CADET_ConnectionBrokenMessage,
1134                              NULL),
1135     GNUNET_MQ_hd_fixed_size (connection_destroy,
1136                              GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY,
1137                              struct GNUNET_CADET_ConnectionDestroyMessage,
1138                              NULL),
1139     GNUNET_MQ_hd_fixed_size (tunnel_kx,
1140                              GNUNET_MESSAGE_TYPE_CADET_TUNNEL_KX,
1141                              struct GNUNET_CADET_TunnelKeyExchangeMessage,
1142                              NULL),
1143     GNUNET_MQ_hd_fixed_size (tunnel_kx_auth,
1144                              GNUNET_MESSAGE_TYPE_CADET_TUNNEL_KX_AUTH,
1145                              struct GNUNET_CADET_TunnelKeyExchangeAuthMessage,
1146                              NULL),
1147     GNUNET_MQ_hd_var_size (tunnel_encrypted,
1148                            GNUNET_MESSAGE_TYPE_CADET_TUNNEL_ENCRYPTED,
1149                            struct GNUNET_CADET_TunnelEncryptedMessage,
1150                            NULL),
1151     GNUNET_MQ_handler_end ()
1152   };
1153
1154   if (GNUNET_OK !=
1155       GNUNET_CONFIGURATION_get_value_number (c,
1156                                              "CADET",
1157                                              "MAX_ROUTES",
1158                                              &max_routes))
1159     max_routes = 5000;
1160   if (GNUNET_OK !=
1161       GNUNET_CONFIGURATION_get_value_number (c,
1162                                              "CADET",
1163                                              "MAX_MSGS_QUEUE",
1164                                              &max_buffers))
1165     max_buffers = 10000;
1166   routes = GNUNET_CONTAINER_multishortmap_create (1024,
1167                                                   GNUNET_NO);
1168   route_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1169   core = GNUNET_CORE_connect (c,
1170                               NULL,
1171                               &core_init_cb,
1172                               &core_connect_cb,
1173                               &core_disconnect_cb,
1174                               handlers);
1175 }
1176
1177
1178 /**
1179  * Shut down the CORE subsystem.
1180  */
1181 void
1182 GCO_shutdown ()
1183 {
1184   if (NULL != core)
1185   {
1186     GNUNET_CORE_disconnect (core);
1187     core = NULL;
1188   }
1189   GNUNET_assert (0 == GNUNET_CONTAINER_multishortmap_size (routes));
1190   GNUNET_CONTAINER_multishortmap_destroy (routes);
1191   routes = NULL;
1192   GNUNET_CONTAINER_heap_destroy (route_heap);
1193   route_heap = NULL;
1194   if (NULL != timeout_task)
1195   {
1196     GNUNET_SCHEDULER_cancel (timeout_task);
1197     timeout_task = NULL;
1198   }
1199 }
1200
1201 /* end of gnunet-cadet-service_core.c */