2 This file is part of GNUnet.
3 Copyright (C) 2017 GNUnet e.V.
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
18 SPDX-License-Identifier: AGPL3.0-or-later
22 * @file cadet/gnunet-service-cadet_core.c
23 * @brief cadet service; interaction with CORE service
24 * @author Bartlomiej Polot
25 * @author Christian Grothoff
27 * All functions in this file should use the prefix GCO (Gnunet Cadet cOre (bottom))
30 * - Optimization: given BROKEN messages, destroy paths (?)
33 #include "gnunet-service-cadet_core.h"
34 #include "gnunet-service-cadet_paths.h"
35 #include "gnunet-service-cadet_peer.h"
36 #include "gnunet-service-cadet_connection.h"
37 #include "gnunet-service-cadet_tunnels.h"
38 #include "gnunet_core_service.h"
39 #include "gnunet_statistics_service.h"
40 #include "cadet_protocol.h"
42 #define LOG(level, ...) GNUNET_log_from (level, "cadet-cor", __VA_ARGS__)
45 * Information we keep per direction for a route.
47 struct RouteDirection;
50 * Set of CadetRoutes that have exactly the same number of messages
51 * in their buffer. Used so we can efficiently find all of those
52 * routes that have the current maximum of messages in the buffer (in
53 * case we have to purge).
58 * Rung of RouteDirections with one more buffer entry each.
63 * Rung of RouteDirections with one less buffer entry each.
68 * DLL of route directions with a number of buffer entries matching this rung.
70 struct RouteDirection *rd_head;
73 * DLL of route directions with a number of buffer entries matching this rung.
75 struct RouteDirection *rd_tail;
78 * Total number of route directions in this rung.
80 unsigned int num_routes;
83 * Number of messages route directions at this rung have
86 unsigned int rung_off;
91 * Information we keep per direction for a route.
96 * DLL of other route directions within the same `struct Rung`.
98 struct RouteDirection *prev;
101 * DLL of other route directions within the same `struct Rung`.
103 struct RouteDirection *next;
106 * Rung of this route direction (matches length of the buffer DLL).
111 * Head of DLL of envelopes we have in the buffer for this direction.
113 struct GNUNET_MQ_Envelope *env_head;
116 * Tail of DLL of envelopes we have in the buffer for this direction.
118 struct GNUNET_MQ_Envelope *env_tail;
123 struct CadetPeer *hop;
126 * Route this direction is part of.
128 struct CadetRoute *my_route;
131 * Message queue manager for @e hop.
133 struct GCP_MessageQueueManager *mqm;
136 * Is @e mqm currently ready for transmission?
143 * Description of a segment of a `struct CadetConnection` at the
144 * intermediate peers. Routes are basically entries in a peer's
145 * routing table for forwarding traffic. At both endpoints, the
146 * routes are terminated by a `struct CadetConnection`, which knows
147 * the complete `struct CadetPath` that is formed by the individual
153 * Information about the next hop on this route.
155 struct RouteDirection next;
158 * Information about the previous hop on this route.
160 struct RouteDirection prev;
163 * Unique identifier for the connection that uses this route.
165 struct GNUNET_CADET_ConnectionTunnelIdentifier cid;
168 * When was this route last in use?
170 struct GNUNET_TIME_Absolute last_use;
173 * Position of this route in the #route_heap.
175 struct GNUNET_CONTAINER_HeapNode *hn;
180 * Handle to the CORE service.
182 static struct GNUNET_CORE_Handle *core;
185 * Routes on which this peer is an intermediate.
187 static struct GNUNET_CONTAINER_MultiShortmap *routes;
190 * Heap of routes, MIN-sorted by last activity.
192 static struct GNUNET_CONTAINER_Heap *route_heap;
195 * Rung zero (always pointed to by #rung_head).
197 static struct Rung rung_zero;
200 * DLL of rungs, with the head always point to a rung of
201 * route directions with no messages in the queue.
203 static struct Rung *rung_head = &rung_zero;
206 * Tail of the #rung_head DLL.
208 static struct Rung *rung_tail = &rung_zero;
211 * Maximum number of concurrent routes this peer will support.
213 static unsigned long long max_routes;
216 * Maximum number of envelopes we will buffer at this peer.
218 static unsigned long long max_buffers;
221 * Current number of envelopes we have buffered at this peer.
223 static unsigned long long cur_buffers;
226 * Task to timeout routes.
228 static struct GNUNET_SCHEDULER_Task *timeout_task;
232 * Get the route corresponding to a hash.
234 * @param cid hash generated from the connection identifier
236 static struct CadetRoute *
237 get_route (const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid)
239 return GNUNET_CONTAINER_multishortmap_get (routes,
240 &cid->connection_of_tunnel);
245 * Lower the rung in which @a dir is by 1.
247 * @param dir direction to lower in rung.
250 lower_rung (struct RouteDirection *dir)
252 struct Rung *rung = dir->rung;
255 GNUNET_CONTAINER_DLL_remove (rung->rd_head, rung->rd_tail, dir);
257 GNUNET_assert (NULL != prev);
258 if (prev->rung_off != rung->rung_off - 1)
260 prev = GNUNET_new (struct Rung);
261 prev->rung_off = rung->rung_off - 1;
262 GNUNET_CONTAINER_DLL_insert_after (rung_head, rung_tail, rung->prev, prev);
264 GNUNET_assert (NULL != prev);
265 GNUNET_CONTAINER_DLL_insert (prev->rd_head, prev->rd_tail, dir);
271 * Discard the buffer @a env from the route direction @a dir and
272 * move @a dir down a rung.
274 * @param dir direction that contains the @a env in the buffer
275 * @param env envelope to discard
278 discard_buffer (struct RouteDirection *dir, struct GNUNET_MQ_Envelope *env)
280 GNUNET_MQ_dll_remove (&dir->env_head, &dir->env_tail, env);
282 GNUNET_MQ_discard (env);
284 GNUNET_STATISTICS_set (stats, "# buffer use", cur_buffers, GNUNET_NO);
289 * Discard all messages from the highest rung, to make space.
292 discard_all_from_rung_tail ()
294 struct Rung *tail = rung_tail;
295 struct RouteDirection *dir;
297 while (NULL != (dir = tail->rd_head))
299 LOG (GNUNET_ERROR_TYPE_DEBUG,
300 "Queue full due new message %s on connection %s, dropping old message\n",
301 GNUNET_sh2s (&dir->my_route->cid.connection_of_tunnel));
302 GNUNET_STATISTICS_update (stats,
303 "# messages dropped due to full buffer",
306 discard_buffer (dir, dir->env_head);
308 GNUNET_CONTAINER_DLL_remove (rung_head, rung_tail, tail);
314 * We message @a msg from @a prev. Find its route by @a cid and
315 * forward to the next hop. Drop and signal broken route if we do not
318 * @param prev previous hop (sender)
319 * @param cid connection identifier, tells us which route to use
320 * @param msg the message to forward
323 route_message (struct CadetPeer *prev,
324 const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid,
325 const struct GNUNET_MessageHeader *msg,
326 const enum GNUNET_MQ_PriorityPreferences priority)
328 struct CadetRoute *route;
329 struct RouteDirection *dir;
332 struct GNUNET_MQ_Envelope *env;
334 route = get_route (cid);
337 struct GNUNET_MQ_Envelope *env;
338 struct GNUNET_CADET_ConnectionBrokenMessage *bm;
340 LOG (GNUNET_ERROR_TYPE_DEBUG,
341 "Failed to route message of type %u from %s on connection %s: no route\n",
344 GNUNET_sh2s (&cid->connection_of_tunnel));
345 switch (ntohs (msg->type))
347 case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY:
348 case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN:
349 /* No need to respond to these! */
352 env = GNUNET_MQ_msg (bm, GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN);
354 bm->peer1 = my_full_id;
355 GCP_send_ooo (prev, env);
358 route->last_use = GNUNET_TIME_absolute_get ();
359 GNUNET_CONTAINER_heap_update_cost (route->hn, route->last_use.abs_value_us);
360 dir = (prev == route->prev.hop) ? &route->next : &route->prev;
361 if (GNUNET_YES == dir->is_ready)
363 LOG (GNUNET_ERROR_TYPE_DEBUG,
364 "Routing message of type %u from %s to %s on connection %s\n",
367 GNUNET_i2s (GCP_get_id (dir->hop)),
368 GNUNET_sh2s (&cid->connection_of_tunnel));
369 dir->is_ready = GNUNET_NO;
370 GCP_send (dir->mqm, GNUNET_MQ_msg_copy (msg));
373 /* Check if low latency is required and if the previous message was
374 unreliable; if so, make sure we only queue one message per
375 direction (no buffering). */
376 if ((0 != (priority & GNUNET_MQ_PREF_LOW_LATENCY)) &&
377 (NULL != dir->env_head) &&
379 (GNUNET_MQ_env_get_options (dir->env_head) & GNUNET_MQ_PREF_UNRELIABLE)))
380 discard_buffer (dir, dir->env_head);
381 /* Check for duplicates */
382 for (const struct GNUNET_MQ_Envelope *env = dir->env_head; NULL != env;
383 env = GNUNET_MQ_env_next (env))
385 const struct GNUNET_MessageHeader *hdr = GNUNET_MQ_env_get_msg (env);
387 if ((hdr->size == msg->size) && (0 == memcmp (hdr, msg, ntohs (msg->size))))
389 LOG (GNUNET_ERROR_TYPE_DEBUG,
390 "Received duplicate of message already in buffer, dropping\n");
391 GNUNET_STATISTICS_update (stats,
392 "# messages dropped due to duplicate in buffer",
400 if (cur_buffers == max_buffers)
402 /* Need to make room. */
403 if (NULL != rung->next)
405 /* Easy case, drop messages from route directions in highest rung */
406 discard_all_from_rung_tail ();
410 /* We are in the highest rung, drop our own! */
411 LOG (GNUNET_ERROR_TYPE_DEBUG,
412 "Queue full due new message %s on connection %s, dropping old message\n",
413 GNUNET_sh2s (&dir->my_route->cid.connection_of_tunnel));
414 GNUNET_STATISTICS_update (stats,
415 "# messages dropped due to full buffer",
418 discard_buffer (dir, dir->env_head);
422 /* remove 'dir' from current rung */
423 GNUNET_CONTAINER_DLL_remove (rung->rd_head, rung->rd_tail, dir);
424 /* make 'nxt' point to the next higher rung, create if necessary */
426 if ((NULL == nxt) || (rung->rung_off + 1 != nxt->rung_off))
428 nxt = GNUNET_new (struct Rung);
429 nxt->rung_off = rung->rung_off + 1;
430 GNUNET_CONTAINER_DLL_insert_after (rung_head, rung_tail, rung, nxt);
432 /* insert 'dir' into next higher rung */
433 GNUNET_CONTAINER_DLL_insert (nxt->rd_head, nxt->rd_tail, dir);
436 /* add message into 'dir' buffer */
437 LOG (GNUNET_ERROR_TYPE_DEBUG,
438 "Queueing new message of type %u from %s to %s on connection %s\n",
441 GNUNET_i2s (GCP_get_id (dir->hop)),
442 GNUNET_sh2s (&cid->connection_of_tunnel));
443 env = GNUNET_MQ_msg_copy (msg);
444 GNUNET_MQ_env_set_options (env, priority);
445 if ((0 != (priority & GNUNET_MQ_PREF_LOW_LATENCY)) &&
446 (0 != (priority & GNUNET_MQ_PREF_OUT_OF_ORDER)) &&
447 (NULL != dir->env_head) &&
448 (0 == (GNUNET_MQ_env_get_options (dir->env_head)
449 & GNUNET_MQ_PREF_LOW_LATENCY)))
450 GNUNET_MQ_dll_insert_head (&dir->env_head, &dir->env_tail, env);
452 GNUNET_MQ_dll_insert_tail (&dir->env_head, &dir->env_tail, env);
454 GNUNET_STATISTICS_set (stats, "# buffer use", cur_buffers, GNUNET_NO);
455 /* Clean up 'rung' if now empty (and not head) */
456 if ((NULL == rung->rd_head) && (rung != rung_head))
458 GNUNET_CONTAINER_DLL_remove (rung_head, rung_tail, rung);
465 * Check if the create_connection message has the appropriate size.
467 * @param cls Closure (unused).
468 * @param msg Message to check.
470 * @return #GNUNET_YES if size is correct, #GNUNET_NO otherwise.
473 check_connection_create (void *cls,
474 const struct GNUNET_CADET_ConnectionCreateMessage *msg)
476 uint16_t size = ntohs (msg->header.size) - sizeof(*msg);
478 if (0 != (size % sizeof(struct GNUNET_PeerIdentity)))
488 * Free internal data of a route direction.
490 * @param dir direction to destroy (do NOT free memory of 'dir' itself)
493 destroy_direction (struct RouteDirection *dir)
495 struct GNUNET_MQ_Envelope *env;
497 while (NULL != (env = dir->env_head))
499 GNUNET_STATISTICS_update (stats,
500 "# messages dropped due to route destruction",
503 discard_buffer (dir, env);
505 if (NULL != dir->mqm)
507 GCP_request_mq_cancel (dir->mqm, NULL);
510 GNUNET_CONTAINER_DLL_remove (rung_head->rd_head, rung_head->rd_tail, dir);
515 * Destroy our state for @a route.
517 * @param route route to destroy
520 destroy_route (struct CadetRoute *route)
522 LOG (GNUNET_ERROR_TYPE_DEBUG,
523 "Destroying route from %s to %s of connection %s\n",
524 GNUNET_i2s (GCP_get_id (route->prev.hop)),
525 GNUNET_i2s2 (GCP_get_id (route->next.hop)),
526 GNUNET_sh2s (&route->cid.connection_of_tunnel));
527 GNUNET_assert (route == GNUNET_CONTAINER_heap_remove_node (route->hn));
530 GNUNET_CONTAINER_multishortmap_remove (routes,
531 &route->cid.connection_of_tunnel,
533 GNUNET_STATISTICS_set (stats,
535 GNUNET_CONTAINER_multishortmap_size (routes),
537 destroy_direction (&route->prev);
538 destroy_direction (&route->next);
544 * Send message that a route is broken between @a peer1 and @a peer2.
546 * @param target where to send the message
547 * @param cid connection identifier to use
548 * @param peer1 one of the peers where a link is broken
549 * @param peer2 another one of the peers where a link is broken
552 send_broken (struct RouteDirection *target,
553 const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid,
554 const struct GNUNET_PeerIdentity *peer1,
555 const struct GNUNET_PeerIdentity *peer2)
557 struct GNUNET_MQ_Envelope *env;
558 struct GNUNET_CADET_ConnectionBrokenMessage *bm;
560 if (NULL == target->mqm)
561 return; /* Can't send notification, connection is down! */
562 LOG (GNUNET_ERROR_TYPE_DEBUG,
563 "Notifying %s about BROKEN route at %s-%s of connection %s\n",
564 GCP_2s (target->hop),
567 GNUNET_sh2s (&cid->connection_of_tunnel));
569 env = GNUNET_MQ_msg (bm, GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN);
575 GCP_request_mq_cancel (target->mqm, env);
581 * Function called to check if any routes have timed out, and if
582 * so, to clean them up. Finally, schedules itself again at the
583 * earliest time where there might be more work.
588 timeout_cb (void *cls)
590 struct CadetRoute *r;
591 struct GNUNET_TIME_Relative linger;
592 struct GNUNET_TIME_Absolute exp;
595 linger = GNUNET_TIME_relative_multiply (keepalive_period, 3);
596 while (NULL != (r = GNUNET_CONTAINER_heap_peek (route_heap)))
598 exp = GNUNET_TIME_absolute_add (r->last_use, linger);
599 if (0 != GNUNET_TIME_absolute_get_remaining (exp).rel_value_us)
601 /* Route not yet timed out, wait until it does. */
602 timeout_task = GNUNET_SCHEDULER_add_at (exp, &timeout_cb, NULL);
605 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
606 "Sending BROKEN due to timeout (%s was last use, %s linger)\n",
607 GNUNET_STRINGS_absolute_time_to_string (r->last_use),
608 GNUNET_STRINGS_relative_time_to_string (linger, GNUNET_YES));
609 send_broken (&r->prev, &r->cid, NULL, NULL);
610 send_broken (&r->next, &r->cid, NULL, NULL);
613 /* No more routes left, so no need for a #timeout_task */
618 * Function called when the message queue to the previous hop
619 * becomes available/unavailable. We expect this function to
620 * be called immediately when we register, and then again
621 * later if the connection ever goes down.
623 * @param cls the `struct RouteDirection`
624 * @param available #GNUNET_YES if sending is now possible,
625 * #GNUNET_NO if sending is no longer possible
626 * #GNUNET_SYSERR if sending is no longer possible
627 * and the last envelope was discarded
630 dir_ready_cb (void *cls, int ready)
632 struct RouteDirection *dir = cls;
633 struct CadetRoute *route = dir->my_route;
634 struct RouteDirection *odir;
636 if (GNUNET_YES == ready)
638 struct GNUNET_MQ_Envelope *env;
640 dir->is_ready = GNUNET_YES;
641 if (NULL != (env = dir->env_head))
643 GNUNET_MQ_dll_remove (&dir->env_head, &dir->env_tail, env);
645 GNUNET_STATISTICS_set (stats, "# buffer use", cur_buffers, GNUNET_NO);
647 dir->is_ready = GNUNET_NO;
648 GCP_send (dir->mqm, env);
652 odir = (dir == &route->next) ? &route->prev : &route->next;
653 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending BROKEN due to MQ going down\n");
654 send_broken (&route->next, &route->cid, GCP_get_id (odir->hop), &my_full_id);
655 destroy_route (route);
660 * Initialize one of the directions of a route.
662 * @param route route the direction belongs to
663 * @param dir direction to initialize
664 * @param hop next hop on in the @a dir
667 dir_init (struct RouteDirection *dir,
668 struct CadetRoute *route,
669 struct CadetPeer *hop)
672 dir->my_route = route;
673 dir->mqm = GCP_request_mq (hop, &dir_ready_cb, dir);
674 GNUNET_CONTAINER_DLL_insert (rung_head->rd_head, rung_head->rd_tail, dir);
675 dir->rung = rung_head;
676 GNUNET_assert (GNUNET_YES == dir->is_ready);
681 * We could not create the desired route. Send a
682 * #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN
683 * message to @a target.
685 * @param target who should receive the message
686 * @param cid identifier of the connection/route that failed
687 * @param failure_at neighbour with which we failed to route,
691 send_broken_without_mqm (
692 struct CadetPeer *target,
693 const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid,
694 const struct GNUNET_PeerIdentity *failure_at)
696 struct GNUNET_MQ_Envelope *env;
697 struct GNUNET_CADET_ConnectionBrokenMessage *bm;
699 env = GNUNET_MQ_msg (bm, GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN);
701 bm->peer1 = my_full_id;
702 if (NULL != failure_at)
703 bm->peer2 = *failure_at;
704 GCP_send_ooo (target, env);
709 * Handle for #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE
711 * @param cls Closure (CadetPeer for neighbor that sent the message).
712 * @param msg Message itself.
715 handle_connection_create (
717 const struct GNUNET_CADET_ConnectionCreateMessage *msg)
719 struct CadetPeer *sender = cls;
720 struct CadetPeer *next;
721 const struct GNUNET_PeerIdentity *pids =
722 (const struct GNUNET_PeerIdentity *) &msg[1];
723 struct CadetRoute *route;
724 uint16_t size = ntohs (msg->header.size) - sizeof(*msg);
725 unsigned int path_length;
728 path_length = size / sizeof(struct GNUNET_PeerIdentity);
729 if (0 == path_length)
731 LOG (GNUNET_ERROR_TYPE_DEBUG,
732 "Dropping CADET_CONNECTION_CREATE with empty path\n");
736 LOG (GNUNET_ERROR_TYPE_DEBUG,
737 "Handling CADET_CONNECTION_CREATE from %s for CID %s with %u hops\n",
739 GNUNET_sh2s (&msg->cid.connection_of_tunnel),
741 /* Check for loops */
743 struct GNUNET_CONTAINER_MultiPeerMap *map;
745 map = GNUNET_CONTAINER_multipeermap_create (path_length * 2, GNUNET_YES);
746 GNUNET_assert (NULL != map);
747 for (unsigned int i = 0; i < path_length; i++)
749 LOG (GNUNET_ERROR_TYPE_DEBUG,
750 "CADET_CONNECTION_CREATE has peer %s at offset %u\n",
751 GNUNET_i2s (&pids[i]),
753 if (GNUNET_SYSERR == GNUNET_CONTAINER_multipeermap_put (
757 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY))
760 GNUNET_CONTAINER_multipeermap_destroy (map);
761 LOG (GNUNET_ERROR_TYPE_DEBUG,
762 "Dropping CADET_CONNECTION_CREATE with cyclic path\n");
767 GNUNET_CONTAINER_multipeermap_destroy (map);
769 /* Initiator is at offset 0, find us */
770 for (off = 1; off < path_length; off++)
771 if (0 == GNUNET_memcmp (&my_full_id, &pids[off]))
773 if (off == path_length)
775 LOG (GNUNET_ERROR_TYPE_DEBUG,
776 "Dropping CADET_CONNECTION_CREATE without us in the path\n");
780 /* Check previous hop */
781 if (sender != GCP_get (&pids[off - 1], GNUNET_NO))
783 LOG (GNUNET_ERROR_TYPE_DEBUG,
784 "Dropping CADET_CONNECTION_CREATE without sender at previous hop in the path\n");
788 if (NULL != (route = get_route (&msg->cid)))
790 /* Duplicate CREATE, pass it on, previous one might have been lost! */
792 LOG (GNUNET_ERROR_TYPE_DEBUG,
793 "Passing on duplicate CADET_CONNECTION_CREATE message on connection %s\n",
794 GNUNET_sh2s (&msg->cid.connection_of_tunnel));
795 route_message (sender,
798 GNUNET_MQ_PRIO_CRITICAL_CONTROL
799 | GNUNET_MQ_PREF_LOW_LATENCY);
802 if (off == path_length - 1)
804 /* We are the destination, create connection */
805 struct CadetConnection *cc;
806 struct CadetPeerPath *path;
807 struct CadetPeer *origin;
809 cc = GCC_lookup (&msg->cid);
812 LOG (GNUNET_ERROR_TYPE_DEBUG,
813 "Received duplicate CADET_CONNECTION_CREATE message on connection %s\n",
814 GNUNET_sh2s (&msg->cid.connection_of_tunnel));
815 GCC_handle_duplicate_create (cc);
819 origin = GCP_get (&pids[0], GNUNET_YES);
820 LOG (GNUNET_ERROR_TYPE_DEBUG,
821 "I am destination for CADET_CONNECTION_CREATE message from %s for connection %s, building inverse path\n",
823 GNUNET_sh2s (&msg->cid.connection_of_tunnel));
824 path = GCPP_get_path_from_route (path_length - 1, pids);
826 GCT_add_inbound_connection (GCP_get_tunnel (origin, GNUNET_YES),
830 /* Send back BROKEN: duplicate connection on the same path,
831 we will use the other one. */
832 LOG (GNUNET_ERROR_TYPE_DEBUG,
833 "Received CADET_CONNECTION_CREATE from %s for %s, but %s already has a connection. Sending BROKEN\n",
835 GNUNET_sh2s (&msg->cid.connection_of_tunnel),
837 send_broken_without_mqm (sender, &msg->cid, NULL);
842 /* We are merely a hop on the way, check if we can support the route */
843 next = GCP_get (&pids[off + 1], GNUNET_NO);
844 if ((NULL == next) || (GNUNET_NO == GCP_has_core_connection (next)))
846 /* unworkable, send back BROKEN notification */
847 LOG (GNUNET_ERROR_TYPE_DEBUG,
848 "Received CADET_CONNECTION_CREATE from %s for %s. Next hop %s:%u is down. Sending BROKEN\n",
850 GNUNET_sh2s (&msg->cid.connection_of_tunnel),
851 GNUNET_i2s (&pids[off + 1]),
853 send_broken_without_mqm (sender, &msg->cid, &pids[off + 1]);
856 if (max_routes <= GNUNET_CONTAINER_multishortmap_size (routes))
858 LOG (GNUNET_ERROR_TYPE_DEBUG,
859 "Received CADET_CONNECTION_CREATE from %s for %s. We have reached our route limit. Sending BROKEN\n",
861 GNUNET_sh2s (&msg->cid.connection_of_tunnel));
862 send_broken_without_mqm (sender, &msg->cid, &pids[off - 1]);
866 /* Workable route, create routing entry */
867 LOG (GNUNET_ERROR_TYPE_DEBUG,
868 "Received CADET_CONNECTION_CREATE from %s for %s. Next hop %s:%u is up. Creating route\n",
870 GNUNET_sh2s (&msg->cid.connection_of_tunnel),
871 GNUNET_i2s (&pids[off + 1]),
873 route = GNUNET_new (struct CadetRoute);
874 route->cid = msg->cid;
875 route->last_use = GNUNET_TIME_absolute_get ();
876 dir_init (&route->prev, route, sender);
877 dir_init (&route->next, route, next);
878 GNUNET_assert (GNUNET_OK ==
879 GNUNET_CONTAINER_multishortmap_put (
881 &route->cid.connection_of_tunnel,
883 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
884 GNUNET_STATISTICS_set (stats,
886 GNUNET_CONTAINER_multishortmap_size (routes),
888 route->hn = GNUNET_CONTAINER_heap_insert (route_heap,
890 route->last_use.abs_value_us);
891 if (NULL == timeout_task)
893 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (
898 /* also pass CREATE message along to next hop */
899 route_message (sender,
902 GNUNET_MQ_PRIO_CRITICAL_CONTROL | GNUNET_MQ_PREF_LOW_LATENCY);
907 * Handle for #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE_ACK
909 * @param cls Closure (CadetPeer for neighbor that sent the message).
910 * @param msg Message itself.
913 handle_connection_create_ack (
915 const struct GNUNET_CADET_ConnectionCreateAckMessage *msg)
917 struct CadetPeer *peer = cls;
918 struct CadetConnection *cc;
920 /* First, check if ACK belongs to a connection that ends here. */
921 cc = GCC_lookup (&msg->cid);
924 /* verify ACK came from the right direction */
926 struct CadetPeerPath *path = GCC_get_path (cc, &len);
928 if (peer != GCPP_get_peer_at_offset (path, 0))
930 /* received ACK from unexpected direction, ignore! */
934 LOG (GNUNET_ERROR_TYPE_DEBUG,
935 "Received CONNECTION_CREATE_ACK for connection %s.\n",
936 GNUNET_sh2s (&msg->cid.connection_of_tunnel));
937 GCC_handle_connection_create_ack (cc);
941 /* We're just an intermediary peer, route the message along its path */
945 GNUNET_MQ_PRIO_CRITICAL_CONTROL | GNUNET_MQ_PREF_LOW_LATENCY);
950 * Handle for #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN
952 * @param cls Closure (CadetPeer for neighbor that sent the message).
953 * @param msg Message itself.
954 * @deprecated duplicate logic with #handle_destroy(); dedup!
957 handle_connection_broken (
959 const struct GNUNET_CADET_ConnectionBrokenMessage *msg)
961 struct CadetPeer *peer = cls;
962 struct CadetConnection *cc;
963 struct CadetRoute *route;
965 /* First, check if message belongs to a connection that ends here. */
966 cc = GCC_lookup (&msg->cid);
969 /* verify message came from the right direction */
971 struct CadetPeerPath *path = GCC_get_path (cc, &len);
973 if (peer != GCPP_get_peer_at_offset (path, 0))
975 /* received message from unexpected direction, ignore! */
979 LOG (GNUNET_ERROR_TYPE_DEBUG,
980 "Received CONNECTION_BROKEN for connection %s. Destroying it.\n",
981 GNUNET_sh2s (&msg->cid.connection_of_tunnel));
982 GCC_destroy_without_core (cc);
984 /* FIXME: also destroy the path up to the specified link! */
988 /* We're just an intermediary peer, route the message along its path */
992 GNUNET_MQ_PREF_LOW_LATENCY | GNUNET_MQ_PRIO_CRITICAL_CONTROL);
993 route = get_route (&msg->cid);
995 destroy_route (route);
996 /* FIXME: also destroy paths we MAY have up to the specified link! */
1001 * Handle for #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY
1003 * @param cls Closure (CadetPeer for neighbor that sent the message).
1004 * @param msg Message itself.
1007 handle_connection_destroy (
1009 const struct GNUNET_CADET_ConnectionDestroyMessage *msg)
1011 struct CadetPeer *peer = cls;
1012 struct CadetConnection *cc;
1013 struct CadetRoute *route;
1015 /* First, check if message belongs to a connection that ends here. */
1016 cc = GCC_lookup (&msg->cid);
1019 /* verify message came from the right direction */
1021 struct CadetPeerPath *path = GCC_get_path (cc, &len);
1023 if (peer != GCPP_get_peer_at_offset (path, 0))
1025 /* received message from unexpected direction, ignore! */
1026 GNUNET_break_op (0);
1029 LOG (GNUNET_ERROR_TYPE_DEBUG,
1030 "Received CONNECTION_DESTROY for connection %s. Destroying connection.\n",
1031 GNUNET_sh2s (&msg->cid.connection_of_tunnel));
1033 GCC_destroy_without_core (cc);
1037 /* We're just an intermediary peer, route the message along its path */
1038 LOG (GNUNET_ERROR_TYPE_DEBUG,
1039 "Received CONNECTION_DESTROY for connection %s. Destroying route.\n",
1040 GNUNET_sh2s (&msg->cid.connection_of_tunnel));
1041 route_message (peer,
1044 GNUNET_MQ_PREF_LOW_LATENCY | GNUNET_MQ_PRIO_CRITICAL_CONTROL);
1045 route = get_route (&msg->cid);
1047 destroy_route (route);
1052 * Handle for #GNUNET_MESSAGE_TYPE_CADET_TUNNEL_KX
1054 * @param cls Closure (CadetPeer for neighbor that sent the message).
1055 * @param msg Message itself.
1058 handle_tunnel_kx (void *cls,
1059 const struct GNUNET_CADET_TunnelKeyExchangeMessage *msg)
1061 struct CadetPeer *peer = cls;
1062 struct CadetConnection *cc;
1064 /* First, check if message belongs to a connection that ends here. */
1065 LOG (GNUNET_ERROR_TYPE_DEBUG,
1066 "Routing KX with ephemeral %s on CID %s\n",
1067 GNUNET_e2s (&msg->ephemeral_key),
1068 GNUNET_sh2s (&msg->cid.connection_of_tunnel));
1071 cc = GCC_lookup (&msg->cid);
1074 /* verify message came from the right direction */
1076 struct CadetPeerPath *path = GCC_get_path (cc, &len);
1078 if (peer != GCPP_get_peer_at_offset (path, 0))
1080 /* received message from unexpected direction, ignore! */
1081 GNUNET_break_op (0);
1084 GCC_handle_kx (cc, msg);
1088 /* We're just an intermediary peer, route the message along its path */
1089 route_message (peer,
1092 GNUNET_MQ_PRIO_CRITICAL_CONTROL | GNUNET_MQ_PREF_LOW_LATENCY);
1097 * Handle for #GNUNET_MESSAGE_TYPE_CADET_TUNNEL_KX_AUTH
1099 * @param cls Closure (CadetPeer for neighbor that sent the message).
1100 * @param msg Message itself.
1103 handle_tunnel_kx_auth (
1105 const struct GNUNET_CADET_TunnelKeyExchangeAuthMessage *msg)
1107 struct CadetPeer *peer = cls;
1108 struct CadetConnection *cc;
1110 /* First, check if message belongs to a connection that ends here. */
1111 cc = GCC_lookup (&msg->kx.cid);
1114 /* verify message came from the right direction */
1116 struct CadetPeerPath *path = GCC_get_path (cc, &len);
1118 if (peer != GCPP_get_peer_at_offset (path, 0))
1120 /* received message from unexpected direction, ignore! */
1121 GNUNET_break_op (0);
1124 GCC_handle_kx_auth (cc, msg);
1128 /* We're just an intermediary peer, route the message along its path */
1129 route_message (peer,
1132 GNUNET_MQ_PRIO_CRITICAL_CONTROL | GNUNET_MQ_PREF_LOW_LATENCY);
1137 * Check if the encrypted message has the appropriate size.
1139 * @param cls Closure (unused).
1140 * @param msg Message to check.
1142 * @return #GNUNET_YES if size is correct, #GNUNET_NO otherwise.
1145 check_tunnel_encrypted (void *cls,
1146 const struct GNUNET_CADET_TunnelEncryptedMessage *msg)
1153 * Handle for #GNUNET_MESSAGE_TYPE_CADET_TUNNEL_ENCRYPTED.
1155 * @param cls Closure (CadetPeer for neighbor that sent the message).
1156 * @param msg Message itself.
1159 handle_tunnel_encrypted (void *cls,
1160 const struct GNUNET_CADET_TunnelEncryptedMessage *msg)
1162 struct CadetPeer *peer = cls;
1163 struct CadetConnection *cc;
1165 /* First, check if message belongs to a connection that ends here. */
1166 cc = GCC_lookup (&msg->cid);
1169 /* verify message came from the right direction */
1171 struct CadetPeerPath *path = GCC_get_path (cc, &len);
1173 if (peer != GCPP_get_peer_at_offset (path, 0))
1175 /* received message from unexpected direction, ignore! */
1176 GNUNET_break_op (0);
1179 GCC_handle_encrypted (cc, msg);
1182 /* We're just an intermediary peer, route the message along its path */
1183 route_message (peer, &msg->cid, &msg->header, GNUNET_MQ_PRIO_BEST_EFFORT);
1188 * Function called after #GNUNET_CORE_connect has succeeded (or failed
1189 * for good). Note that the private key of the peer is intentionally
1190 * not exposed here; if you need it, your process should try to read
1191 * the private key file directly (which should work if you are
1192 * authorized...). Implementations of this function must not call
1193 * #GNUNET_CORE_disconnect (other than by scheduling a new task to
1196 * @param cls closure
1197 * @param my_identity ID of this peer, NULL if we failed
1200 core_init_cb (void *cls, const struct GNUNET_PeerIdentity *my_identity)
1202 if (NULL == my_identity)
1207 GNUNET_break (0 == GNUNET_memcmp (my_identity, &my_full_id));
1212 * Method called whenever a given peer connects.
1214 * @param cls closure
1215 * @param peer peer identity this notification is about
1218 core_connect_cb (void *cls,
1219 const struct GNUNET_PeerIdentity *peer,
1220 struct GNUNET_MQ_Handle *mq)
1222 struct CadetPeer *cp;
1224 LOG (GNUNET_ERROR_TYPE_DEBUG,
1225 "CORE connection to peer %s was established.\n",
1227 cp = GCP_get (peer, GNUNET_YES);
1228 GCP_set_mq (cp, mq);
1234 * Method called whenever a peer disconnects.
1236 * @param cls closure
1237 * @param peer peer identity this notification is about
1240 core_disconnect_cb (void *cls,
1241 const struct GNUNET_PeerIdentity *peer,
1244 struct CadetPeer *cp = peer_cls;
1246 LOG (GNUNET_ERROR_TYPE_DEBUG,
1247 "CORE connection to peer %s went down.\n",
1249 GCP_set_mq (cp, NULL);
1254 * Initialize the CORE subsystem.
1256 * @param c Configuration.
1259 GCO_init (const struct GNUNET_CONFIGURATION_Handle *c)
1261 struct GNUNET_MQ_MessageHandler handlers[] =
1262 { GNUNET_MQ_hd_var_size (connection_create,
1263 GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE,
1264 struct GNUNET_CADET_ConnectionCreateMessage,
1266 GNUNET_MQ_hd_fixed_size (connection_create_ack,
1267 GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE_ACK,
1268 struct GNUNET_CADET_ConnectionCreateAckMessage,
1270 GNUNET_MQ_hd_fixed_size (connection_broken,
1271 GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN,
1272 struct GNUNET_CADET_ConnectionBrokenMessage,
1274 GNUNET_MQ_hd_fixed_size (connection_destroy,
1275 GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY,
1276 struct GNUNET_CADET_ConnectionDestroyMessage,
1278 GNUNET_MQ_hd_fixed_size (tunnel_kx,
1279 GNUNET_MESSAGE_TYPE_CADET_TUNNEL_KX,
1280 struct GNUNET_CADET_TunnelKeyExchangeMessage,
1282 GNUNET_MQ_hd_fixed_size (tunnel_kx_auth,
1283 GNUNET_MESSAGE_TYPE_CADET_TUNNEL_KX_AUTH,
1284 struct GNUNET_CADET_TunnelKeyExchangeAuthMessage,
1286 GNUNET_MQ_hd_var_size (tunnel_encrypted,
1287 GNUNET_MESSAGE_TYPE_CADET_TUNNEL_ENCRYPTED,
1288 struct GNUNET_CADET_TunnelEncryptedMessage,
1290 GNUNET_MQ_handler_end () };
1292 if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_number (c,
1297 if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_number (c,
1301 max_buffers = 10000;
1302 routes = GNUNET_CONTAINER_multishortmap_create (1024, GNUNET_NO);
1303 route_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1304 core = GNUNET_CORE_connect (c,
1308 &core_disconnect_cb,
1314 * Shut down the CORE subsystem.
1321 GNUNET_CORE_disconnect (core);
1324 GNUNET_assert (0 == GNUNET_CONTAINER_multishortmap_size (routes));
1325 GNUNET_CONTAINER_multishortmap_destroy (routes);
1327 GNUNET_CONTAINER_heap_destroy (route_heap);
1329 if (NULL != timeout_task)
1331 GNUNET_SCHEDULER_cancel (timeout_task);
1332 timeout_task = NULL;
1337 /* end of gnunet-cadet-service_core.c */