* @author Christian Grothoff
*
* All functions in this file should use the prefix GCO (Gnunet Cadet cOre (bottom))
+ *
+ * TODO:
+ * - Optimization: given BROKEN messages, destroy paths (?)
*/
#include "platform.h"
#include "gnunet-service-cadet-new_core.h"
#include "gnunet-service-cadet-new_paths.h"
#include "gnunet-service-cadet-new_peer.h"
#include "gnunet-service-cadet-new_connection.h"
+#include "gnunet-service-cadet-new_tunnels.h"
#include "gnunet_core_service.h"
#include "cadet_protocol.h"
+#define LOG(level, ...) GNUNET_log_from(level,"cadet-cor",__VA_ARGS__)
+
+
+/**
+ * Number of messages we are willing to buffer per route.
+ */
+#define ROUTE_BUFFER_SIZE 8
+
+
+/**
+ * Information we keep per direction for a route.
+ */
+struct RouteDirection
+{
+ /**
+ * Target peer.
+ */
+ struct CadetPeer *hop;
+
+ /**
+ * Route this direction is part of.
+ */
+ struct CadetRoute *my_route;
+
+ /**
+ * Message queue manager for @e hop.
+ */
+ struct GCP_MessageQueueManager *mqm;
+
+ /**
+ * Cyclic message buffer to @e hop.
+ */
+ struct GNUNET_MQ_Envelope *out_buffer[ROUTE_BUFFER_SIZE];
+
+ /**
+ * Next write offset to use to append messages to @e out_buffer.
+ */
+ unsigned int out_wpos;
+
+ /**
+ * Next read offset to use to retrieve messages from @e out_buffer.
+ */
+ unsigned int out_rpos;
+
+ /**
+ * Is @e mqm currently ready for transmission?
+ */
+ int is_ready;
+
+};
+
+
/**
* Description of a segment of a `struct CadetConnection` at the
* intermediate peers. Routes are basically entries in a peer's
{
/**
- * Previous hop on this route.
+ * Information about the next hop on this route.
*/
- struct CadetPeer *prev_hop;
+ struct RouteDirection next;
/**
- * Next hop on this route.
+ * Information about the previous hop on this route.
*/
- struct CadetPeer *next_hop;
-
- /**
- * Message queue notifications for @e prev_hop.
- */
- struct GCP_MessageQueueManager *prev_mqm;
-
- /**
- * Message queue notifications for @e next_hop.
- */
- struct GCP_MessageQueueManager *next_mqm;
+ struct RouteDirection prev;
/**
* Unique identifier for the connection that uses this route.
const struct GNUNET_MessageHeader *msg)
{
struct CadetRoute *route;
+ struct RouteDirection *dir;
+ struct GNUNET_MQ_Envelope *env;
route = get_route (cid);
if (NULL == route)
struct GNUNET_MQ_Envelope *env;
struct GNUNET_CADET_ConnectionBrokenMessage *bm;
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Failed to route message of type %u from %s on connection %s: no route\n",
+ ntohs (msg->type),
+ GCP_2s (prev),
+ GNUNET_sh2s (&cid->connection_of_tunnel));
env = GNUNET_MQ_msg (bm,
GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN);
bm->cid = *cid;
bm->peer1 = my_full_id;
- GCP_send (prev,
- env);
+ GCP_send_ooo (prev,
+ env);
return;
}
- /* FIXME: support round-robin queue management here somewhere! */
- GCP_send ((prev == route->prev_hop) ? route->next_hop : route->prev_hop,
- GNUNET_MQ_msg_copy (msg));
+ dir = (prev == route->prev.hop) ? &route->next : &route->prev;
+ if (GNUNET_YES == dir->is_ready)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Routing message of type %u from %s to %s on connection %s\n",
+ ntohs (msg->type),
+ GCP_2s (prev),
+ GNUNET_i2s (GCP_get_id (dir->hop)),
+ GNUNET_sh2s (&cid->connection_of_tunnel));
+ dir->is_ready = GNUNET_NO;
+ GCP_send (dir->mqm,
+ GNUNET_MQ_msg_copy (msg));
+ return;
+ }
+ env = dir->out_buffer[dir->out_wpos];
+ if (NULL != env)
+ {
+ /* Queue full, drop earliest message in queue */
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Queue full due to new message of type %u from %s to %s on connection %s, dropping old message\n",
+ ntohs (msg->type),
+ GCP_2s (prev),
+ GNUNET_i2s (GCP_get_id (dir->hop)),
+ GNUNET_sh2s (&cid->connection_of_tunnel));
+ GNUNET_STATISTICS_update (stats,
+ "# messages dropped due to full buffer",
+ 1,
+ GNUNET_NO);
+ GNUNET_assert (dir->out_rpos == dir->out_wpos);
+ GNUNET_MQ_discard (env);
+ dir->out_rpos++;
+ if (ROUTE_BUFFER_SIZE == dir->out_rpos)
+ dir->out_rpos = 0;
+ }
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Queueing new message of type %u from %s to %s on connection %s\n",
+ ntohs (msg->type),
+ GCP_2s (prev),
+ GNUNET_i2s (GCP_get_id (dir->hop)),
+ GNUNET_sh2s (&cid->connection_of_tunnel));
+ env = GNUNET_MQ_msg_copy (msg);
+ dir->out_buffer[dir->out_wpos] = env;
+ dir->out_wpos++;
+ if (ROUTE_BUFFER_SIZE == dir->out_wpos)
+ dir->out_wpos = 0;
}
}
+/**
+ * Free internal data of a route direction.
+ *
+ * @param dir direction to destroy (do NOT free memory of 'dir' itself)
+ */
+static void
+destroy_direction (struct RouteDirection *dir)
+{
+ for (unsigned int i=0;i<ROUTE_BUFFER_SIZE;i++)
+ if (NULL != dir->out_buffer[i])
+ {
+ GNUNET_MQ_discard (dir->out_buffer[i]);
+ dir->out_buffer[i] = NULL;
+ }
+ if (NULL != dir->mqm)
+ {
+ GCP_request_mq_cancel (dir->mqm,
+ NULL);
+ dir->mqm = NULL;
+ }
+}
+
+
/**
* Destroy our state for @a route.
*
static void
destroy_route (struct CadetRoute *route)
{
- GCP_request_mq_cancel (route->next_mqm);
- GCP_request_mq_cancel (route->prev_mqm);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Destroying route from %s to %s of connection %s\n",
+ GNUNET_i2s (GCP_get_id (route->prev.hop)),
+ GNUNET_i2s2 (GCP_get_id (route->next.hop)),
+ GNUNET_sh2s (&route->cid.connection_of_tunnel));
+ destroy_direction (&route->prev);
+ destroy_direction (&route->next);
GNUNET_free (route);
}
+/**
+ * Send message that a route is broken between @a peer1 and @a peer2.
+ *
+ * @param target where to send the message
+ * @param cid connection identifier to use
+ * @param peer1 one of the peers where a link is broken
+ * @param peer2 another one of the peers where a link is broken
+ */
+static void
+send_broken (struct RouteDirection *target,
+ const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid,
+ const struct GNUNET_PeerIdentity *peer1,
+ const struct GNUNET_PeerIdentity *peer2)
+{
+ struct GNUNET_MQ_Envelope *env;
+ struct GNUNET_CADET_ConnectionBrokenMessage *bm;
+
+ if (NULL == target->mqm)
+ return; /* Can't send notification, connection is down! */
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Notifying %s about BROKEN route at %s-%s of connection %s\n",
+ GCP_2s (target->hop),
+ GNUNET_i2s (peer1),
+ GNUNET_i2s2 (peer2),
+ GNUNET_sh2s (&cid->connection_of_tunnel));
+
+ env = GNUNET_MQ_msg (bm,
+ GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN);
+ bm->cid = *cid;
+ if (NULL != peer1)
+ bm->peer1 = *peer1;
+ if (NULL != peer2)
+ bm->peer2 = *peer2;
+
+ GCP_request_mq_cancel (target->mqm,
+ env);
+ target->mqm = NULL;
+}
+
+
+/**
+ * Function called when the message queue to the previous hop
+ * becomes available/unavailable. We expect this function to
+ * be called immediately when we register, and then again
+ * later if the connection ever goes down.
+ *
+ * @param cls the `struct RouteDirection`
+ * @param available #GNUNET_YES if sending is now possible,
+ * #GNUNET_NO if sending is no longer possible
+ * #GNUNET_SYSERR if sending is no longer possible
+ * and the last envelope was discarded
+ */
+static void
+dir_ready_cb (void *cls,
+ int ready)
+{
+ struct RouteDirection *dir = cls;
+ struct CadetRoute *route = dir->my_route;
+ struct RouteDirection *odir;
+
+ if (GNUNET_YES == ready)
+ {
+ struct GNUNET_MQ_Envelope *env;
+
+ dir->is_ready = GNUNET_YES;
+ if (NULL != (env = dir->out_buffer[dir->out_rpos]))
+ {
+ dir->out_buffer[dir->out_rpos] = NULL;
+ dir->out_rpos++;
+ if (ROUTE_BUFFER_SIZE == dir->out_rpos)
+ dir->out_rpos = 0;
+ dir->is_ready = GNUNET_NO;
+ GCP_send (dir->mqm,
+ env);
+ }
+ return;
+ }
+ odir = (dir == &route->next) ? &route->prev : &route->next;
+ send_broken (&route->next,
+ &route->cid,
+ GCP_get_id (odir->hop),
+ &my_full_id);
+ destroy_route (route);
+}
+
+
+/**
+ * Initialize one of the directions of a route.
+ *
+ * @param route route the direction belongs to
+ * @param dir direction to initialize
+ * @param hop next hop on in the @a dir
+ */
+static void
+dir_init (struct RouteDirection *dir,
+ struct CadetRoute *route,
+ struct CadetPeer *hop)
+{
+ dir->hop = hop;
+ dir->my_route = route;
+ dir->mqm = GCP_request_mq (hop,
+ &dir_ready_cb,
+ dir);
+ GNUNET_assert (GNUNET_YES == dir->is_ready);
+}
+
+
/**
* Handle for #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE
*
GNUNET_break_op (0);
return;
}
+ if (NULL !=
+ get_route (&msg->cid))
+ {
+ /* Duplicate CREATE, pass it on, previous one might have been lost! */
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Passing on duplicate CADET_CONNECTION_CREATE message on connection %s\n",
+ GNUNET_sh2s (&msg->cid.connection_of_tunnel));
+ route_message (sender,
+ &msg->cid,
+ &msg->header);
+ return;
+ }
if (off == path_length - 1)
{
/* We are the destination, create connection */
+ struct CadetConnection *cc;
+ struct CadetPeerPath *path;
+ struct CadetPeer *origin;
+
+ cc = GNUNET_CONTAINER_multishortmap_get (connections,
+ &msg->cid.connection_of_tunnel);
+ if (NULL != cc)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Received duplicate CADET_CONNECTION_CREATE message on connection %s\n",
+ GNUNET_sh2s (&msg->cid.connection_of_tunnel));
+ GCC_handle_duplicate_create (cc);
+ return;
+ }
+
+ origin = GCP_get (&pids[0],
+ GNUNET_YES);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Received CADET_CONNECTION_CREATE message from %s for connection %s, building inverse path\n",
+ GCP_2s (origin),
+ GNUNET_sh2s (&msg->cid.connection_of_tunnel));
+ path = GCPP_get_path_from_route (path_length - 1,
+ pids);
+ if (GNUNET_OK !=
+ GCT_add_inbound_connection (GCP_get_tunnel (origin,
+ GNUNET_YES),
+ &msg->cid,
+ path))
+ {
+ /* Send back BROKEN: duplicate connection on the same path,
+ we will use the other one. */
+ struct GNUNET_MQ_Envelope *env;
+ struct GNUNET_CADET_ConnectionBrokenMessage *bm;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Received CADET_CONNECTION_CREATE from %s for %s, but %s already has a connection. Sending BROKEN\n",
+ GCP_2s (sender),
+ GNUNET_sh2s (&msg->cid.connection_of_tunnel),
+ GCPP_2s (path));
+ env = GNUNET_MQ_msg (bm,
+ GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN);
+ bm->cid = msg->cid;
+ bm->peer1 = my_full_id;
+ GCP_send_ooo (sender,
+ env);
+ return;
+ }
return;
}
/* We are merely a hop on the way, check if we can support the route */
next = GCP_get (&pids[off + 1],
GNUNET_NO);
- if (NULL == next)
+ if ( (NULL == next) ||
+ (GNUNET_NO == GCP_has_core_connection (next)) )
{
- /* unworkable, send back BROKEN */
- GNUNET_break (0); // FIXME...
+ /* unworkable, send back BROKEN notification */
+ struct GNUNET_MQ_Envelope *env;
+ struct GNUNET_CADET_ConnectionBrokenMessage *bm;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Received CADET_CONNECTION_CREATE from %s for %s. Next hop %s:%u is down. Sending BROKEN\n",
+ GCP_2s (sender),
+ GNUNET_sh2s (&msg->cid.connection_of_tunnel),
+ GNUNET_i2s (&pids[off + 1]),
+ off + 1);
+ env = GNUNET_MQ_msg (bm,
+ GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN);
+ bm->cid = msg->cid;
+ bm->peer1 = pids[off + 1];
+ bm->peer2 = my_full_id;
+ GCP_send_ooo (sender,
+ env);
return;
}
+ /* Workable route, create routing entry */
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Received CADET_CONNECTION_CREATE from %s for %s. Next hop %s:%u is up. Creating route\n",
+ GCP_2s (sender),
+ GNUNET_sh2s (&msg->cid.connection_of_tunnel),
+ GNUNET_i2s (&pids[off + 1]),
+ off + 1);
route = GNUNET_new (struct CadetRoute);
-
-#if FIXME
- GCC_handle_create (peer,
- &msg->cid,
- path_length,
- route);
-#endif
+ route->cid = msg->cid;
+ dir_init (&route->prev,
+ route,
+ sender);
+ dir_init (&route->next,
+ route,
+ next);
+ GNUNET_assert (GNUNET_OK ==
+ GNUNET_CONTAINER_multishortmap_put (routes,
+ &route->cid.connection_of_tunnel,
+ route,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
}
* @param msg Message itself.
*/
static void
-handle_connection_ack (void *cls,
- const struct GNUNET_CADET_ConnectionCreateMessageAckMessage *msg)
+handle_connection_create_ack (void *cls,
+ const struct GNUNET_CADET_ConnectionCreateAckMessage *msg)
{
struct CadetPeer *peer = cls;
struct CadetConnection *cc;
GNUNET_break_op (0);
return;
}
- GCC_handle_connection_ack (cc);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Received CONNECTION_CREATE_ACK for connection %s.\n",
+ GNUNET_sh2s (&msg->cid.connection_of_tunnel));
+ GCC_handle_connection_create_ack (cc);
return;
}
GNUNET_break_op (0);
return;
}
- GCC_destroy (cc);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Received CONNECTION_BROKEN for connection %s. Destroying it.\n",
+ GNUNET_sh2s (&msg->cid.connection_of_tunnel));
+ GCC_destroy_without_core (cc);
/* FIXME: also destroy the path up to the specified link! */
return;
GNUNET_break_op (0);
return;
}
- GCC_destroy (cc);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Received CONNECTION_DESTROY for connection %s. Destroying connection.\n",
+ GNUNET_sh2s (&msg->cid.connection_of_tunnel));
+
+ GCC_destroy_without_core (cc);
return;
}
/* We're just an intermediary peer, route the message along its path */
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Received CONNECTION_DESTROY for connection %s. Destroying route.\n",
+ GNUNET_sh2s (&msg->cid.connection_of_tunnel));
route = get_route (&msg->cid);
route_message (peer,
&msg->cid,
}
-/**
- * Handle for #GNUNET_MESSAGE_TYPE_CADET_TUNNEL_HOP_BY_HOP_ENCRYPTED_ACK.
- *
- * @param cls Closure (CadetPeer for neighbor that sent the message).
- * @param msg Message itself.
- */
-static void
-handle_hop_by_hop_encrypted_ack (void *cls,
- const struct GNUNET_CADET_ConnectionEncryptedAckMessage *msg)
-{
- struct CadetPeer *peer = cls;
-
-#if FIXME
- GCC_handle_poll (peer,
- msg);
-#endif
-}
-
-
-/**
- * Handle for #GNUNET_MESSAGE_TYPE_CADET_TUNNEL_ENCRYPTED_POLL
- *
- * @param cls Closure (CadetPeer for neighbor that sent the message).
- * @param msg Message itself.
- */
-static void
-handle_poll (void *cls,
- const struct GNUNET_CADET_ConnectionHopByHopPollMessage *msg)
-{
- struct CadetPeer *peer = cls;
-
-#if FIXME
- GCC_handle_poll (peer,
- msg);
-#endif
-}
-
-
/**
* Handle for #GNUNET_MESSAGE_TYPE_CADET_TUNNEL_KX
*
msg);
return;
}
-
/* We're just an intermediary peer, route the message along its path */
route_message (peer,
&msg->cid,
{
struct CadetPeer *cp;
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "CORE connection to peer %s was established.\n",
+ GNUNET_i2s (peer));
cp = GCP_get (peer,
GNUNET_YES);
GCP_set_mq (cp,
{
struct CadetPeer *cp = peer_cls;
- /* FIXME: also check all routes going via peer and
- send broken messages to the other direction! */
- GNUNET_break (0);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "CORE connection to peer %s went down.\n",
+ GNUNET_i2s (peer));
GCP_set_mq (cp,
NULL);
}
GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE,
struct GNUNET_CADET_ConnectionCreateMessage,
NULL),
- GNUNET_MQ_hd_fixed_size (connection_ack,
+ GNUNET_MQ_hd_fixed_size (connection_create_ack,
GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE_ACK,
- struct GNUNET_CADET_ConnectionCreateMessageAckMessage,
+ struct GNUNET_CADET_ConnectionCreateAckMessage,
NULL),
GNUNET_MQ_hd_fixed_size (connection_broken,
GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN,
GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY,
struct GNUNET_CADET_ConnectionDestroyMessage,
NULL),
- GNUNET_MQ_hd_fixed_size (hop_by_hop_encrypted_ack,
- GNUNET_MESSAGE_TYPE_CADET_CONNECTION_HOP_BY_HOP_ENCRYPTED_ACK,
- struct GNUNET_CADET_ConnectionEncryptedAckMessage,
- NULL),
- GNUNET_MQ_hd_fixed_size (poll,
- GNUNET_MESSAGE_TYPE_CADET_TUNNEL_ENCRYPTED_POLL,
- struct GNUNET_CADET_ConnectionHopByHopPollMessage,
- NULL),
GNUNET_MQ_hd_fixed_size (tunnel_kx,
GNUNET_MESSAGE_TYPE_CADET_TUNNEL_KX,
struct GNUNET_CADET_TunnelKeyExchangeMessage,