/**
* Entry in list of connections used by tunnel, with metadata.
*/
-struct CadetTConnection;
+struct CadetTConnection
+{
+ /**
+ * Next in DLL.
+ */
+ struct CadetTConnection *next;
+
+ /**
+ * Prev in DLL.
+ */
+ struct CadetTConnection *prev;
+
+ /**
+ * Connection handle.
+ */
+ struct CadetConnection *cc;
+
+ /**
+ * Tunnel this connection belongs to.
+ */
+ struct CadetTunnel *t;
+
+ /**
+ * Creation time, to keep oldest connection alive.
+ */
+ struct GNUNET_TIME_Absolute created;
+
+ /**
+ * Connection throughput, to keep fastest connection alive.
+ */
+ uint32_t throughput;
+
+ /**
+ * Is the connection currently ready for transmission?
+ */
+ int is_ready;
+};
+
/**
* Active path through the network (used by a tunnel). There may
* @author Christian Grothoff
*
* TODO:
- * - congestion control
- * - GCC_debug()
* - keepalive messages
- * - performance metrics
- * - back-off reset
+ * - keep performance metrics (?)
*/
#include "platform.h"
#include "gnunet-service-cadet-new_channel.h"
CADET_CONNECTION_SENT,
/**
- * Connection confirmed, ready to carry traffic.
+ * We are an inbound connection, and received a CREATE. Need to
+ * send an CREATE_ACK back.
*/
- CADET_CONNECTION_READY,
+ CADET_CONNECTION_CREATE_RECEIVED,
/**
- * Connection to be destroyed, just waiting to empty queues.
+ * Connection confirmed, ready to carry traffic.
*/
- CADET_CONNECTION_DESTROYED,
+ CADET_CONNECTION_READY
- /**
- * Connection to be destroyed because of a distant peer, same as DESTROYED.
- */
- CADET_CONNECTION_BROKEN
};
*/
struct GNUNET_MQ_Envelope *env;
- /**
- * Message queue to the first hop, or NULL if we have no connection yet.
- */
- struct GNUNET_MQ_Handle *mq;
-
/**
* Handle for calling #GCP_request_mq_cancel() once we are finished.
*/
/**
* Function to call once we are ready to transmit.
*/
- GNUNET_SCHEDULER_TaskCallback ready_cb;
+ GCC_ReadyCallback ready_cb;
/**
* Closure for @e ready_cb.
*/
unsigned int off;
-};
-
+ /**
+ * Are we ready to transmit via @e mq_man right now?
+ */
+ int mqm_ready;
-/**
- * Is the given connection currently ready for transmission?
- *
- * @param cc connection to transmit on
- * @return #GNUNET_YES if we could transmit
- */
-int
-GCC_is_ready (struct CadetConnection *cc)
-{
- return ( (NULL != cc->mq) &&
- (CADET_CONNECTION_READY == cc->state) &&
- (NULL == cc->env) ) ? GNUNET_YES : GNUNET_NO;
-}
+};
/**
void
GCC_destroy (struct CadetConnection *cc)
{
- if (NULL != cc->env)
- {
- if (NULL != cc->mq)
- GNUNET_MQ_send_cancel (cc->env);
- else
- GNUNET_MQ_discard (cc->env);
- cc->env = NULL;
- }
- if ( (NULL != cc->mq) &&
- (CADET_CONNECTION_SENDING_CREATE != cc->state) )
+ struct GNUNET_MQ_Envelope *env = NULL;
+
+ if (CADET_CONNECTION_SENDING_CREATE != cc->state)
{
- /* Need to notify next hop that we are down. */
- struct GNUNET_MQ_Envelope *env;
struct GNUNET_CADET_ConnectionDestroyMessage *destroy_msg;
+ /* Need to notify next hop that we are down. */
env = GNUNET_MQ_msg (destroy_msg,
GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY);
destroy_msg->cid = cc->cid;
- GNUNET_MQ_send (cc->mq,
- env);
}
- cc->mq = NULL;
- GCP_request_mq_cancel (cc->mq_man);
+ GCP_request_mq_cancel (cc->mq_man,
+ env);
cc->mq_man = NULL;
GCPP_del_connection (cc->path,
cc->off,
void
GCC_handle_connection_ack (struct CadetConnection *cc)
{
- GNUNET_SCHEDULER_cancel (cc->task);
-#if FIXME
+ if (NULL != cc->task)
+ {
+ GNUNET_SCHEDULER_cancel (cc->task);
+ cc->task = NULL;
+ }
+#if FIXME_KEEPALIVE
cc->task = GNUNET_SCHEDULER_add_delayed (cc->keepalive_period,
&send_keepalive,
cc);
#endif
cc->state = CADET_CONNECTION_READY;
- cc->ready_cb (cc->ready_cb_cls);
+ if (GNUNET_YES == cc->mqm_ready)
+ cc->ready_cb (cc->ready_cb_cls,
+ GNUNET_YES);
}
GCC_handle_kx (struct CadetConnection *cc,
const struct GNUNET_CADET_TunnelKeyExchangeMessage *msg)
{
+ if (CADET_CONNECTION_SENT == cc->state)
+ {
+ /* We didn't get the CREATE_ACK, but instead got payload. That's fine,
+ clearly something is working, so pretend we got an ACK. */
+ GCC_handle_connection_ack (cc);
+ }
GCT_handle_kx (cc->ct,
msg);
}
GCC_handle_encrypted (struct CadetConnection *cc,
const struct GNUNET_CADET_TunnelEncryptedMessage *msg)
{
+ if (CADET_CONNECTION_SENT == cc->state)
+ {
+ /* We didn't get the CREATE_ACK, but instead got payload. That's fine,
+ clearly something is working, so pretend we got an ACK. */
+ GCC_handle_connection_ack (cc);
+ }
GCT_handle_encrypted (cc->ct,
msg);
}
* @param cls the `struct CadetConnection` to initiate
*/
static void
-send_create (void *cls);
-
-
-/**
- * We finished transmission of the create message, now wait for
- * ACK or retransmit.
- *
- * @param cls the `struct CadetConnection` that sent the create message
- */
-static void
-transmit_create_done_cb (void *cls)
+send_create (void *cls)
{
struct CadetConnection *cc = cls;
+ struct GNUNET_CADET_ConnectionCreateMessage *create_msg;
+ struct GNUNET_PeerIdentity *pids;
+ struct GNUNET_MQ_Envelope *env;
+ unsigned int path_length;
+ cc->task = NULL;
+ GNUNET_assert (GNUNET_YES == cc->mqm_ready);
+ path_length = GCPP_get_length (cc->path);
+ env = GNUNET_MQ_msg_extra (create_msg,
+ path_length * sizeof (struct GNUNET_PeerIdentity),
+ GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE);
+ create_msg->cid = cc->cid;
+ pids = (struct GNUNET_PeerIdentity *) &create_msg[1];
+ for (unsigned int i=0;i<path_length;i++)
+ pids[i] = *GCP_get_id (GCPP_get_peer_at_offset (cc->path,
+ i));
+ cc->env = env;
+ cc->mqm_ready = GNUNET_NO;
cc->state = CADET_CONNECTION_SENT;
- cc->env = NULL;
- /* FIXME: at some point, we need to reset the delay back to 0! */
- cc->retry_delay = GNUNET_TIME_STD_BACKOFF (cc->retry_delay);
- cc->task = GNUNET_SCHEDULER_add_delayed (cc->retry_delay,
- &send_create,
- cc);
+ GCP_send (cc->mq_man,
+ env);
}
/**
- * Send a CREATE message to the first hop.
+ * Send a CREATE_ACK message towards the origin.
*
* @param cls the `struct CadetConnection` to initiate
*/
static void
-send_create (void *cls)
+send_create_ack (void *cls)
{
struct CadetConnection *cc = cls;
struct GNUNET_CADET_ConnectionCreateMessage *create_msg;
unsigned int path_length;
cc->task = NULL;
- GNUNET_assert (NULL != cc->mq);
+ GNUNET_assert (GNUNET_YES == cc->mqm_ready);
path_length = GCPP_get_length (cc->path);
env = GNUNET_MQ_msg_extra (create_msg,
path_length * sizeof (struct GNUNET_PeerIdentity),
pids[i] = *GCP_get_id (GCPP_get_peer_at_offset (cc->path,
i));
cc->env = env;
- GNUNET_MQ_notify_sent (env,
- &transmit_create_done_cb,
- cc);
- GNUNET_MQ_send (cc->mq,
- env);
+ cc->mqm_ready = GNUNET_NO;
+ cc->state = CADET_CONNECTION_READY;
+ GCP_send (cc->mq_man,
+ env);
+}
+
+
+/**
+ * We got a #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE for a
+ * connection that we already have. Either our ACK got lost
+ * or something is fishy. Consider retransmitting the ACK.
+ *
+ * @param cc connection that got the duplicate CREATE
+ */
+void
+GCC_handle_duplicate_create (struct CadetConnection *cc)
+{
+ if (GNUNET_YES == cc->mqm_ready)
+ {
+ /* Tell tunnel that we are not ready for transmission anymore
+ (until CREATE_ACK is done) */
+ cc->ready_cb (cc->ready_cb_cls,
+ GNUNET_NO);
+
+ /* Revert back to the state of having only received the 'CREATE',
+ and immediately proceed to send the CREATE_ACK. */
+ cc->state = CADET_CONNECTION_CREATE_RECEIVED;
+ cc->task = GNUNET_SCHEDULER_add_now (&send_create_ack,
+ cc);
+ }
+ else
+ {
+ /* We are currently sending something else back, which
+ can only be an ACK or payload, either of which would
+ do. So actually no need to do anything. */
+ }
}
* peer at the first hop. Adjust accordingly.
*
* @param cls the `struct CadetConnection`
- * @param mq NULL if the CORE connection was lost, non-NULL if
- * it became available
+ * @param available #GNUNET_YES if sending is now possible,
+ * #GNUNET_NO if sending is no longer possible
+ * #GNUNET_SYSERR if sending is no longer possible
+ * and the last envelope was discarded
*/
static void
manage_first_hop_mq (void *cls,
- struct GNUNET_MQ_Handle *mq)
+ int available)
{
struct CadetConnection *cc = cls;
- if (NULL == mq)
+ if (GNUNET_YES != available)
{
/* Connection is down, for now... */
- cc->mq = NULL;
+ cc->mqm_ready = GNUNET_NO;
+ cc->state = CADET_CONNECTION_NEW;
+ cc->retry_delay = GNUNET_TIME_UNIT_ZERO;
if (NULL != cc->task)
{
GNUNET_SCHEDULER_cancel (cc->task);
cc->task = NULL;
}
+ cc->ready_cb (cc->ready_cb_cls,
+ GNUNET_NO);
return;
}
- cc->mq = mq;
- cc->state = CADET_CONNECTION_SENDING_CREATE;
-
- /* Now repeat sending connection creation messages
- down the path, until we get an ACK! */
- cc->task = GNUNET_SCHEDULER_add_now (&send_create,
- cc);
+ cc->mqm_ready = GNUNET_YES;
+ switch (cc->state)
+ {
+ case CADET_CONNECTION_NEW:
+ /* Transmit immediately */
+ cc->task = GNUNET_SCHEDULER_add_now (&send_create,
+ cc);
+ break;
+ case CADET_CONNECTION_SENDING_CREATE:
+ /* Should not be possible to be called in this state. */
+ GNUNET_assert (0);
+ break;
+ case CADET_CONNECTION_SENT:
+ /* Retry a bit later... */
+ cc->retry_delay = GNUNET_TIME_STD_BACKOFF (cc->retry_delay);
+ cc->task = GNUNET_SCHEDULER_add_delayed (cc->retry_delay,
+ &send_create,
+ cc);
+ break;
+ case CADET_CONNECTION_CREATE_RECEIVED:
+ /* We got the 'CREATE' (incoming connection), should send the CREATE_ACK */
+ cc->task = GNUNET_SCHEDULER_add_now (&send_create_ack,
+ cc);
+ break;
+ case CADET_CONNECTION_READY:
+ cc->ready_cb (cc->ready_cb_cls,
+ GNUNET_YES);
+ break;
+ }
}
* @param destination where to go
* @param path which path to take (may not be the full path)
* @param ct which tunnel uses this connection
+ * @param init_state initial state for the connection
* @param ready_cb function to call when ready to transmit
* @param ready_cb_cls closure for @a cb
* @return handle to the connection
struct CadetPeerPath *path,
struct CadetTConnection *ct,
const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid,
- GNUNET_SCHEDULER_TaskCallback ready_cb,
+ enum CadetConnectionState init_state,
+ GCC_ReadyCallback ready_cb,
void *ready_cb_cls)
{
struct CadetConnection *cc;
destination);
GNUNET_assert (UINT_MAX > off);
cc = GNUNET_new (struct CadetConnection);
+ cc->state = init_state;
cc->ct = ct;
cc->cid = *cid;
GNUNET_assert (GNUNET_OK ==
struct CadetPeerPath *path,
struct CadetTConnection *ct,
const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid,
- GNUNET_SCHEDULER_TaskCallback ready_cb,
+ GCC_ReadyCallback ready_cb,
void *ready_cb_cls)
{
- struct CadetConnection *cc;
-
- cc = connection_create (destination,
- path,
- ct,
- cid,
- ready_cb,
- ready_cb_cls);
- /* FIXME: send CREATE_ACK? */
- return cc;
+ return connection_create (destination,
+ path,
+ ct,
+ cid,
+ CADET_CONNECTION_CREATE_RECEIVED,
+ ready_cb,
+ ready_cb_cls);
}
GCC_create (struct CadetPeer *destination,
struct CadetPeerPath *path,
struct CadetTConnection *ct,
- GNUNET_SCHEDULER_TaskCallback ready_cb,
+ GCC_ReadyCallback ready_cb,
void *ready_cb_cls)
{
struct GNUNET_CADET_ConnectionTunnelIdentifier cid;
- struct CadetConnection *cc;
GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
&cid,
sizeof (cid));
- cc = connection_create (destination,
- path,
- ct,
- &cid,
- ready_cb,
- ready_cb_cls);
- /* FIXME: send CREATE? */
- return cc;
-}
-
-
-/**
- * We finished transmission of a message, if we are still ready, tell
- * the tunnel!
- *
- * @param cls our `struct CadetConnection`
- */
-static void
-transmit_done_cb (void *cls)
-{
- struct CadetConnection *cc = cls;
-
- cc->env = NULL;
- if ( (NULL != cc->mq) &&
- (CADET_CONNECTION_READY == cc->state) )
- cc->ready_cb (cc->ready_cb_cls);
+ return connection_create (destination,
+ path,
+ ct,
+ &cid,
+ CADET_CONNECTION_NEW,
+ ready_cb,
+ ready_cb_cls);
}
* connection is right now ready for transmission.
*
* @param cc connection identification
- * @param env envelope with message to transmit
+ * @param env envelope with message to transmit; must NOT
+ * yet have a #GNUNET_MQ_notify_sent() callback attached to it
*/
void
GCC_transmit (struct CadetConnection *cc,
struct GNUNET_MQ_Envelope *env)
{
- GNUNET_assert (NULL == cc->env);
- cc->env = env;
- GNUNET_MQ_notify_sent (env,
- &transmit_done_cb,
- cc);
- if ( (NULL != cc->mq) &&
- (CADET_CONNECTION_READY == cc->state) )
- GNUNET_MQ_send (cc->mq,
- env);
+ GNUNET_assert (GNUNET_YES == cc->mqm_ready);
+ GNUNET_assert (CADET_CONNECTION_READY == cc->state);
+ cc->mqm_ready = GNUNET_NO;
+ GCP_send (cc->mq_man,
+ env);
}
}
+/**
+ * Get a (static) string for a connection.
+ *
+ * @param cc Connection.
+ */
+const char *
+GCC_2s (const struct CadetConnection *cc)
+{
+ static char buf[128];
+
+ if (NULL == cc)
+ return "Connection(NULL)";
+
+ if (NULL != cc->ct)
+ {
+ GNUNET_snprintf (buf,
+ sizeof (buf),
+ "Connection(%s(Tunnel(%s)))",
+ GNUNET_sh2s (&cc->cid.connection_of_tunnel),
+ GCT_2s (cc->ct->t));
+ return buf;
+ }
+ GNUNET_snprintf (buf,
+ sizeof (buf),
+ "Connection(%s(Tunnel(NULL)))",
+ GNUNET_sh2s (&cc->cid.connection_of_tunnel));
+ return buf;
+}
+
+
+#define LOG2(level, ...) GNUNET_log_from_nocheck(level,"cadet-con",__VA_ARGS__)
+
+
/**
* Log connection info.
*
GCC_debug (struct CadetConnection *cc,
enum GNUNET_ErrorType level)
{
- GNUNET_break (0); // FIXME: implement...
+ int do_log;
+ char *s;
+
+ do_log = GNUNET_get_log_call_status (level & (~GNUNET_ERROR_TYPE_BULK),
+ "cadet-con",
+ __FILE__, __FUNCTION__, __LINE__);
+ if (0 == do_log)
+ return;
+ if (NULL == cc)
+ {
+ LOG2 (level,
+ "Connection (NULL)\n");
+ return;
+ }
+ s = GCPP_2s (cc->path);
+ LOG2 (level,
+ "Connection %s to %s via path %s in state %d is %s\n",
+ GCC_2s (cc),
+ GCP_2s (cc->destination),
+ s,
+ cc->state,
+ (GNUNET_YES == cc->mqm_ready) ? "ready" : "busy");
+ GNUNET_free (s);
}
/* end of gnunet-service-cadet-new_connection.c */
#include "gnunet-service-cadet-new_peer.h"
#include "cadet_protocol.h"
+
/**
- * Is the given connection currently ready for transmission?
+ * Function called to notify tunnel about change in our readyness.
*
- * @param cc connection to transmit on
- * @return #GNUNET_YES if we could transmit
+ * @param cls closure
+ * @param is_ready #GNUNET_YES if the connection is now ready for transmission,
+ * #GNUNET_NO if the connection is no longer ready for transmission
*/
-int
-GCC_is_ready (struct CadetConnection *cc);
+typedef void
+(*GCC_ReadyCallback)(void *cls,
+ int is_ready);
/**
GCC_create (struct CadetPeer *destination,
struct CadetPeerPath *path,
struct CadetTConnection *ct,
- GNUNET_SCHEDULER_TaskCallback ready_cb,
+ GCC_ReadyCallback ready_cb,
void *ready_cb_cls);
struct CadetPeerPath *path,
struct CadetTConnection *ct,
const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid,
- GNUNET_SCHEDULER_TaskCallback ready_cb,
+ GCC_ReadyCallback ready_cb,
void *ready_cb_cls);
GCC_get_id (struct CadetConnection *cc);
+/**
+ * Get a (static) string for a connection.
+ *
+ * @param cc Connection.
+ */
+const char *
+GCC_2s (const struct CadetConnection *cc);
+
+
/**
* Log connection info.
*
* @author Christian Grothoff
*
* All functions in this file should use the prefix GCO (Gnunet Cadet cOre (bottom))
+ *
+ * TODO:
+ * - pass encrypted ACK to connection (!)
+ * - given BROKEN messages, destroy paths (?)
+ * -
+ * - handle POLL (if needed)
*/
#include "platform.h"
#include "gnunet-service-cadet-new_core.h"
#include "gnunet_core_service.h"
#include "cadet_protocol.h"
+/**
+ * Number of messages we are willing to buffer per route.
+ */
+#define ROUTE_BUFFER_SIZE 8
+
+
+/**
+ * Information we keep per direction for a route.
+ */
+struct RouteDirection
+{
+ /**
+ * Target peer.
+ */
+ struct CadetPeer *hop;
+
+ /**
+ * Route this direction is part of.
+ */
+ struct CadetRoute *my_route;
+
+ /**
+ * Message queue manager for @e hop.
+ */
+ struct GCP_MessageQueueManager *mqm;
+
+ /**
+ * Cyclic message buffer to @e hop.
+ */
+ struct GNUNET_MQ_Envelope *out_buffer[ROUTE_BUFFER_SIZE];
+
+ /**
+ * Next write offset to use to append messages to @e out_buffer.
+ */
+ unsigned int out_wpos;
+
+ /**
+ * Next read offset to use to retrieve messages from @e out_buffer.
+ */
+ unsigned int out_rpos;
+
+ /**
+ * Is @e mqm currently ready for transmission?
+ */
+ int is_ready;
+
+};
+
/**
* Description of a segment of a `struct CadetConnection` at the
{
/**
- * Previous hop on this route.
+ * Information about the next hop on this route.
*/
- struct CadetPeer *prev_hop;
+ struct RouteDirection next;
/**
- * Next hop on this route.
+ * Information about the previous hop on this route.
*/
- struct CadetPeer *next_hop;
-
- /**
- * Message queue notifications for @e prev_hop.
- */
- struct GCP_MessageQueueManager *prev_mqm;
-
- /**
- * Message queue notifications for @e next_hop.
- */
- struct GCP_MessageQueueManager *next_mqm;
+ struct RouteDirection prev;
/**
* Unique identifier for the connection that uses this route.
*/
struct GNUNET_TIME_Absolute last_use;
- /**
- * Counter, used to verify that both MQs are up when the route is
- * initialized.
- */
- unsigned int up;
-
};
const struct GNUNET_MessageHeader *msg)
{
struct CadetRoute *route;
+ struct RouteDirection *dir;
+ struct GNUNET_MQ_Envelope *env;
route = get_route (cid);
if (NULL == route)
GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN);
bm->cid = *cid;
bm->peer1 = my_full_id;
- GCP_send (prev,
- env);
+ GCP_send_ooo (prev,
+ env);
return;
}
- /* FIXME: support round-robin queue management here somewhere! */
- GCP_send ((prev == route->prev_hop) ? route->next_hop : route->prev_hop,
- GNUNET_MQ_msg_copy (msg));
+ dir = (prev == route->prev.hop) ? &route->next : &route->prev;
+ if (GNUNET_YES == dir->is_ready)
+ {
+ dir->is_ready = GNUNET_NO;
+ GCP_send (dir->mqm,
+ GNUNET_MQ_msg_copy (msg));
+ return;
+ }
+ env = dir->out_buffer[dir->out_wpos];
+ if (NULL != env)
+ {
+ /* Queue full, drop earliest message in queue */
+ GNUNET_assert (dir->out_rpos == dir->out_wpos);
+ GNUNET_MQ_discard (env);
+ dir->out_rpos++;
+ if (ROUTE_BUFFER_SIZE == dir->out_rpos)
+ dir->out_rpos = 0;
+ }
+ env = GNUNET_MQ_msg_copy (msg);
+ dir->out_buffer[dir->out_wpos] = env;
+ dir->out_wpos++;
+ if (ROUTE_BUFFER_SIZE == dir->out_wpos)
+ dir->out_wpos = 0;
}
}
+/**
+ * Free internal data of a route direction.
+ *
+ * @param dir direction to destroy (do NOT free memory of 'dir' itself)
+ */
+static void
+destroy_direction (struct RouteDirection *dir)
+{
+ for (unsigned int i=0;i<ROUTE_BUFFER_SIZE;i++)
+ if (NULL != dir->out_buffer[i])
+ {
+ GNUNET_MQ_discard (dir->out_buffer[i]);
+ dir->out_buffer[i] = NULL;
+ }
+ if (NULL != dir->mqm)
+ {
+ GCP_request_mq_cancel (dir->mqm,
+ NULL);
+ dir->mqm = NULL;
+ }
+}
+
+
/**
* Destroy our state for @a route.
*
static void
destroy_route (struct CadetRoute *route)
{
- GCP_request_mq_cancel (route->next_mqm);
- GCP_request_mq_cancel (route->prev_mqm);
+ destroy_direction (&route->prev);
+ destroy_direction (&route->next);
GNUNET_free (route);
}
* @param peer2 another one of the peers where a link is broken
*/
static void
-send_broken (struct CadetPeer *target,
+send_broken (struct RouteDirection *target,
const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid,
const struct GNUNET_PeerIdentity *peer1,
const struct GNUNET_PeerIdentity *peer2)
bm->peer1 = *peer1;
if (NULL != peer2)
bm->peer2 = *peer2;
- GCP_send (target,
- env);
+ GCP_request_mq_cancel (target->mqm,
+ env);
+ target->mqm = NULL;
}
* be called immediately when we register, and then again
* later if the connection ever goes down.
*
- * @param cls the `struct CadetRoute`
- * @param mq the message queue, NULL if connection went down
+ * @param cls the `struct RouteDirection`
+ * @param available #GNUNET_YES if sending is now possible,
+ * #GNUNET_NO if sending is no longer possible
+ * #GNUNET_SYSERR if sending is no longer possible
+ * and the last envelope was discarded
*/
static void
-mqm_cr_destroy_prev (void *cls,
- struct GNUNET_MQ_Handle *mq)
+dir_ready_cb (void *cls,
+ int ready)
{
- struct CadetRoute *route = cls;
+ struct RouteDirection *dir = cls;
+ struct CadetRoute *route = dir->my_route;
+ struct RouteDirection *odir;
- if (NULL != mq)
+ if (GNUNET_YES == ready)
{
- route->up |= 1;
+ struct GNUNET_MQ_Envelope *env;
+
+ dir->is_ready = GNUNET_YES;
+ if (NULL != (env = dir->out_buffer[dir->out_rpos]))
+ {
+ dir->out_buffer[dir->out_rpos] = NULL;
+ dir->out_rpos++;
+ if (ROUTE_BUFFER_SIZE == dir->out_rpos)
+ dir->out_rpos = 0;
+ dir->is_ready = GNUNET_NO;
+ GCP_send (dir->mqm,
+ env);
+ }
return;
}
- send_broken (route->next_hop,
+ odir = (dir == &route->next) ? &route->prev : &route->next;
+ send_broken (&route->next,
&route->cid,
- GCP_get_id (route->prev_hop),
+ GCP_get_id (odir->hop),
&my_full_id);
destroy_route (route);
}
/**
- * Function called when the message queue to the previous hop
- * becomes available/unavailable. We expect this function to
- * be called immediately when we register, and then again
- * later if the connection ever goes down.
+ * Initialize one of the directions of a route.
*
- * @param cls the `struct CadetRoute`
- * @param mq the message queue, NULL if connection went down
+ * @param route route the direction belongs to
+ * @param dir direction to initialize
+ * @param hop next hop on in the @a dir
*/
static void
-mqm_cr_destroy_next (void *cls,
- struct GNUNET_MQ_Handle *mq)
+dir_init (struct RouteDirection *dir,
+ struct CadetRoute *route,
+ struct CadetPeer *hop)
{
- struct CadetRoute *route = cls;
-
- if (NULL != mq)
- {
- route->up |= 2;
- return;
- }
- send_broken (route->prev_hop,
- &route->cid,
- GCP_get_id (route->next_hop),
- &my_full_id);
- destroy_route (route);
+ dir->hop = hop;
+ dir->my_route = route;
+ dir->mqm = GCP_request_mq (hop,
+ &dir_ready_cb,
+ dir);
+ GNUNET_assert (GNUNET_YES == dir->is_ready);
}
if (NULL !=
get_route (&msg->cid))
{
- /* CID not chosen at random, collides */
- GNUNET_break_op (0);
+ /* Duplicate CREATE, pass it on, previous one might have been lost! */
+ route_message (sender,
+ &msg->cid,
+ &msg->header);
return;
}
if (off == path_length - 1)
{
/* We are the destination, create connection */
+ struct CadetConnection *cc;
struct CadetPeerPath *path;
struct CadetPeer *origin;
+ cc = GNUNET_CONTAINER_multishortmap_get (connections,
+ &msg->cid.connection_of_tunnel);
+ if (NULL != cc)
+ {
+ /* Duplicate CREATE, likely our ACK got lost, retransmit the ACK! */
+ GNUNET_break (0); // FIXME: not implemented!
+ return;
+ }
+
path = GCPP_get_path_from_route (path_length,
pids);
origin = GCP_get (&pids[0],
GCT_add_inbound_connection (GCT_create_tunnel (origin),
&msg->cid,
path);
-
return;
}
/* We are merely a hop on the way, check if we can support the route */
next = GCP_get (&pids[off + 1],
GNUNET_NO);
if ( (NULL == next) ||
- (NULL == GCP_get_mq (next)) )
+ (GNUNET_NO == GCP_has_core_connection (next)) )
{
/* unworkable, send back BROKEN notification */
- send_broken (sender,
- &msg->cid,
- &pids[off + 1],
- &my_full_id);
+ struct GNUNET_MQ_Envelope *env;
+ struct GNUNET_CADET_ConnectionBrokenMessage *bm;
+
+ env = GNUNET_MQ_msg (bm,
+ GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN);
+ bm->cid = msg->cid;
+ bm->peer1 = pids[off + 1];
+ bm->peer2 = my_full_id;
+ GCP_send_ooo (sender,
+ env);
return;
}
/* Workable route, create routing entry */
route = GNUNET_new (struct CadetRoute);
route->cid = msg->cid;
- route->prev_mqm = GCP_request_mq (sender,
- &mqm_cr_destroy_prev,
- route);
- route->next_mqm = GCP_request_mq (next,
- &mqm_cr_destroy_next,
- route);
- route->prev_hop = sender;
- route->next_hop = next;
- GNUNET_assert ((1|2) == route->up);
+ dir_init (&route->prev,
+ route,
+ sender);
+ dir_init (&route->next,
+ route,
+ next);
GNUNET_assert (GNUNET_OK ==
GNUNET_CONTAINER_multishortmap_put (routes,
&route->cid.connection_of_tunnel,
* @param msg Message itself.
*/
static void
-handle_connection_ack (void *cls,
- const struct GNUNET_CADET_ConnectionCreateMessageAckMessage *msg)
+handle_connection_create_ack (void *cls,
+ const struct GNUNET_CADET_ConnectionCreateMessageAckMessage *msg)
{
struct CadetPeer *peer = cls;
struct CadetConnection *cc;
GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE,
struct GNUNET_CADET_ConnectionCreateMessage,
NULL),
- GNUNET_MQ_hd_fixed_size (connection_ack,
+ GNUNET_MQ_hd_fixed_size (connection_create_ack,
GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE_ACK,
struct GNUNET_CADET_ConnectionCreateMessageAckMessage,
NULL),
}
+/**
+ * Convert a path to a human-readable string.
+ *
+ * @param path path to convert
+ * @return string, to be freed by caller (unlike other *_2s APIs!)
+ */
+char *
+GCPP_2s (struct CadetPeerPath *path)
+{
+ char *s;
+ char *old;
+
+ old = GNUNET_strdup ("");
+ for (unsigned int i = 0;
+ i < path->entries_length;
+ i++)
+ {
+ GNUNET_asprintf (&s,
+ "%s %s",
+ old,
+ GCP_2s (GCPP_get_peer_at_offset (path,
+ i)));
+ GNUNET_free_non_null (old);
+ old = s;
+ }
+ return old;
+}
+
+
/* end of gnunet-service-cadet-new_paths.c */
unsigned int off);
+/**
+ * Convert a path to a human-readable string.
+ *
+ * @param path path to convert
+ * @return string, to be freed by caller (unlike other *_2s APIs!)
+ */
+char *
+GCPP_2s (struct CadetPeerPath *p);
+
+
#endif
*/
struct CadetPeer *cp;
+ /**
+ * Envelope this manager would like to transmit once it is its turn.
+ */
+ struct GNUNET_MQ_Envelope *env;
+
};
*/
unsigned int num_paths;
+ /**
+ * Number of message queue managers of this peer that have a message in waiting.
+ */
+ unsigned int mqm_ready_counter;
+
/**
* Current length of the @e path_heads and @path_tails arrays.
* The arrays should be grown as needed.
/**
- * Get the message queue for peer @a cp.
+ * Set the message queue to @a mq for peer @a cp and notify watchers.
*
* @param cp peer to modify
- * @return message queue (can be NULL)
+ * @param mq message queue to set (can be NULL)
*/
-struct GNUNET_MQ_Handle *
-GCP_get_mq (struct CadetPeer *cp)
+void
+GCP_set_mq (struct CadetPeer *cp,
+ struct GNUNET_MQ_Handle *mq)
{
- return cp->core_mq;
+ cp->core_mq = mq;
+
+ for (struct GCP_MessageQueueManager *mqm = cp->mqm_head;
+ NULL != mqm;
+ mqm = mqm->next)
+ {
+ if (NULL == mq)
+ {
+ if (NULL != mqm->env)
+ {
+ GNUNET_MQ_discard (mqm->env);
+ mqm->env = NULL;
+ mqm->cb (mqm->cb_cls,
+ GNUNET_SYSERR);
+ }
+ else
+ {
+ mqm->cb (mqm->cb_cls,
+ GNUNET_NO);
+ }
+ }
+ else
+ {
+ GNUNET_assert (NULL == mqm->env);
+ mqm->cb (mqm->cb_cls,
+ GNUNET_YES);
+ }
+ }
}
/**
- * Set the message queue to @a mq for peer @a cp and notify watchers.
+ * Transmit current envelope from this @a mqm.
*
- * @param cp peer to modify
- * @param mq message queue to set (can be NULL)
+ * @param mqm mqm to transmit message for now
*/
-void
-GCP_set_mq (struct CadetPeer *cp,
- struct GNUNET_MQ_Handle *mq)
+static void
+mqm_execute (struct GCP_MessageQueueManager *mqm)
{
- cp->core_mq = mq;
+ struct CadetPeer *cp = mqm->cp;
+
+ /* Move entry to the end of the DLL, to be fair. */
+ if (mqm != cp->mqm_tail)
+ {
+ GNUNET_CONTAINER_DLL_remove (cp->mqm_head,
+ cp->mqm_tail,
+ mqm);
+ GNUNET_CONTAINER_DLL_insert_tail (cp->mqm_head,
+ cp->mqm_tail,
+ mqm);
+ }
+ GNUNET_MQ_send (cp->core_mq,
+ mqm->env);
+ mqm->env = NULL;
+ cp->mqm_ready_counter--;
+}
+
+
+/**
+ * Function called when CORE took one of the messages from
+ * a message queue manager and transmitted it.
+ *
+ * @param cls the `struct CadetPeeer` where we made progress
+ */
+static void
+mqm_send_done (void *cls)
+{
+ struct CadetPeer *cp = cls;
+
+ if (0 == cp->mqm_ready_counter)
+ return; /* nothing to do */
for (struct GCP_MessageQueueManager *mqm = cp->mqm_head;
NULL != mqm;
mqm = mqm->next)
- mqm->cb (mqm->cb_cls,
- mq);
+ {
+ if (NULL == mqm->env)
+ continue;
+ mqm_execute (mqm);
+ return;
+ }
}
/**
* Send the message in @a env to @a cp.
*
- * @param cp the peer
- * @param env envelope with the message to send
+ * @param mqm the message queue manager to use for transmission
+ * @param env envelope with the message to send; must NOT
+ * yet have a #GNUNET_MQ_notify_sent() callback attached to it
*/
void
-GCP_send (struct CadetPeer *cp,
+GCP_send (struct GCP_MessageQueueManager *mqm,
struct GNUNET_MQ_Envelope *env)
{
+ struct CadetPeer *cp = mqm->cp;
+
GNUNET_assert (NULL != cp->core_mq);
- GNUNET_MQ_send (cp->core_mq,
- env);
+ GNUNET_assert (NULL == mqm->env);
+ GNUNET_MQ_notify_sent (env,
+ &mqm_send_done,
+ cp);
+ mqm->env = env;
+ cp->mqm_ready_counter++;
+ if (0 != GNUNET_MQ_get_length (cp->core_mq))
+ return;
+ mqm_execute (mqm);
}
}
+/**
+ * Test if @a cp has a core-level connection
+ *
+ * @param cp peer to test
+ * @return #GNUNET_YES if @a cp has a core-level connection
+ */
+int
+GCP_has_core_connection (struct CadetPeer *cp)
+{
+ return (NULL != cp->core_mq) ? GNUNET_YES : GNUNET_NO;
+}
+
+
/**
* Start message queue change notifications.
*
mqm);
if (NULL != cp->core_mq)
cb (cb_cls,
- cp->core_mq);
+ GNUNET_YES);
return mqm;
}
* Stops message queue change notifications.
*
* @param mqm handle matching request to cancel
+ * @param last_env final message to transmit, or NULL
*/
void
-GCP_request_mq_cancel (struct GCP_MessageQueueManager *mqm)
+GCP_request_mq_cancel (struct GCP_MessageQueueManager *mqm,
+ struct GNUNET_MQ_Envelope *last_env)
{
struct CadetPeer *cp = mqm->cp;
+ if (NULL != mqm->env)
+ GNUNET_MQ_discard (mqm->env);
+ if (NULL != last_env)
+ {
+ if (NULL != cp->core_mq)
+ GNUNET_MQ_send (cp->core_mq,
+ last_env);
+ else
+ GNUNET_MQ_discard (last_env);
+ }
GNUNET_CONTAINER_DLL_remove (cp->mqm_head,
cp->mqm_tail,
mqm);
}
+/**
+ * Send the message in @a env to @a cp, overriding queueing logic.
+ * This function should only be used to send error messages outside
+ * of flow and congestion control, similar to ICMP. Note that
+ * the envelope may be silently discarded as well.
+ *
+ * @param cp peer to send the message to
+ * @param env envelope with the message to send
+ */
+void
+GCP_send_ooo (struct CadetPeer *cp,
+ struct GNUNET_MQ_Envelope *env)
+{
+ if (NULL == cp->core_mq)
+ {
+ GNUNET_MQ_discard (env);
+ return;
+ }
+ GNUNET_MQ_send (cp->core_mq,
+ env);
+}
+
+
+
/* end of gnunet-service-cadet-new_peer.c */
/**
* Data structure used to track whom we have to notify about changes
- * to our message queue.
+ * in our ability to transmit to a given peer.
+ *
+ * All queue managers will be given equal chance for sending messages
+ * to @a cp. This construct this guarantees fairness for access to @a
+ * cp among the different message queues. Each connection or route
+ * will have its respective message queue managers for each direction.
*/
struct GCP_MessageQueueManager;
* Function to call with updated message queue object.
*
* @param cls closure
- * @param mq NULL if MQ is gone, otherwise an active message queue
+ * @param available #GNUNET_YES if sending is now possible,
+ * #GNUNET_NO if sending is no longer possible
+ * #GNUNET_SYSERR if sending is no longer possible
+ * and the last envelope was discarded
*/
typedef void
(*GCP_MessageQueueNotificationCallback)(void *cls,
- struct GNUNET_MQ_Handle *mq);
+ int available);
/**
- * Start message queue change notifications.
+ * Start message queue change notifications. Will create a new slot
+ * to manage the message queue to the given @a cp.
*
* @param cp peer to notify for
* @param cb function to call if mq becomes available or unavailable
/**
- * Stops message queue change notifications.
+ * Test if @a cp has a core-level connection
*
- * @param mqm handle matching request to cancel
+ * @param cp peer to test
+ * @return #GNUNET_YES if @a cp has a core-level connection
+ */
+int
+GCP_has_core_connection (struct CadetPeer *cp);
+
+
+/**
+ * Send the message in @a env via a @a mqm. Must only be called at
+ * most once after the respective
+ * #GCP_MessageQueueNotificationCallback was called with `available`
+ * set to #GNUNET_YES, and not after the callback was called with
+ * `available` set to #GNUNET_NO or #GNUNET_SYSERR.
+ *
+ * @param mqm message queue manager for the transmission
+ * @param env envelope with the message to send; must NOT
+ * yet have a #GNUNET_MQ_notify_sent() callback attached to it
*/
void
-GCP_request_mq_cancel (struct GCP_MessageQueueManager *mqm);
+GCP_send (struct GCP_MessageQueueManager *mqm,
+ struct GNUNET_MQ_Envelope *env);
/**
- * Set the message queue to @a mq for peer @a cp and notify watchers.
+ * Send the message in @a env to @a cp, overriding queueing logic.
+ * This function should only be used to send error messages outside
+ * of flow and congestion control, similar to ICMP. Note that
+ * the envelope may be silently discarded as well.
*
- * @param cp peer to modify
- * @param mq message queue to set (can be NULL)
+ * @param cp peer to send the message to
+ * @param env envelope with the message to send
*/
void
-GCP_set_mq (struct CadetPeer *cp,
- struct GNUNET_MQ_Handle *mq);
+GCP_send_ooo (struct CadetPeer *cp,
+ struct GNUNET_MQ_Envelope *env);
/**
- * Get the message queue for peer @a cp.
+ * Stops message queue change notifications and sends a last message.
+ * In practice, this is implemented by sending that @a last_env
+ * message immediately (if any), ignoring queue order.
*
- * @param cp peer to modify
- * @return message queue (can be NULL)
+ * @param mqm handle matching request to cancel
+ * @param last_env final message to transmit, or NULL
*/
-struct GNUNET_MQ_Handle *
-GCP_get_mq (struct CadetPeer *cp);
+void
+GCP_request_mq_cancel (struct GCP_MessageQueueManager *mqm,
+ struct GNUNET_MQ_Envelope *last_env);
/**
- * Send the message in @a env to @a cp.
+ * Set the message queue to @a mq for peer @a cp and notify watchers.
*
- * @param cp the peer
- * @param env envelope with the message to send
+ * @param cp peer to modify
+ * @param mq message queue to set (can be NULL)
*/
void
-GCP_send (struct CadetPeer *cp,
- struct GNUNET_MQ_Envelope *env);
+GCP_set_mq (struct CadetPeer *cp,
+ struct GNUNET_MQ_Handle *mq);
#endif
};
-/**
- * Entry in list of connections used by tunnel, with metadata.
- */
-struct CadetTConnection
-{
- /**
- * Next in DLL.
- */
- struct CadetTConnection *next;
-
- /**
- * Prev in DLL.
- */
- struct CadetTConnection *prev;
-
- /**
- * Connection handle.
- */
- struct CadetConnection *cc;
-
- /**
- * Tunnel this connection belongs to.
- */
- struct CadetTunnel *t;
-
- /**
- * Creation time, to keep oldest connection alive.
- */
- struct GNUNET_TIME_Absolute created;
-
- /**
- * Connection throughput, to keep fastest connection alive.
- */
- uint32_t throughput;
-};
-
-
/**
* Struct used to save messages in a non-ready tunnel to send once connected.
*/
/**
- * A connection is ready for transmission. Looks at our message queue
- * and if there is a message, sends it out via the connection.
+ * A connection is @a is_ready for transmission. Looks at our message
+ * queue and if there is a message, sends it out via the connection.
*
- * @param cls the `struct CadetTConnection` that is ready
+ * @param cls the `struct CadetTConnection` that is @a is_ready
+ * @param is_ready #GNUNET_YES if connection are now ready,
+ * #GNUNET_NO if connection are no longer ready
*/
static void
-connection_ready_cb (void *cls)
+connection_ready_cb (void *cls,
+ int is_ready)
{
struct CadetTConnection *ct = cls;
struct CadetTunnel *t = ct->t;
struct CadetTunnelQueueEntry *tq = t->tq_head;
+ if (GNUNET_NO == ct->is_ready)
+ {
+ ct->is_ready = GNUNET_NO;
+ return;
+ }
+ ct->is_ready = GNUNET_YES;
if (NULL == tq)
return; /* no messages pending right now */
tq);
if (NULL != tq->cid)
*tq->cid = *GCC_get_id (ct->cc);
+ ct->is_ready = GNUNET_NO;
GCC_transmit (ct->cc,
tq->env);
tq->cont (tq->cont_cls);
* at our message queue and if there is a message, picks a connection
* to send it on.
*
+ * FIXME: yuck... Need better selection logic!
+ *
* @param t tunnel to process messages on
*/
static void
for (ct = t->connection_head;
NULL != ct;
ct = ct->next)
- if (GNUNET_YES == GCC_is_ready (ct->cc))
+ if (GNUNET_YES == ct->is_ready)
break;
if (NULL == ct)
return; /* no connections ready */
- connection_ready_cb (ct);
+
+ /* FIXME: a bit hackish to do it like this... */
+ connection_ready_cb (ct,
+ GNUNET_YES);
}
path,
ct,
&connection_ready_cb,
- t);
+ ct);
/* FIXME: schedule job to kill connection (and path?) if it takes
too long to get ready! (And track performance data on how long
other connections took with the tunnel!)