2 This file is part of GNUnet.
3 Copyright (C) 2017 GNUnet e.V.
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
18 SPDX-License-Identifier: AGPL-3.0-or-later
22 * @file cadet/gnunet-service-cadet_core.c
23 * @brief cadet service; interaction with CORE service
24 * @author Bartlomiej Polot
25 * @author Christian Grothoff
27 * All functions in this file should use the prefix GCO (Gnunet Cadet cOre (bottom))
30 * - Optimization: given BROKEN messages, destroy paths (?)
33 #include "gnunet-service-cadet_core.h"
34 #include "gnunet-service-cadet_paths.h"
35 #include "gnunet-service-cadet_peer.h"
36 #include "gnunet-service-cadet_connection.h"
37 #include "gnunet-service-cadet_tunnels.h"
38 #include "gnunet_core_service.h"
39 #include "gnunet_statistics_service.h"
40 #include "cadet_protocol.h"
42 #define LOG(level, ...) GNUNET_log_from (level, "cadet-cor", __VA_ARGS__)
45 * Information we keep per direction for a route.
/* Forward declaration: the rung fields below hold pointers to route
   directions before `struct RouteDirection` is defined. */
47 struct RouteDirection;
50 * Set of CadetRoutes that have exactly the same number of messages
51 * in their buffer. Used so we can efficiently find all of those
52 * routes that have the current maximum of messages in the buffer (in
53 * case we have to purge).
/* NOTE(review): the `struct Rung` header and the declarations of the
   neighbouring-rung pointers are not visible in this extract; other code in
   this file accesses them as `rung->next` and `rung->prev`. */
58 * Rung of RouteDirections with one more buffer entry each.
63 * Rung of RouteDirections with one less buffer entry each.
68 * DLL of route directions with a number of buffer entries matching this rung.
70 struct RouteDirection *rd_head;
73 * DLL of route directions with a number of buffer entries matching this rung.
75 struct RouteDirection *rd_tail;
78 * Total number of route directions in this rung.
80 unsigned int num_routes;
83 * Number of messages route directions at this rung have
/* rung_off is compared against `rung_off - 1` / `rung_off + 1` of adjacent
   rungs when a direction moves between rungs. */
86 unsigned int rung_off;
91 * Information we keep per direction for a route.
/* Per-direction state of a route: DLL links inside a rung, the DLL of
   buffered envelopes, the neighbouring hop, the owning route and the hop's
   message queue manager.
   NOTE(review): the declarations of `rung` and `is_ready` (used in this file
   as `dir->rung` and `dir->is_ready`) are not visible in this extract. */
96 * DLL of other route directions within the same `struct Rung`.
98 struct RouteDirection *prev;
101 * DLL of other route directions within the same `struct Rung`.
103 struct RouteDirection *next;
106 * Rung of this route direction (matches length of the buffer DLL).
111 * Head of DLL of envelopes we have in the buffer for this direction.
113 struct GNUNET_MQ_Envelope *env_head;
116 * Tail of DLL of envelopes we have in the buffer for this direction.
118 struct GNUNET_MQ_Envelope *env_tail;
/* Peer this direction forwards to; compared against the sender in
   route_message() to pick the outgoing direction. */
123 struct CadetPeer *hop;
126 * Route this direction is part of.
128 struct CadetRoute *my_route;
131 * Message queue manager for @e hop.
133 struct GCP_MessageQueueManager *mqm;
136 * Is @e mqm currently ready for transmission?
143 * Description of a segment of a `struct CadetConnection` at the
144 * intermediate peers. Routes are basically entries in a peer's
145 * routing table for forwarding traffic. At both endpoints, the
146 * routes are terminated by a `struct CadetConnection`, which knows
147 * the complete `struct CadetPath` that is formed by the individual
/* Routing-table entry: one RouteDirection per neighbour, the connection
   identifier whose `connection_of_tunnel` member is the key in `routes`,
   the last-use timestamp (used as cost in `route_heap`) and the heap node. */
153 * Information about the next hop on this route.
155 struct RouteDirection next;
158 * Information about the previous hop on this route.
160 struct RouteDirection prev;
163 * Unique identifier for the connection that uses this route.
165 struct GNUNET_CADET_ConnectionTunnelIdentifier cid;
168 * When was this route last in use?
170 struct GNUNET_TIME_Absolute last_use;
173 * Position of this route in the #route_heap.
175 struct GNUNET_CONTAINER_HeapNode *hn;
180 * Handle to the CORE service.
/* File-scope state: CORE handle, routing table (keyed by the
   connection_of_tunnel short hash), expiration heap, and the statically
   allocated rung for directions with an empty buffer. */
182 static struct GNUNET_CORE_Handle *core;
185 * Routes on which this peer is an intermediate.
187 static struct GNUNET_CONTAINER_MultiShortmap *routes;
190 * Heap of routes, MIN-sorted by last activity.
192 static struct GNUNET_CONTAINER_Heap *route_heap;
195 * Rung zero (always pointed to by #rung_head).
197 static struct Rung rung_zero;
200 * DLL of rungs, with the head always pointing to a rung of
201 * route directions with no messages in the queue.
/* Head and tail of the rung DLL both start out at the static rung zero;
   additional rungs are allocated on demand in route_message() and
   lower_rung(). */
203 static struct Rung *rung_head = &rung_zero;
206 * Tail of the #rung_head DLL.
208 static struct Rung *rung_tail = &rung_zero;
211 * Maximum number of concurrent routes this peer will support.
213 static unsigned long long max_routes;
216 * Maximum number of envelopes we will buffer at this peer.
218 static unsigned long long max_buffers;
221 * Current number of envelopes we have buffered at this peer.
223 static unsigned long long cur_buffers;
226 * Task to timeout routes.
/* Scheduled when a route is created and re-armed from timeout_cb(). */
228 static struct GNUNET_SCHEDULER_Task *timeout_task;
231 * Get the route corresponding to a hash.
233 * @param cid hash generated from the connection identifier
/* Look up the routing entry stored in `routes` under the
   `connection_of_tunnel` short hash of @a cid and return the map's result.
   Callers test the result against NULL (see handle_connection_create()). */
235 static struct CadetRoute *
236 get_route (const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid)
238 return GNUNET_CONTAINER_multishortmap_get (routes,
239 &cid->connection_of_tunnel);
244 * Lower the rung in which @a dir is by 1.
246 * @param dir direction to lower in rung.
/* Move @a dir from its current rung into the rung whose offset is one
   smaller, allocating that rung if it does not exist yet.
   NOTE(review): the declaration and first assignment of `prev` are not
   visible in this extract; presumably `prev = rung->prev` - confirm. */
249 lower_rung (struct RouteDirection *dir)
251 struct Rung *rung = dir->rung;
/* Detach dir from the member list of its current rung. */
254 GNUNET_CONTAINER_DLL_remove (rung->rd_head, rung->rd_tail, dir);
256 GNUNET_assert (NULL != prev);
/* The preceding rung may have a smaller offset than needed; in that case a
   fresh rung with offset rung_off - 1 is linked in right after rung->prev. */
257 if (prev->rung_off != rung->rung_off - 1)
259 prev = GNUNET_new (struct Rung);
260 prev->rung_off = rung->rung_off - 1;
261 GNUNET_CONTAINER_DLL_insert_after (rung_head, rung_tail, rung->prev, prev);
263 GNUNET_assert (NULL != prev);
/* Attach dir to the member list of the lower rung. */
264 GNUNET_CONTAINER_DLL_insert (prev->rd_head, prev->rd_tail, dir);
270 * Discard the buffer @a env from the route direction @a dir and
271 * move @a dir down a rung.
273 * @param dir direction that contains the @a env in the buffer
274 * @param env envelope to discard
/* Unlink @a env from the buffer DLL of @a dir, discard it and publish the
   current buffer count as the "# buffer use" statistic.
   NOTE(review): the statements that adjust `cur_buffers` and move @a dir
   down a rung are not visible in this extract. */
277 discard_buffer (struct RouteDirection *dir, struct GNUNET_MQ_Envelope *env)
279 GNUNET_MQ_dll_remove (&dir->env_head, &dir->env_tail, env);
281 GNUNET_MQ_discard (env);
283 GNUNET_STATISTICS_set (stats, "# buffer use", cur_buffers, GNUNET_NO);
288 * Discard all messages from the highest rung, to make space.
/* Drop the oldest buffered envelope (the head) of every route direction in
   the tail rung, then unlink that rung from the rung DLL.
   The loop terminates only if discard_buffer() removes the direction from
   `tail->rd_head`; NOTE(review): that step is not visible in this extract. */
291 discard_all_from_rung_tail ()
293 struct Rung *tail = rung_tail;
294 struct RouteDirection *dir;
296 while (NULL != (dir = tail->rd_head))
298 LOG (GNUNET_ERROR_TYPE_DEBUG,
299 "Queue full due new message %s on connection %s, dropping old message\n",
300 GNUNET_sh2s (&dir->my_route->cid.connection_of_tunnel));
301 GNUNET_STATISTICS_update (stats,
302 "# messages dropped due to full buffer",
305 discard_buffer (dir, dir->env_head);
/* All directions have left the rung; take it out of the rung list. */
307 GNUNET_CONTAINER_DLL_remove (rung_head, rung_tail, tail);
313 * We received message @a msg from @a prev. Find its route by @a cid and
314 * forward to the next hop. Drop and signal broken route if we do not
317 * @param prev previous hop (sender)
318 * @param cid connection identifier, tells us which route to use
319 * @param msg the message to forward
/* Forward @a msg received from @a prev along the route identified by @a cid.
   @a priority is stored as the options of the buffered envelope and its
   LOW_LATENCY / OUT_OF_ORDER bits steer buffering decisions below.
   Visible flow: no route -> reply with BROKEN (except for DESTROY/BROKEN);
   outgoing queue ready -> send immediately; otherwise de-duplicate, make
   room if the global buffer is full, move the direction up one rung and
   queue a copy of the message.
   NOTE(review): several short lines (braces, returns, declarations of
   `rung` and `nxt`, counter updates) are missing from this extract. */
322 route_message (struct CadetPeer *prev,
323 const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid,
324 const struct GNUNET_MessageHeader *msg,
325 const enum GNUNET_MQ_PriorityPreferences priority)
327 struct CadetRoute *route;
328 struct RouteDirection *dir;
331 struct GNUNET_MQ_Envelope *env;
333 route = get_route (cid);
/* NOTE(review): this `env` (and the one in the duplicate-check loop further
   down) has the same name as the function-level `env` declared above. */
336 struct GNUNET_MQ_Envelope *env;
337 struct GNUNET_CADET_ConnectionBrokenMessage *bm;
339 LOG (GNUNET_ERROR_TYPE_DEBUG,
340 "Failed to route message of type %u from %s on connection %s: no route\n",
343 GNUNET_sh2s (&cid->connection_of_tunnel));
344 switch (ntohs (msg->type))
346 case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY:
347 case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN:
348 /* No need to respond to these! */
/* For the remaining types a BROKEN notification naming this peer as peer1
   is sent back to the sender out of order. */
351 env = GNUNET_MQ_msg (bm, GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN);
353 bm->peer1 = my_full_id;
354 GCP_send_ooo (prev, env);
/* Route exists: refresh its last-use time and its position in the heap. */
357 route->last_use = GNUNET_TIME_absolute_get ();
358 GNUNET_CONTAINER_heap_update_cost (route->hn, route->last_use.abs_value_us);
/* Forward away from the sender: traffic from route->prev.hop goes to
   `next`, anything else goes to `prev`. */
359 dir = (prev == route->prev.hop) ? &route->next : &route->prev;
/* Fast path: the outgoing queue is idle, transmit a copy right away and
   mark the direction busy. */
360 if (GNUNET_YES == dir->is_ready)
362 LOG (GNUNET_ERROR_TYPE_DEBUG,
363 "Routing message of type %u from %s to %s on connection %s\n",
366 GNUNET_i2s (GCP_get_id (dir->hop)),
367 GNUNET_sh2s (&cid->connection_of_tunnel));
368 dir->is_ready = GNUNET_NO;
369 GCP_send (dir->mqm, GNUNET_MQ_msg_copy (msg));
372 /* Check if low latency is required and if the previous message was
373 unreliable; if so, make sure we only queue one message per
374 direction (no buffering). */
375 if ((0 != (priority & GNUNET_MQ_PREF_LOW_LATENCY)) &&
376 (NULL != dir->env_head) &&
378 (GNUNET_MQ_env_get_options (dir->env_head) & GNUNET_MQ_PREF_UNRELIABLE)))
379 discard_buffer (dir, dir->env_head);
380 /* Check for duplicates */
381 for (const struct GNUNET_MQ_Envelope *env = dir->env_head; NULL != env;
382 env = GNUNET_MQ_env_next (env))
384 const struct GNUNET_MessageHeader *hdr = GNUNET_MQ_env_get_msg (env);
/* Sizes are compared in network byte order first, then the full message
   bytes are compared. */
386 if ((hdr->size == msg->size) && (0 == memcmp (hdr, msg, ntohs (msg->size))))
388 LOG (GNUNET_ERROR_TYPE_DEBUG,
389 "Received duplicate of message already in buffer, dropping\n");
390 GNUNET_STATISTICS_update (stats,
391 "# messages dropped due to duplicate in buffer",
/* Global buffer limit reached: free space before queueing. */
399 if (cur_buffers == max_buffers)
401 /* Need to make room. */
402 if (NULL != rung->next)
404 /* Easy case, drop messages from route directions in highest rung */
405 discard_all_from_rung_tail ();
409 /* We are in the highest rung, drop our own! */
410 LOG (GNUNET_ERROR_TYPE_DEBUG,
411 "Queue full due new message %s on connection %s, dropping old message\n",
412 GNUNET_sh2s (&dir->my_route->cid.connection_of_tunnel));
413 GNUNET_STATISTICS_update (stats,
414 "# messages dropped due to full buffer",
417 discard_buffer (dir, dir->env_head);
421 /* remove 'dir' from current rung */
422 GNUNET_CONTAINER_DLL_remove (rung->rd_head, rung->rd_tail, dir);
423 /* make 'nxt' point to the next higher rung, create if necessary */
425 if ((NULL == nxt) || (rung->rung_off + 1 != nxt->rung_off))
427 nxt = GNUNET_new (struct Rung);
428 nxt->rung_off = rung->rung_off + 1;
429 GNUNET_CONTAINER_DLL_insert_after (rung_head, rung_tail, rung, nxt);
431 /* insert 'dir' into next higher rung */
432 GNUNET_CONTAINER_DLL_insert (nxt->rd_head, nxt->rd_tail, dir);
435 /* add message into 'dir' buffer */
436 LOG (GNUNET_ERROR_TYPE_DEBUG,
437 "Queueing new message of type %u from %s to %s on connection %s\n",
440 GNUNET_i2s (GCP_get_id (dir->hop)),
441 GNUNET_sh2s (&cid->connection_of_tunnel));
442 env = GNUNET_MQ_msg_copy (msg);
443 GNUNET_MQ_env_set_options (env, priority);
/* A LOW_LATENCY + OUT_OF_ORDER message goes to the head of the buffer when
   the current head is not LOW_LATENCY; the tail insert below is presumably
   the `else` branch (that line is not visible in this extract). */
444 if ((0 != (priority & GNUNET_MQ_PREF_LOW_LATENCY)) &&
445 (0 != (priority & GNUNET_MQ_PREF_OUT_OF_ORDER)) &&
446 (NULL != dir->env_head) &&
447 (0 == (GNUNET_MQ_env_get_options (dir->env_head)
448 & GNUNET_MQ_PREF_LOW_LATENCY)))
449 GNUNET_MQ_dll_insert_head (&dir->env_head, &dir->env_tail, env);
451 GNUNET_MQ_dll_insert_tail (&dir->env_head, &dir->env_tail, env);
453 GNUNET_STATISTICS_set (stats, "# buffer use", cur_buffers, GNUNET_NO);
454 /* Clean up 'rung' if now empty (and not head) */
455 if ((NULL == rung->rd_head) && (rung != rung_head))
457 GNUNET_CONTAINER_DLL_remove (rung_head, rung_tail, rung);
464 * Check if the create_connection message has the appropriate size.
466 * @param cls Closure (unused).
467 * @param msg Message to check.
469 * @return #GNUNET_YES if size is correct, #GNUNET_NO otherwise.
/* Validate the variable-size part of a CONNECTION_CREATE message: the bytes
   following the fixed header must be a whole number of peer identities.
   NOTE(review): the subtraction is unsigned and would wrap if header.size
   were smaller than sizeof(*msg); presumably the MQ layer guarantees the
   minimum size before calling this check - confirm.
   The return statements are not visible in this extract. */
472 check_connection_create (void *cls,
473 const struct GNUNET_CADET_ConnectionCreateMessage *msg)
475 uint16_t size = ntohs (msg->header.size) - sizeof(*msg);
477 if (0 != (size % sizeof(struct GNUNET_PeerIdentity)))
487 * Free internal data of a route direction.
489 * @param dir direction to destroy (do NOT free memory of 'dir' itself)
/* Release everything hanging off @a dir: discard all buffered envelopes
   (counting each as dropped due to route destruction), cancel the message
   queue request if one exists, and unlink @a dir from rung zero's list. */
492 destroy_direction (struct RouteDirection *dir)
494 struct GNUNET_MQ_Envelope *env;
496 while (NULL != (env = dir->env_head))
498 GNUNET_STATISTICS_update (stats,
499 "# messages dropped due to route destruction",
502 discard_buffer (dir, env);
504 if (NULL != dir->mqm)
506 GCP_request_mq_cancel (dir->mqm, NULL);
/* The removal uses rung_head's list, i.e. it relies on @a dir being in rung
   zero once its buffer is empty. */
509 GNUNET_CONTAINER_DLL_remove (rung_head->rd_head, rung_head->rd_tail, dir);
514 * Destroy our state for @a route.
516 * @param route route to destroy
/* Tear down @a route: remove it from the expiration heap and the routing
   map, refresh the route-count statistic and destroy both directions.
   NOTE(review): the assertion around the map removal, the statistic name and
   the final release of @a route are not visible in this extract. */
519 destroy_route (struct CadetRoute *route)
521 LOG (GNUNET_ERROR_TYPE_DEBUG,
522 "Destroying route from %s to %s of connection %s\n",
523 GNUNET_i2s (GCP_get_id (route->prev.hop)),
524 GNUNET_i2s2 (GCP_get_id (route->next.hop)),
525 GNUNET_sh2s (&route->cid.connection_of_tunnel));
/* The heap node must hand back exactly this route. */
526 GNUNET_assert (route == GNUNET_CONTAINER_heap_remove_node (route->hn));
529 GNUNET_CONTAINER_multishortmap_remove (routes,
530 &route->cid.connection_of_tunnel,
532 GNUNET_STATISTICS_set (stats,
534 GNUNET_CONTAINER_multishortmap_size (routes),
536 destroy_direction (&route->prev);
537 destroy_direction (&route->next);
543 * Send message that a route is broken between @a peer1 and @a peer2.
545 * @param target where to send the message
546 * @param cid connection identifier to use
547 * @param peer1 one of the peers where a link is broken
548 * @param peer2 another one of the peers where a link is broken
/* Build a CONNECTION_BROKEN message for @a cid and hand it to the message
   queue manager of @a target; does nothing when @a target has no mqm.
   timeout_cb() calls this with @a peer1 and @a peer2 set to NULL.
   NOTE(review): the lines filling in the message fields are not visible in
   this extract, so the NULL handling cannot be confirmed here. */
551 send_broken (struct RouteDirection *target,
552 const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid,
553 const struct GNUNET_PeerIdentity *peer1,
554 const struct GNUNET_PeerIdentity *peer2)
556 struct GNUNET_MQ_Envelope *env;
557 struct GNUNET_CADET_ConnectionBrokenMessage *bm;
559 if (NULL == target->mqm)
560 return; /* Can't send notification, connection is down! */
561 LOG (GNUNET_ERROR_TYPE_DEBUG,
562 "Notifying %s about BROKEN route at %s-%s of connection %s\n",
563 GCP_2s (target->hop),
566 GNUNET_sh2s (&cid->connection_of_tunnel));
568 env = GNUNET_MQ_msg (bm, GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN);
/* The envelope is passed along with the cancellation of the MQ request;
   presumably it is transmitted as the final message - confirm against
   GCP_request_mq_cancel(). */
574 GCP_request_mq_cancel (target->mqm, env);
580 * Function called to check if any routes have timed out, and if
581 * so, to clean them up. Finally, schedules itself again at the
582 * earliest time where there might be more work.
/* Scheduler task that expires idle routes. A route lingers for three
   keepalive periods after its last use. The heap is MIN-sorted by last use,
   so the loop inspects the least recently used route first and re-arms the
   task at that route's expiration time if it has not expired yet.
   NOTE(review): the return after re-scheduling, the route destruction and
   the reset of `timeout_task` are not visible in this extract. */
587 timeout_cb (void *cls)
589 struct CadetRoute *r;
590 struct GNUNET_TIME_Relative linger;
591 struct GNUNET_TIME_Absolute exp;
594 linger = GNUNET_TIME_relative_multiply (keepalive_period, 3);
595 while (NULL != (r = GNUNET_CONTAINER_heap_peek (route_heap)))
597 exp = GNUNET_TIME_absolute_add (r->last_use, linger);
598 if (0 != GNUNET_TIME_absolute_get_remaining (exp).rel_value_us)
600 /* Route not yet timed out, wait until it does. */
601 timeout_task = GNUNET_SCHEDULER_add_at (exp, &timeout_cb, NULL);
604 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
605 "Sending BROKEN due to timeout (%s was last use, %s linger)\n",
606 GNUNET_STRINGS_absolute_time_to_string (r->last_use),
607 GNUNET_STRINGS_relative_time_to_string (linger, GNUNET_YES));
/* Both neighbours are told; no failing link is named (peers are NULL). */
608 send_broken (&r->prev, &r->cid, NULL, NULL);
609 send_broken (&r->next, &r->cid, NULL, NULL);
612 /* No more routes left, so no need for a #timeout_task */
617 * Function called when the message queue to the previous hop
618 * becomes available/unavailable. We expect this function to
619 * be called immediately when we register, and then again
620 * later if the connection ever goes down.
622 * @param cls the `struct RouteDirection`
623 * @param ready #GNUNET_YES if sending is now possible,
624 * #GNUNET_NO if sending is no longer possible
625 * #GNUNET_SYSERR if sending is no longer possible
626 * and the last envelope was discarded
/* Availability callback for the message queue of one route direction
   (@a cls is the `struct RouteDirection`).
   When @a ready is GNUNET_YES the direction is marked ready and, if an
   envelope is buffered, the head envelope is dequeued and sent, which marks
   the direction busy again. For any other value the route is reported as
   broken and destroyed.
   NOTE(review): the buffer-counter update, the rung adjustment and the
   return that ends the ready branch are not visible in this extract. */
629 dir_ready_cb (void *cls, int ready)
631 struct RouteDirection *dir = cls;
632 struct CadetRoute *route = dir->my_route;
633 struct RouteDirection *odir;
635 if (GNUNET_YES == ready)
637 struct GNUNET_MQ_Envelope *env;
639 dir->is_ready = GNUNET_YES;
640 if (NULL != (env = dir->env_head))
642 GNUNET_MQ_dll_remove (&dir->env_head, &dir->env_tail, env);
644 GNUNET_STATISTICS_set (stats, "# buffer use", cur_buffers, GNUNET_NO);
646 dir->is_ready = GNUNET_NO;
647 GCP_send (dir->mqm, env);
/* odir is the direction opposite to the one whose queue went down. */
651 odir = (dir == &route->next) ? &route->prev : &route->next;
652 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending BROKEN due to MQ going down\n");
/* NOTE(review): the notification is always addressed to route->next, while
   odir is only used for the peer identity in the message; confirm whether
   the target was meant to be odir. */
653 send_broken (&route->next, &route->cid, GCP_get_id (odir->hop), &my_full_id);
654 destroy_route (route);
659 * Initialize one of the directions of a route.
661 * @param route route the direction belongs to
662 * @param dir direction to initialize
663 * @param hop next hop in the @a dir
/* Set up @a dir as one direction of @a route towards @a hop: record the
   owning route, request a message queue with dir_ready_cb() as callback and
   place the direction in rung zero (empty buffer).
   NOTE(review): the assignment of `dir->hop` is not visible in this
   extract. */
666 dir_init (struct RouteDirection *dir,
667 struct CadetRoute *route,
668 struct CadetPeer *hop)
671 dir->my_route = route;
672 dir->mqm = GCP_request_mq (hop, &dir_ready_cb, dir);
673 GNUNET_CONTAINER_DLL_insert (rung_head->rd_head, rung_head->rd_tail, dir);
674 dir->rung = rung_head;
/* Relies on dir_ready_cb() having been invoked with GNUNET_YES during
   GCP_request_mq(), i.e. on the hop's queue being available. */
675 GNUNET_assert (GNUNET_YES == dir->is_ready);
680 * We could not create the desired route. Send a
681 * #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN
682 * message to @a target.
684 * @param target who should receive the message
685 * @param cid identifier of the connection/route that failed
686 * @param failure_at neighbour with which we failed to route,
/* Send a CONNECTION_BROKEN message to @a target via GCP_send_ooo(), for use
   when no route (and hence no per-route message queue manager) exists.
   peer1 is always this peer; peer2 is set only when @a failure_at is given.
   NOTE(review): the assignment of the connection identifier is not visible
   in this extract. */
690 send_broken_without_mqm (
691 struct CadetPeer *target,
692 const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid,
693 const struct GNUNET_PeerIdentity *failure_at)
695 struct GNUNET_MQ_Envelope *env;
696 struct GNUNET_CADET_ConnectionBrokenMessage *bm;
698 env = GNUNET_MQ_msg (bm, GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN);
700 bm->peer1 = my_full_id;
701 if (NULL != failure_at)
702 bm->peer2 = *failure_at;
703 GCP_send_ooo (target, env);
708 * Handle for #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE
710 * @param cls Closure (CadetPeer for neighbor that sent the message).
711 * @param msg Message itself.
/* Process a CONNECTION_CREATE received from the neighbour given in the
   closure. Visible stages:
   1. reject empty paths and paths containing the same peer twice;
   2. locate this peer in the path (searching from offset 1) and check that
      the sender is the hop right before us;
   3. an already known route: forward the duplicate CREATE;
   4. we are the last hop: hand the connection to the tunnel layer;
   5. otherwise: verify the next hop is connected and the route limit is not
      reached, create the routing entry and forward the CREATE.
   NOTE(review): several short lines (the closure parameter, returns, braces,
   some call arguments and the declaration of `off`) are missing from this
   extract. */
714 handle_connection_create (
716 const struct GNUNET_CADET_ConnectionCreateMessage *msg)
718 struct CadetPeer *sender = cls;
719 struct CadetPeer *next;
/* The peer identities of the path follow the fixed-size message header. */
720 const struct GNUNET_PeerIdentity *pids =
721 (const struct GNUNET_PeerIdentity *) &msg[1];
722 struct CadetRoute *route;
723 uint16_t size = ntohs (msg->header.size) - sizeof(*msg);
724 unsigned int path_length;
726 struct CadetTunnel *t;
728 path_length = size / sizeof(struct GNUNET_PeerIdentity);
729 if (0 == path_length)
731 LOG (GNUNET_ERROR_TYPE_DEBUG,
732 "Dropping CADET_CONNECTION_CREATE with empty path\n");
736 LOG (GNUNET_ERROR_TYPE_DEBUG,
737 "Handling CADET_CONNECTION_CREATE from %s for CID %s with %u hops\n",
739 GNUNET_sh2s (&msg->cid.connection_of_tunnel),
741 /* Check for loops */
743 struct GNUNET_CONTAINER_MultiPeerMap *map;
/* A temporary peer map with UNIQUE_ONLY inserts detects repeated peers:
   a GNUNET_SYSERR result from the put means the peer is already present. */
745 map = GNUNET_CONTAINER_multipeermap_create (path_length * 2, GNUNET_YES);
746 GNUNET_assert (NULL != map);
747 for (unsigned int i = 0; i < path_length; i++)
749 LOG (GNUNET_ERROR_TYPE_DEBUG,
750 "CADET_CONNECTION_CREATE has peer %s at offset %u\n",
751 GNUNET_i2s (&pids[i]),
753 if (GNUNET_SYSERR == GNUNET_CONTAINER_multipeermap_put (
757 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY))
760 GNUNET_CONTAINER_multipeermap_destroy (map);
761 LOG (GNUNET_ERROR_TYPE_DEBUG,
762 "Dropping CADET_CONNECTION_CREATE with cyclic path\n");
767 GNUNET_CONTAINER_multipeermap_destroy (map);
769 /* Initiator is at offset 0, find us */
770 for (off = 1; off < path_length; off++)
771 if (0 == GNUNET_memcmp (&my_full_id, &pids[off]))
/* off == path_length means the search ran off the end of the path. */
773 if (off == path_length)
775 LOG (GNUNET_ERROR_TYPE_DEBUG,
776 "Dropping CADET_CONNECTION_CREATE without us in the path\n");
780 /* Check previous hop */
781 if (sender != GCP_get (&pids[off - 1], GNUNET_NO))
783 LOG (GNUNET_ERROR_TYPE_DEBUG,
784 "Dropping CADET_CONNECTION_CREATE without sender at previous hop in the path\n");
788 if (NULL != (route = get_route (&msg->cid)))
790 /* Duplicate CREATE, pass it on, previous one might have been lost! */
792 LOG (GNUNET_ERROR_TYPE_DEBUG,
793 "Passing on duplicate CADET_CONNECTION_CREATE message on connection %s\n",
794 GNUNET_sh2s (&msg->cid.connection_of_tunnel));
795 route_message (sender,
798 GNUNET_MQ_PRIO_CRITICAL_CONTROL
799 | GNUNET_MQ_PREF_LOW_LATENCY);
802 if (off == path_length - 1)
804 /* We are the destination, create connection */
805 struct CadetConnection *cc;
806 struct CadetPeerPath *path;
807 struct CadetPeer *origin;
809 cc = GCC_lookup (&msg->cid);
812 LOG (GNUNET_ERROR_TYPE_DEBUG,
813 "Received duplicate CADET_CONNECTION_CREATE message on connection %s\n",
814 GNUNET_sh2s (&msg->cid.connection_of_tunnel));
815 GCC_handle_duplicate_create (cc);
819 origin = GCP_get (&pids[0], GNUNET_YES);
820 LOG (GNUNET_ERROR_TYPE_DEBUG,
821 "I am destination for CADET_CONNECTION_CREATE message from %s for connection %s, building inverse path\n",
823 GNUNET_sh2s (&msg->cid.connection_of_tunnel));
824 path = GCPP_get_path_from_route (path_length - 1, pids);
825 t = GCP_get_tunnel (sender, GNUNET_YES);
827 // Check for CADET state in case the other side has lost the tunnel (xrs,t3ss)
/* The tunnel key state is reset only if the message carries a monotonic
   time, that time check-and-update succeeds, its signature verifies and the
   tunnel currently has key state CADET_TUNNEL_KEY_OK. */
828 if ((GNUNET_YES == msg->has_monotime) &&
829 (GNUNET_YES == GCP_check_and_update_monotime(origin, msg->monotime)) &&
830 ( GNUNET_OK == GCP_check_monotime_sig(origin, msg)) &&
831 (CADET_TUNNEL_KEY_OK == GCT_get_estate(t)))
833 GCT_change_estate (t, CADET_TUNNEL_KEY_UNINITIALIZED);
837 GCT_add_inbound_connection (t,
841 /* Send back BROKEN: duplicate connection on the same path,
842 we will use the other one. */
843 LOG (GNUNET_ERROR_TYPE_DEBUG,
844 "Received CADET_CONNECTION_CREATE from %s for %s, but %s already has a connection. Sending BROKEN\n",
846 GNUNET_sh2s (&msg->cid.connection_of_tunnel),
848 send_broken_without_mqm (sender, &msg->cid, NULL);
853 /* We are merely a hop on the way, check if we can support the route */
854 next = GCP_get (&pids[off + 1], GNUNET_NO);
855 if ((NULL == next) || (GNUNET_NO == GCP_has_core_connection (next)))
857 /* unworkable, send back BROKEN notification */
858 LOG (GNUNET_ERROR_TYPE_DEBUG,
859 "Received CADET_CONNECTION_CREATE from %s for %s. Next hop %s:%u is down. Sending BROKEN\n",
861 GNUNET_sh2s (&msg->cid.connection_of_tunnel),
862 GNUNET_i2s (&pids[off + 1]),
864 send_broken_without_mqm (sender, &msg->cid, &pids[off + 1]);
/* Refuse new routes once the routing table holds max_routes entries. */
867 if (max_routes <= GNUNET_CONTAINER_multishortmap_size (routes))
869 LOG (GNUNET_ERROR_TYPE_DEBUG,
870 "Received CADET_CONNECTION_CREATE from %s for %s. We have reached our route limit. Sending BROKEN\n",
872 GNUNET_sh2s (&msg->cid.connection_of_tunnel));
873 send_broken_without_mqm (sender, &msg->cid, &pids[off - 1]);
877 /* Workable route, create routing entry */
878 LOG (GNUNET_ERROR_TYPE_DEBUG,
879 "Received CADET_CONNECTION_CREATE from %s for %s. Next hop %s:%u is up. Creating route\n",
881 GNUNET_sh2s (&msg->cid.connection_of_tunnel),
882 GNUNET_i2s (&pids[off + 1]),
884 route = GNUNET_new (struct CadetRoute);
885 route->cid = msg->cid;
886 route->last_use = GNUNET_TIME_absolute_get ();
/* `prev` points back at the sender, `next` at the following hop. */
887 dir_init (&route->prev, route, sender);
888 dir_init (&route->next, route, next);
889 GNUNET_assert (GNUNET_OK ==
890 GNUNET_CONTAINER_multishortmap_put (
892 &route->cid.connection_of_tunnel,
894 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
895 GNUNET_STATISTICS_set (stats,
897 GNUNET_CONTAINER_multishortmap_size (routes),
/* The heap cost is the last-use timestamp, so the oldest route is on top. */
899 route->hn = GNUNET_CONTAINER_heap_insert (route_heap,
901 route->last_use.abs_value_us);
/* Start the expiration task if it is not already scheduled. */
902 if (NULL == timeout_task)
904 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (
909 /* also pass CREATE message along to next hop */
910 route_message (sender,
913 GNUNET_MQ_PRIO_CRITICAL_CONTROL | GNUNET_MQ_PREF_LOW_LATENCY);
918 * Handle for #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE_ACK
920 * @param cls Closure (CadetPeer for neighbor that sent the message).
921 * @param msg Message itself.
/* Process a CONNECTION_CREATE_ACK. If the connection terminates at this
   peer, the ACK is accepted only when it comes from the peer at offset 0 of
   the connection's path; otherwise the message is routed onwards as
   critical, low-latency control traffic.
   NOTE(review): the closure parameter, the NULL test on `cc`, the `len`
   declaration and the returns are not visible in this extract. */
924 handle_connection_create_ack (
926 const struct GNUNET_CADET_ConnectionCreateAckMessage *msg)
928 struct CadetPeer *peer = cls;
929 struct CadetConnection *cc;
931 /* First, check if ACK belongs to a connection that ends here. */
932 cc = GCC_lookup (&msg->cid);
935 /* verify ACK came from the right direction */
937 struct CadetPeerPath *path = GCC_get_path (cc, &len);
939 if (peer != GCPP_get_peer_at_offset (path, 0))
941 /* received ACK from unexpected direction, ignore! */
945 LOG (GNUNET_ERROR_TYPE_DEBUG,
946 "Received CONNECTION_CREATE_ACK for connection %s.\n",
947 GNUNET_sh2s (&msg->cid.connection_of_tunnel));
948 GCC_handle_connection_create_ack (cc);
952 /* We're just an intermediary peer, route the message along its path */
956 GNUNET_MQ_PRIO_CRITICAL_CONTROL | GNUNET_MQ_PREF_LOW_LATENCY);
961 * Handle for #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN
963 * @param cls Closure (CadetPeer for neighbor that sent the message).
964 * @param msg Message itself.
965 * @deprecated duplicate logic with #handle_connection_destroy(); dedup!
/* Process a CONNECTION_BROKEN. A connection ending at this peer is destroyed
   (without notifying CORE again) if the message came from the peer at
   offset 0 of its path. Otherwise the message is forwarded along the route
   and the local routing entry is destroyed afterwards.
   NOTE(review): the closure parameter, NULL tests on `cc` and `route`, and
   the returns are not visible in this extract. */
968 handle_connection_broken (
970 const struct GNUNET_CADET_ConnectionBrokenMessage *msg)
972 struct CadetPeer *peer = cls;
973 struct CadetConnection *cc;
974 struct CadetRoute *route;
976 /* First, check if message belongs to a connection that ends here. */
977 cc = GCC_lookup (&msg->cid);
980 /* verify message came from the right direction */
982 struct CadetPeerPath *path = GCC_get_path (cc, &len);
984 if (peer != GCPP_get_peer_at_offset (path, 0))
986 /* received message from unexpected direction, ignore! */
990 LOG (GNUNET_ERROR_TYPE_DEBUG,
991 "Received CONNECTION_BROKEN for connection %s. Destroying it.\n",
992 GNUNET_sh2s (&msg->cid.connection_of_tunnel));
993 GCC_destroy_without_core (cc);
995 /* FIXME: also destroy the path up to the specified link! */
999 /* We're just an intermediary peer, route the message along its path */
1000 route_message (peer,
1003 GNUNET_MQ_PREF_LOW_LATENCY | GNUNET_MQ_PRIO_CRITICAL_CONTROL);
/* The route is looked up again after forwarding and then removed. */
1004 route = get_route (&msg->cid);
1006 destroy_route (route);
1007 /* FIXME: also destroy paths we MAY have up to the specified link! */
1012 * Handle for #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY
1014 * @param cls Closure (CadetPeer for neighbor that sent the message).
1015 * @param msg Message itself.
/* Process a CONNECTION_DESTROY. A connection ending at this peer is
   destroyed if the message came from the peer at offset 0 of its path (a
   protocol violation is flagged otherwise). For routes through this peer
   the message is forwarded and the routing entry is destroyed afterwards.
   NOTE(review): the closure parameter, NULL tests on `cc` and `route`, and
   the returns are not visible in this extract. */
1018 handle_connection_destroy (
1020 const struct GNUNET_CADET_ConnectionDestroyMessage *msg)
1022 struct CadetPeer *peer = cls;
1023 struct CadetConnection *cc;
1024 struct CadetRoute *route;
1026 /* First, check if message belongs to a connection that ends here. */
1027 cc = GCC_lookup (&msg->cid);
1030 /* verify message came from the right direction */
1032 struct CadetPeerPath *path = GCC_get_path (cc, &len);
1034 if (peer != GCPP_get_peer_at_offset (path, 0))
1036 /* received message from unexpected direction, ignore! */
1037 GNUNET_break_op (0);
1040 LOG (GNUNET_ERROR_TYPE_DEBUG,
1041 "Received CONNECTION_DESTROY for connection %s. Destroying connection.\n",
1042 GNUNET_sh2s (&msg->cid.connection_of_tunnel));
1044 GCC_destroy_without_core (cc);
1048 /* We're just an intermediary peer, route the message along its path */
1049 LOG (GNUNET_ERROR_TYPE_DEBUG,
1050 "Received CONNECTION_DESTROY for connection %s. Destroying route.\n",
1051 GNUNET_sh2s (&msg->cid.connection_of_tunnel));
1052 route_message (peer,
1055 GNUNET_MQ_PREF_LOW_LATENCY | GNUNET_MQ_PRIO_CRITICAL_CONTROL);
/* The route is looked up again after forwarding and then removed. */
1056 route = get_route (&msg->cid);
1058 destroy_route (route);
1063 * Handle for #GNUNET_MESSAGE_TYPE_CADET_TUNNEL_KX
1065 * @param cls Closure (CadetPeer for neighbor that sent the message).
1066 * @param msg Message itself.
/* Process a TUNNEL_KX message: deliver it to the local connection when the
   connection ends here and the sender is the peer at offset 0 of its path;
   otherwise forward it along the route as critical, low-latency traffic.
   NOTE(review): the NULL test on `cc`, the `len` declaration and the returns
   are not visible in this extract. */
1069 handle_tunnel_kx (void *cls,
1070 const struct GNUNET_CADET_TunnelKeyExchangeMessage *msg)
1072 struct CadetPeer *peer = cls;
1073 struct CadetConnection *cc;
1075 /* First, check if message belongs to a connection that ends here. */
1076 LOG (GNUNET_ERROR_TYPE_DEBUG,
1077 "Routing KX with ephemeral %s on CID %s\n",
1078 GNUNET_e2s (&msg->ephemeral_key),
1079 GNUNET_sh2s (&msg->cid.connection_of_tunnel));
1082 cc = GCC_lookup (&msg->cid);
1085 /* verify message came from the right direction */
1087 struct CadetPeerPath *path = GCC_get_path (cc, &len);
1089 if (peer != GCPP_get_peer_at_offset (path, 0))
1091 /* received message from unexpected direction, ignore! */
1092 GNUNET_break_op (0);
1095 GCC_handle_kx (cc, msg);
1099 /* We're just an intermediary peer, route the message along its path */
1100 route_message (peer,
1103 GNUNET_MQ_PRIO_CRITICAL_CONTROL | GNUNET_MQ_PREF_LOW_LATENCY);
1108 * Handle for #GNUNET_MESSAGE_TYPE_CADET_TUNNEL_KX_AUTH
1110 * @param cls Closure (CadetPeer for neighbor that sent the message).
1111 * @param msg Message itself.
/* Process a TUNNEL_KX_AUTH message. The connection identifier is taken from
   the embedded KX message (`msg->kx.cid`). Local connections receive the
   message when the sender is the peer at offset 0 of the path; otherwise it
   is forwarded along the route as critical, low-latency traffic.
   NOTE(review): the closure parameter, the NULL test on `cc`, the `len`
   declaration and the returns are not visible in this extract. */
1114 handle_tunnel_kx_auth (
1116 const struct GNUNET_CADET_TunnelKeyExchangeAuthMessage *msg)
1118 struct CadetPeer *peer = cls;
1119 struct CadetConnection *cc;
1121 /* First, check if message belongs to a connection that ends here. */
1122 cc = GCC_lookup (&msg->kx.cid);
1125 /* verify message came from the right direction */
1127 struct CadetPeerPath *path = GCC_get_path (cc, &len);
1129 if (peer != GCPP_get_peer_at_offset (path, 0))
1131 /* received message from unexpected direction, ignore! */
1132 GNUNET_break_op (0);
1135 GCC_handle_kx_auth (cc, msg);
1139 /* We're just an intermediary peer, route the message along its path */
1140 route_message (peer,
1143 GNUNET_MQ_PRIO_CRITICAL_CONTROL | GNUNET_MQ_PREF_LOW_LATENCY);
1148 * Check if the encrypted message has the appropriate size.
1150 * @param cls Closure (unused).
1151 * @param msg Message to check.
1153 * @return #GNUNET_YES if size is correct, #GNUNET_NO otherwise.
/* Size validator registered for the variable-size TUNNEL_ENCRYPTED handler
   in GCO_init().
   NOTE(review): the function body is not visible in this extract, so the
   actual check performed cannot be described here. */
1156 check_tunnel_encrypted (void *cls,
1157 const struct GNUNET_CADET_TunnelEncryptedMessage *msg)
1164 * Handle for #GNUNET_MESSAGE_TYPE_CADET_TUNNEL_ENCRYPTED.
1166 * @param cls Closure (CadetPeer for neighbor that sent the message).
1167 * @param msg Message itself.
/* Process a TUNNEL_ENCRYPTED payload message: deliver it to the local
   connection when the sender is the peer at offset 0 of its path; otherwise
   forward it along the route. Unlike the control messages in this file, the
   forwarded payload uses best-effort priority.
   NOTE(review): the NULL test on `cc`, the `len` declaration and the returns
   are not visible in this extract. */
1170 handle_tunnel_encrypted (void *cls,
1171 const struct GNUNET_CADET_TunnelEncryptedMessage *msg)
1173 struct CadetPeer *peer = cls;
1174 struct CadetConnection *cc;
1176 /* First, check if message belongs to a connection that ends here. */
1177 cc = GCC_lookup (&msg->cid);
1180 /* verify message came from the right direction */
1182 struct CadetPeerPath *path = GCC_get_path (cc, &len);
1184 if (peer != GCPP_get_peer_at_offset (path, 0))
1186 /* received message from unexpected direction, ignore! */
1187 GNUNET_break_op (0);
1190 GCC_handle_encrypted (cc, msg);
1193 /* We're just an intermediary peer, route the message along its path */
1194 route_message (peer, &msg->cid, &msg->header, GNUNET_MQ_PRIO_BEST_EFFORT);
1199 * Function called after #GNUNET_CORE_connect has succeeded (or failed
1200 * for good). Note that the private key of the peer is intentionally
1201 * not exposed here; if you need it, your process should try to read
1202 * the private key file directly (which should work if you are
1203 * authorized...). Implementations of this function must not call
1204 * #GNUNET_CORE_disconnect (other than by scheduling a new task to
1207 * @param cls closure
1208 * @param my_identity ID of this peer, NULL if we failed
/* CORE initialisation callback. A NULL identity signals failure
   (NOTE(review): the handling of that case is not visible in this extract).
   Otherwise the identity reported by CORE is expected to equal the globally
   known `my_full_id`; a mismatch is flagged via GNUNET_break(). */
1211 core_init_cb (void *cls, const struct GNUNET_PeerIdentity *my_identity)
1213 if (NULL == my_identity)
1218 GNUNET_break (0 == GNUNET_memcmp (my_identity, &my_full_id));
1223 * Method called whenever a given peer connects.
1225 * @param cls closure
1226 * @param peer peer identity this notification is about
/* CORE connect callback: obtain the CadetPeer for @a peer and attach the
   CORE message queue @a mq to it.
   NOTE(review): GNUNET_YES presumably asks GCP_get() to create the peer if
   it is unknown - confirm. The return statement is not visible in this
   extract; core_disconnect_cb() receives a `struct CadetPeer` as its
   per-peer closure. */
1229 core_connect_cb (void *cls,
1230 const struct GNUNET_PeerIdentity *peer,
1231 struct GNUNET_MQ_Handle *mq)
1233 struct CadetPeer *cp;
1235 LOG (GNUNET_ERROR_TYPE_DEBUG,
1236 "CORE connection to peer %s was established.\n",
1238 cp = GCP_get (peer, GNUNET_YES);
1239 GCP_set_mq (cp, mq);
1245 * Method called whenever a peer disconnects.
1247 * @param cls closure
1248 * @param peer peer identity this notification is about
/* CORE disconnect callback: the per-peer closure is the CadetPeer, whose
   message queue is cleared by setting it to NULL.
   NOTE(review): the `peer_cls` parameter declaration is not visible in this
   extract. */
1251 core_disconnect_cb (void *cls,
1252 const struct GNUNET_PeerIdentity *peer,
1255 struct CadetPeer *cp = peer_cls;
1257 LOG (GNUNET_ERROR_TYPE_DEBUG,
1258 "CORE connection to peer %s went down.\n",
1260 GCP_set_mq (cp, NULL);
1265 * Initialize the CORE subsystem.
1267 * @param c Configuration.
/* Initialise the CORE-facing subsystem: build the handler table for all
   CADET peer-to-peer message types, read two numeric limits from the
   configuration @a c, create the routing map and the expiration heap, and
   connect to CORE.
   NOTE(review): the configuration section and option names, the fallback for
   the first limit and most GNUNET_CORE_connect() arguments are not visible
   in this extract. */
1270 GCO_init (const struct GNUNET_CONFIGURATION_Handle *c)
/* CREATE and ENCRYPTED are variable-size (validated by the check_*
   functions); the other types are fixed-size. */
1272 struct GNUNET_MQ_MessageHandler handlers[] =
1273 { GNUNET_MQ_hd_var_size (connection_create,
1274 GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE,
1275 struct GNUNET_CADET_ConnectionCreateMessage,
1277 GNUNET_MQ_hd_fixed_size (connection_create_ack,
1278 GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE_ACK,
1279 struct GNUNET_CADET_ConnectionCreateAckMessage,
1281 GNUNET_MQ_hd_fixed_size (connection_broken,
1282 GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN,
1283 struct GNUNET_CADET_ConnectionBrokenMessage,
1285 GNUNET_MQ_hd_fixed_size (connection_destroy,
1286 GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY,
1287 struct GNUNET_CADET_ConnectionDestroyMessage,
1289 GNUNET_MQ_hd_fixed_size (tunnel_kx,
1290 GNUNET_MESSAGE_TYPE_CADET_TUNNEL_KX,
1291 struct GNUNET_CADET_TunnelKeyExchangeMessage,
1293 GNUNET_MQ_hd_fixed_size (tunnel_kx_auth,
1294 GNUNET_MESSAGE_TYPE_CADET_TUNNEL_KX_AUTH,
1295 struct GNUNET_CADET_TunnelKeyExchangeAuthMessage,
1297 GNUNET_MQ_hd_var_size (tunnel_encrypted,
1298 GNUNET_MESSAGE_TYPE_CADET_TUNNEL_ENCRYPTED,
1299 struct GNUNET_CADET_TunnelEncryptedMessage,
1301 GNUNET_MQ_handler_end () };
1303 if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_number (c,
/* When the second lookup fails, the buffer limit falls back to 10000. */
1308 if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_number (c,
1312 max_buffers = 10000;
1313 routes = GNUNET_CONTAINER_multishortmap_create (1024, GNUNET_NO);
1314 route_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1315 core = GNUNET_CORE_connect (c,
1319 &core_disconnect_cb,
1325 * Shut down the CORE subsystem.
/* Body of the shutdown routine (its signature line is not visible in this
   extract): disconnect from CORE, require that no routes remain, release the
   routing map and the heap, and cancel a pending timeout task.
   NOTE(review): NULL guards and pointer resets around these calls are
   presumably on the missing lines - confirm. */
1332 GNUNET_CORE_disconnect (core);
/* All routes must already have been destroyed at this point. */
1335 GNUNET_assert (0 == GNUNET_CONTAINER_multishortmap_size (routes));
1336 GNUNET_CONTAINER_multishortmap_destroy (routes);
1338 GNUNET_CONTAINER_heap_destroy (route_heap);
1340 if (NULL != timeout_task)
1342 GNUNET_SCHEDULER_cancel (timeout_task);
1343 timeout_task = NULL;
1348 /* end of gnunet-service-cadet_core.c */