From bc43d8978d8695ff97cc67b65c29769e9c7f8f0e Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Thu, 19 Jan 2017 15:52:22 +0100 Subject: [PATCH] much work on connection/route/peer-level queue management --- src/cadet/gnunet-service-cadet-new.h | 39 +- .../gnunet-service-cadet-new_connection.c | 361 +++++++++++------- .../gnunet-service-cadet-new_connection.h | 26 +- src/cadet/gnunet-service-cadet-new_core.c | 267 +++++++++---- src/cadet/gnunet-service-cadet-new_paths.c | 29 ++ src/cadet/gnunet-service-cadet-new_paths.h | 10 + src/cadet/gnunet-service-cadet-new_peer.c | 172 ++++++++- src/cadet/gnunet-service-cadet-new_peer.h | 76 ++-- src/cadet/gnunet-service-cadet-new_tunnels.c | 66 ++-- 9 files changed, 732 insertions(+), 314 deletions(-) diff --git a/src/cadet/gnunet-service-cadet-new.h b/src/cadet/gnunet-service-cadet-new.h index 862a0f088..9f4667e23 100644 --- a/src/cadet/gnunet-service-cadet-new.h +++ b/src/cadet/gnunet-service-cadet-new.h @@ -105,7 +105,44 @@ struct CadetPeerPathEntry /** * Entry in list of connections used by tunnel, with metadata. */ -struct CadetTConnection; +struct CadetTConnection +{ + /** + * Next in DLL. + */ + struct CadetTConnection *next; + + /** + * Prev in DLL. + */ + struct CadetTConnection *prev; + + /** + * Connection handle. + */ + struct CadetConnection *cc; + + /** + * Tunnel this connection belongs to. + */ + struct CadetTunnel *t; + + /** + * Creation time, to keep oldest connection alive. + */ + struct GNUNET_TIME_Absolute created; + + /** + * Connection throughput, to keep fastest connection alive. + */ + uint32_t throughput; + + /** + * Is the connection currently ready for transmission? + */ + int is_ready; +}; + /** * Active path through the network (used by a tunnel). There may diff --git a/src/cadet/gnunet-service-cadet-new_connection.c b/src/cadet/gnunet-service-cadet-new_connection.c index 5123f9d45..bf88d78e1 100644 --- a/src/cadet/gnunet-service-cadet-new_connection.c +++ b/src/cadet/gnunet-service-cadet-new_connection.c @@ -27,11 +27,8 @@ * @author Christian Grothoff * * TODO: - * - congestion control - * - GCC_debug() * - keepalive messages - * - performance metrics - * - back-off reset + * - keep performance metrics (?) */ #include "platform.h" #include "gnunet-service-cadet-new_channel.h" @@ -64,19 +61,16 @@ enum CadetConnectionState CADET_CONNECTION_SENT, /** - * Connection confirmed, ready to carry traffic. + * We are an inbound connection, and received a CREATE. Need to + * send an CREATE_ACK back. */ - CADET_CONNECTION_READY, + CADET_CONNECTION_CREATE_RECEIVED, /** - * Connection to be destroyed, just waiting to empty queues. + * Connection confirmed, ready to carry traffic. */ - CADET_CONNECTION_DESTROYED, + CADET_CONNECTION_READY - /** - * Connection to be destroyed because of a distant peer, same as DESTROYED. - */ - CADET_CONNECTION_BROKEN }; @@ -111,11 +105,6 @@ struct CadetConnection */ struct GNUNET_MQ_Envelope *env; - /** - * Message queue to the first hop, or NULL if we have no connection yet. - */ - struct GNUNET_MQ_Handle *mq; - /** * Handle for calling #GCP_request_mq_cancel() once we are finished. */ @@ -129,7 +118,7 @@ struct CadetConnection /** * Function to call once we are ready to transmit. */ - GNUNET_SCHEDULER_TaskCallback ready_cb; + GCC_ReadyCallback ready_cb; /** * Closure for @e ready_cb. @@ -151,22 +140,12 @@ struct CadetConnection */ unsigned int off; -}; - + /** + * Are we ready to transmit via @e mq_man right now? + */ + int mqm_ready; -/** - * Is the given connection currently ready for transmission? - * - * @param cc connection to transmit on - * @return #GNUNET_YES if we could transmit - */ -int -GCC_is_ready (struct CadetConnection *cc) -{ - return ( (NULL != cc->mq) && - (CADET_CONNECTION_READY == cc->state) && - (NULL == cc->env) ) ? GNUNET_YES : GNUNET_NO; -} +}; /** @@ -177,29 +156,19 @@ GCC_is_ready (struct CadetConnection *cc) void GCC_destroy (struct CadetConnection *cc) { - if (NULL != cc->env) - { - if (NULL != cc->mq) - GNUNET_MQ_send_cancel (cc->env); - else - GNUNET_MQ_discard (cc->env); - cc->env = NULL; - } - if ( (NULL != cc->mq) && - (CADET_CONNECTION_SENDING_CREATE != cc->state) ) + struct GNUNET_MQ_Envelope *env = NULL; + + if (CADET_CONNECTION_SENDING_CREATE != cc->state) { - /* Need to notify next hop that we are down. */ - struct GNUNET_MQ_Envelope *env; struct GNUNET_CADET_ConnectionDestroyMessage *destroy_msg; + /* Need to notify next hop that we are down. */ env = GNUNET_MQ_msg (destroy_msg, GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY); destroy_msg->cid = cc->cid; - GNUNET_MQ_send (cc->mq, - env); } - cc->mq = NULL; - GCP_request_mq_cancel (cc->mq_man); + GCP_request_mq_cancel (cc->mq_man, + env); cc->mq_man = NULL; GCPP_del_connection (cc->path, cc->off, @@ -234,14 +203,20 @@ GCC_get_ct (struct CadetConnection *cc) void GCC_handle_connection_ack (struct CadetConnection *cc) { - GNUNET_SCHEDULER_cancel (cc->task); -#if FIXME + if (NULL != cc->task) + { + GNUNET_SCHEDULER_cancel (cc->task); + cc->task = NULL; + } +#if FIXME_KEEPALIVE cc->task = GNUNET_SCHEDULER_add_delayed (cc->keepalive_period, &send_keepalive, cc); #endif cc->state = CADET_CONNECTION_READY; - cc->ready_cb (cc->ready_cb_cls); + if (GNUNET_YES == cc->mqm_ready) + cc->ready_cb (cc->ready_cb_cls, + GNUNET_YES); } @@ -255,6 +230,12 @@ void GCC_handle_kx (struct CadetConnection *cc, const struct GNUNET_CADET_TunnelKeyExchangeMessage *msg) { + if (CADET_CONNECTION_SENT == cc->state) + { + /* We didn't get the CREATE_ACK, but instead got payload. That's fine, + clearly something is working, so pretend we got an ACK. */ + GCC_handle_connection_ack (cc); + } GCT_handle_kx (cc->ct, msg); } @@ -270,6 +251,12 @@ void GCC_handle_encrypted (struct CadetConnection *cc, const struct GNUNET_CADET_TunnelEncryptedMessage *msg) { + if (CADET_CONNECTION_SENT == cc->state) + { + /* We didn't get the CREATE_ACK, but instead got payload. That's fine, + clearly something is working, so pretend we got an ACK. */ + GCC_handle_connection_ack (cc); + } GCT_handle_encrypted (cc->ct, msg); } @@ -281,37 +268,40 @@ GCC_handle_encrypted (struct CadetConnection *cc, * @param cls the `struct CadetConnection` to initiate */ static void -send_create (void *cls); - - -/** - * We finished transmission of the create message, now wait for - * ACK or retransmit. - * - * @param cls the `struct CadetConnection` that sent the create message - */ -static void -transmit_create_done_cb (void *cls) +send_create (void *cls) { struct CadetConnection *cc = cls; + struct GNUNET_CADET_ConnectionCreateMessage *create_msg; + struct GNUNET_PeerIdentity *pids; + struct GNUNET_MQ_Envelope *env; + unsigned int path_length; + cc->task = NULL; + GNUNET_assert (GNUNET_YES == cc->mqm_ready); + path_length = GCPP_get_length (cc->path); + env = GNUNET_MQ_msg_extra (create_msg, + path_length * sizeof (struct GNUNET_PeerIdentity), + GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE); + create_msg->cid = cc->cid; + pids = (struct GNUNET_PeerIdentity *) &create_msg[1]; + for (unsigned int i=0;ipath, + i)); + cc->env = env; + cc->mqm_ready = GNUNET_NO; cc->state = CADET_CONNECTION_SENT; - cc->env = NULL; - /* FIXME: at some point, we need to reset the delay back to 0! */ - cc->retry_delay = GNUNET_TIME_STD_BACKOFF (cc->retry_delay); - cc->task = GNUNET_SCHEDULER_add_delayed (cc->retry_delay, - &send_create, - cc); + GCP_send (cc->mq_man, + env); } /** - * Send a CREATE message to the first hop. + * Send a CREATE_ACK message towards the origin. * * @param cls the `struct CadetConnection` to initiate */ static void -send_create (void *cls) +send_create_ack (void *cls) { struct CadetConnection *cc = cls; struct GNUNET_CADET_ConnectionCreateMessage *create_msg; @@ -320,7 +310,7 @@ send_create (void *cls) unsigned int path_length; cc->task = NULL; - GNUNET_assert (NULL != cc->mq); + GNUNET_assert (GNUNET_YES == cc->mqm_ready); path_length = GCPP_get_length (cc->path); env = GNUNET_MQ_msg_extra (create_msg, path_length * sizeof (struct GNUNET_PeerIdentity), @@ -331,11 +321,42 @@ send_create (void *cls) pids[i] = *GCP_get_id (GCPP_get_peer_at_offset (cc->path, i)); cc->env = env; - GNUNET_MQ_notify_sent (env, - &transmit_create_done_cb, - cc); - GNUNET_MQ_send (cc->mq, - env); + cc->mqm_ready = GNUNET_NO; + cc->state = CADET_CONNECTION_READY; + GCP_send (cc->mq_man, + env); +} + + +/** + * We got a #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE for a + * connection that we already have. Either our ACK got lost + * or something is fishy. Consider retransmitting the ACK. + * + * @param cc connection that got the duplicate CREATE + */ +void +GCC_handle_duplicate_create (struct CadetConnection *cc) +{ + if (GNUNET_YES == cc->mqm_ready) + { + /* Tell tunnel that we are not ready for transmission anymore + (until CREATE_ACK is done) */ + cc->ready_cb (cc->ready_cb_cls, + GNUNET_NO); + + /* Revert back to the state of having only received the 'CREATE', + and immediately proceed to send the CREATE_ACK. */ + cc->state = CADET_CONNECTION_CREATE_RECEIVED; + cc->task = GNUNET_SCHEDULER_add_now (&send_create_ack, + cc); + } + else + { + /* We are currently sending something else back, which + can only be an ACK or payload, either of which would + do. So actually no need to do anything. */ + } } @@ -344,34 +365,62 @@ send_create (void *cls) * peer at the first hop. Adjust accordingly. * * @param cls the `struct CadetConnection` - * @param mq NULL if the CORE connection was lost, non-NULL if - * it became available + * @param available #GNUNET_YES if sending is now possible, + * #GNUNET_NO if sending is no longer possible + * #GNUNET_SYSERR if sending is no longer possible + * and the last envelope was discarded */ static void manage_first_hop_mq (void *cls, - struct GNUNET_MQ_Handle *mq) + int available) { struct CadetConnection *cc = cls; - if (NULL == mq) + if (GNUNET_YES != available) { /* Connection is down, for now... */ - cc->mq = NULL; + cc->mqm_ready = GNUNET_NO; + cc->state = CADET_CONNECTION_NEW; + cc->retry_delay = GNUNET_TIME_UNIT_ZERO; if (NULL != cc->task) { GNUNET_SCHEDULER_cancel (cc->task); cc->task = NULL; } + cc->ready_cb (cc->ready_cb_cls, + GNUNET_NO); return; } - cc->mq = mq; - cc->state = CADET_CONNECTION_SENDING_CREATE; - - /* Now repeat sending connection creation messages - down the path, until we get an ACK! */ - cc->task = GNUNET_SCHEDULER_add_now (&send_create, - cc); + cc->mqm_ready = GNUNET_YES; + switch (cc->state) + { + case CADET_CONNECTION_NEW: + /* Transmit immediately */ + cc->task = GNUNET_SCHEDULER_add_now (&send_create, + cc); + break; + case CADET_CONNECTION_SENDING_CREATE: + /* Should not be possible to be called in this state. */ + GNUNET_assert (0); + break; + case CADET_CONNECTION_SENT: + /* Retry a bit later... */ + cc->retry_delay = GNUNET_TIME_STD_BACKOFF (cc->retry_delay); + cc->task = GNUNET_SCHEDULER_add_delayed (cc->retry_delay, + &send_create, + cc); + break; + case CADET_CONNECTION_CREATE_RECEIVED: + /* We got the 'CREATE' (incoming connection), should send the CREATE_ACK */ + cc->task = GNUNET_SCHEDULER_add_now (&send_create_ack, + cc); + break; + case CADET_CONNECTION_READY: + cc->ready_cb (cc->ready_cb_cls, + GNUNET_YES); + break; + } } @@ -383,6 +432,7 @@ manage_first_hop_mq (void *cls, * @param destination where to go * @param path which path to take (may not be the full path) * @param ct which tunnel uses this connection + * @param init_state initial state for the connection * @param ready_cb function to call when ready to transmit * @param ready_cb_cls closure for @a cb * @return handle to the connection @@ -392,7 +442,8 @@ connection_create (struct CadetPeer *destination, struct CadetPeerPath *path, struct CadetTConnection *ct, const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid, - GNUNET_SCHEDULER_TaskCallback ready_cb, + enum CadetConnectionState init_state, + GCC_ReadyCallback ready_cb, void *ready_cb_cls) { struct CadetConnection *cc; @@ -403,6 +454,7 @@ connection_create (struct CadetPeer *destination, destination); GNUNET_assert (UINT_MAX > off); cc = GNUNET_new (struct CadetConnection); + cc->state = init_state; cc->ct = ct; cc->cid = *cid; GNUNET_assert (GNUNET_OK == @@ -448,19 +500,16 @@ GCC_create_inbound (struct CadetPeer *destination, struct CadetPeerPath *path, struct CadetTConnection *ct, const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid, - GNUNET_SCHEDULER_TaskCallback ready_cb, + GCC_ReadyCallback ready_cb, void *ready_cb_cls) { - struct CadetConnection *cc; - - cc = connection_create (destination, - path, - ct, - cid, - ready_cb, - ready_cb_cls); - /* FIXME: send CREATE_ACK? */ - return cc; + return connection_create (destination, + path, + ct, + cid, + CADET_CONNECTION_CREATE_RECEIVED, + ready_cb, + ready_cb_cls); } @@ -479,41 +528,21 @@ struct CadetConnection * GCC_create (struct CadetPeer *destination, struct CadetPeerPath *path, struct CadetTConnection *ct, - GNUNET_SCHEDULER_TaskCallback ready_cb, + GCC_ReadyCallback ready_cb, void *ready_cb_cls) { struct GNUNET_CADET_ConnectionTunnelIdentifier cid; - struct CadetConnection *cc; GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE, &cid, sizeof (cid)); - cc = connection_create (destination, - path, - ct, - &cid, - ready_cb, - ready_cb_cls); - /* FIXME: send CREATE? */ - return cc; -} - - -/** - * We finished transmission of a message, if we are still ready, tell - * the tunnel! - * - * @param cls our `struct CadetConnection` - */ -static void -transmit_done_cb (void *cls) -{ - struct CadetConnection *cc = cls; - - cc->env = NULL; - if ( (NULL != cc->mq) && - (CADET_CONNECTION_READY == cc->state) ) - cc->ready_cb (cc->ready_cb_cls); + return connection_create (destination, + path, + ct, + &cid, + CADET_CONNECTION_NEW, + ready_cb, + ready_cb_cls); } @@ -524,21 +553,18 @@ transmit_done_cb (void *cls) * connection is right now ready for transmission. * * @param cc connection identification - * @param env envelope with message to transmit + * @param env envelope with message to transmit; must NOT + * yet have a #GNUNET_MQ_notify_sent() callback attached to it */ void GCC_transmit (struct CadetConnection *cc, struct GNUNET_MQ_Envelope *env) { - GNUNET_assert (NULL == cc->env); - cc->env = env; - GNUNET_MQ_notify_sent (env, - &transmit_done_cb, - cc); - if ( (NULL != cc->mq) && - (CADET_CONNECTION_READY == cc->state) ) - GNUNET_MQ_send (cc->mq, - env); + GNUNET_assert (GNUNET_YES == cc->mqm_ready); + GNUNET_assert (CADET_CONNECTION_READY == cc->state); + cc->mqm_ready = GNUNET_NO; + GCP_send (cc->mq_man, + env); } @@ -568,6 +594,39 @@ GCC_get_id (struct CadetConnection *cc) } +/** + * Get a (static) string for a connection. + * + * @param cc Connection. + */ +const char * +GCC_2s (const struct CadetConnection *cc) +{ + static char buf[128]; + + if (NULL == cc) + return "Connection(NULL)"; + + if (NULL != cc->ct) + { + GNUNET_snprintf (buf, + sizeof (buf), + "Connection(%s(Tunnel(%s)))", + GNUNET_sh2s (&cc->cid.connection_of_tunnel), + GCT_2s (cc->ct->t)); + return buf; + } + GNUNET_snprintf (buf, + sizeof (buf), + "Connection(%s(Tunnel(NULL)))", + GNUNET_sh2s (&cc->cid.connection_of_tunnel)); + return buf; +} + + +#define LOG2(level, ...) GNUNET_log_from_nocheck(level,"cadet-con",__VA_ARGS__) + + /** * Log connection info. * @@ -578,7 +637,29 @@ void GCC_debug (struct CadetConnection *cc, enum GNUNET_ErrorType level) { - GNUNET_break (0); // FIXME: implement... + int do_log; + char *s; + + do_log = GNUNET_get_log_call_status (level & (~GNUNET_ERROR_TYPE_BULK), + "cadet-con", + __FILE__, __FUNCTION__, __LINE__); + if (0 == do_log) + return; + if (NULL == cc) + { + LOG2 (level, + "Connection (NULL)\n"); + return; + } + s = GCPP_2s (cc->path); + LOG2 (level, + "Connection %s to %s via path %s in state %d is %s\n", + GCC_2s (cc), + GCP_2s (cc->destination), + s, + cc->state, + (GNUNET_YES == cc->mqm_ready) ? "ready" : "busy"); + GNUNET_free (s); } /* end of gnunet-service-cadet-new_connection.c */ diff --git a/src/cadet/gnunet-service-cadet-new_connection.h b/src/cadet/gnunet-service-cadet-new_connection.h index f2364dea4..99426776d 100644 --- a/src/cadet/gnunet-service-cadet-new_connection.h +++ b/src/cadet/gnunet-service-cadet-new_connection.h @@ -33,14 +33,17 @@ #include "gnunet-service-cadet-new_peer.h" #include "cadet_protocol.h" + /** - * Is the given connection currently ready for transmission? + * Function called to notify tunnel about change in our readyness. * - * @param cc connection to transmit on - * @return #GNUNET_YES if we could transmit + * @param cls closure + * @param is_ready #GNUNET_YES if the connection is now ready for transmission, + * #GNUNET_NO if the connection is no longer ready for transmission */ -int -GCC_is_ready (struct CadetConnection *cc); +typedef void +(*GCC_ReadyCallback)(void *cls, + int is_ready); /** @@ -67,7 +70,7 @@ struct CadetConnection * GCC_create (struct CadetPeer *destination, struct CadetPeerPath *path, struct CadetTConnection *ct, - GNUNET_SCHEDULER_TaskCallback ready_cb, + GCC_ReadyCallback ready_cb, void *ready_cb_cls); @@ -88,7 +91,7 @@ GCC_create_inbound (struct CadetPeer *destination, struct CadetPeerPath *path, struct CadetTConnection *ct, const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid, - GNUNET_SCHEDULER_TaskCallback ready_cb, + GCC_ReadyCallback ready_cb, void *ready_cb_cls); @@ -170,6 +173,15 @@ const struct GNUNET_CADET_ConnectionTunnelIdentifier * GCC_get_id (struct CadetConnection *cc); +/** + * Get a (static) string for a connection. + * + * @param cc Connection. + */ +const char * +GCC_2s (const struct CadetConnection *cc); + + /** * Log connection info. * diff --git a/src/cadet/gnunet-service-cadet-new_core.c b/src/cadet/gnunet-service-cadet-new_core.c index 3d8406dc9..9ce4418de 100644 --- a/src/cadet/gnunet-service-cadet-new_core.c +++ b/src/cadet/gnunet-service-cadet-new_core.c @@ -25,6 +25,12 @@ * @author Christian Grothoff * * All functions in this file should use the prefix GCO (Gnunet Cadet cOre (bottom)) + * + * TODO: + * - pass encrypted ACK to connection (!) + * - given BROKEN messages, destroy paths (?) + * - + * - handle POLL (if needed) */ #include "platform.h" #include "gnunet-service-cadet-new_core.h" @@ -35,6 +41,54 @@ #include "gnunet_core_service.h" #include "cadet_protocol.h" +/** + * Number of messages we are willing to buffer per route. + */ +#define ROUTE_BUFFER_SIZE 8 + + +/** + * Information we keep per direction for a route. + */ +struct RouteDirection +{ + /** + * Target peer. + */ + struct CadetPeer *hop; + + /** + * Route this direction is part of. + */ + struct CadetRoute *my_route; + + /** + * Message queue manager for @e hop. + */ + struct GCP_MessageQueueManager *mqm; + + /** + * Cyclic message buffer to @e hop. + */ + struct GNUNET_MQ_Envelope *out_buffer[ROUTE_BUFFER_SIZE]; + + /** + * Next write offset to use to append messages to @e out_buffer. + */ + unsigned int out_wpos; + + /** + * Next read offset to use to retrieve messages from @e out_buffer. + */ + unsigned int out_rpos; + + /** + * Is @e mqm currently ready for transmission? + */ + int is_ready; + +}; + /** * Description of a segment of a `struct CadetConnection` at the @@ -48,24 +102,14 @@ struct CadetRoute { /** - * Previous hop on this route. + * Information about the next hop on this route. */ - struct CadetPeer *prev_hop; + struct RouteDirection next; /** - * Next hop on this route. + * Information about the previous hop on this route. */ - struct CadetPeer *next_hop; - - /** - * Message queue notifications for @e prev_hop. - */ - struct GCP_MessageQueueManager *prev_mqm; - - /** - * Message queue notifications for @e next_hop. - */ - struct GCP_MessageQueueManager *next_mqm; + struct RouteDirection prev; /** * Unique identifier for the connection that uses this route. @@ -77,12 +121,6 @@ struct CadetRoute */ struct GNUNET_TIME_Absolute last_use; - /** - * Counter, used to verify that both MQs are up when the route is - * initialized. - */ - unsigned int up; - }; @@ -125,6 +163,8 @@ route_message (struct CadetPeer *prev, const struct GNUNET_MessageHeader *msg) { struct CadetRoute *route; + struct RouteDirection *dir; + struct GNUNET_MQ_Envelope *env; route = get_route (cid); if (NULL == route) @@ -136,13 +176,33 @@ route_message (struct CadetPeer *prev, GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN); bm->cid = *cid; bm->peer1 = my_full_id; - GCP_send (prev, - env); + GCP_send_ooo (prev, + env); return; } - /* FIXME: support round-robin queue management here somewhere! */ - GCP_send ((prev == route->prev_hop) ? route->next_hop : route->prev_hop, - GNUNET_MQ_msg_copy (msg)); + dir = (prev == route->prev.hop) ? &route->next : &route->prev; + if (GNUNET_YES == dir->is_ready) + { + dir->is_ready = GNUNET_NO; + GCP_send (dir->mqm, + GNUNET_MQ_msg_copy (msg)); + return; + } + env = dir->out_buffer[dir->out_wpos]; + if (NULL != env) + { + /* Queue full, drop earliest message in queue */ + GNUNET_assert (dir->out_rpos == dir->out_wpos); + GNUNET_MQ_discard (env); + dir->out_rpos++; + if (ROUTE_BUFFER_SIZE == dir->out_rpos) + dir->out_rpos = 0; + } + env = GNUNET_MQ_msg_copy (msg); + dir->out_buffer[dir->out_wpos] = env; + dir->out_wpos++; + if (ROUTE_BUFFER_SIZE == dir->out_wpos) + dir->out_wpos = 0; } @@ -169,6 +229,29 @@ check_connection_create (void *cls, } +/** + * Free internal data of a route direction. + * + * @param dir direction to destroy (do NOT free memory of 'dir' itself) + */ +static void +destroy_direction (struct RouteDirection *dir) +{ + for (unsigned int i=0;iout_buffer[i]) + { + GNUNET_MQ_discard (dir->out_buffer[i]); + dir->out_buffer[i] = NULL; + } + if (NULL != dir->mqm) + { + GCP_request_mq_cancel (dir->mqm, + NULL); + dir->mqm = NULL; + } +} + + /** * Destroy our state for @a route. * @@ -177,8 +260,8 @@ check_connection_create (void *cls, static void destroy_route (struct CadetRoute *route) { - GCP_request_mq_cancel (route->next_mqm); - GCP_request_mq_cancel (route->prev_mqm); + destroy_direction (&route->prev); + destroy_direction (&route->next); GNUNET_free (route); } @@ -192,7 +275,7 @@ destroy_route (struct CadetRoute *route) * @param peer2 another one of the peers where a link is broken */ static void -send_broken (struct CadetPeer *target, +send_broken (struct RouteDirection *target, const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid, const struct GNUNET_PeerIdentity *peer1, const struct GNUNET_PeerIdentity *peer2) @@ -207,8 +290,9 @@ send_broken (struct CadetPeer *target, bm->peer1 = *peer1; if (NULL != peer2) bm->peer2 = *peer2; - GCP_send (target, - env); + GCP_request_mq_cancel (target->mqm, + env); + target->mqm = NULL; } @@ -218,53 +302,64 @@ send_broken (struct CadetPeer *target, * be called immediately when we register, and then again * later if the connection ever goes down. * - * @param cls the `struct CadetRoute` - * @param mq the message queue, NULL if connection went down + * @param cls the `struct RouteDirection` + * @param available #GNUNET_YES if sending is now possible, + * #GNUNET_NO if sending is no longer possible + * #GNUNET_SYSERR if sending is no longer possible + * and the last envelope was discarded */ static void -mqm_cr_destroy_prev (void *cls, - struct GNUNET_MQ_Handle *mq) +dir_ready_cb (void *cls, + int ready) { - struct CadetRoute *route = cls; + struct RouteDirection *dir = cls; + struct CadetRoute *route = dir->my_route; + struct RouteDirection *odir; - if (NULL != mq) + if (GNUNET_YES == ready) { - route->up |= 1; + struct GNUNET_MQ_Envelope *env; + + dir->is_ready = GNUNET_YES; + if (NULL != (env = dir->out_buffer[dir->out_rpos])) + { + dir->out_buffer[dir->out_rpos] = NULL; + dir->out_rpos++; + if (ROUTE_BUFFER_SIZE == dir->out_rpos) + dir->out_rpos = 0; + dir->is_ready = GNUNET_NO; + GCP_send (dir->mqm, + env); + } return; } - send_broken (route->next_hop, + odir = (dir == &route->next) ? &route->prev : &route->next; + send_broken (&route->next, &route->cid, - GCP_get_id (route->prev_hop), + GCP_get_id (odir->hop), &my_full_id); destroy_route (route); } /** - * Function called when the message queue to the previous hop - * becomes available/unavailable. We expect this function to - * be called immediately when we register, and then again - * later if the connection ever goes down. + * Initialize one of the directions of a route. * - * @param cls the `struct CadetRoute` - * @param mq the message queue, NULL if connection went down + * @param route route the direction belongs to + * @param dir direction to initialize + * @param hop next hop on in the @a dir */ static void -mqm_cr_destroy_next (void *cls, - struct GNUNET_MQ_Handle *mq) +dir_init (struct RouteDirection *dir, + struct CadetRoute *route, + struct CadetPeer *hop) { - struct CadetRoute *route = cls; - - if (NULL != mq) - { - route->up |= 2; - return; - } - send_broken (route->prev_hop, - &route->cid, - GCP_get_id (route->next_hop), - &my_full_id); - destroy_route (route); + dir->hop = hop; + dir->my_route = route; + dir->mqm = GCP_request_mq (hop, + &dir_ready_cb, + dir); + GNUNET_assert (GNUNET_YES == dir->is_ready); } @@ -310,16 +405,28 @@ handle_connection_create (void *cls, if (NULL != get_route (&msg->cid)) { - /* CID not chosen at random, collides */ - GNUNET_break_op (0); + /* Duplicate CREATE, pass it on, previous one might have been lost! */ + route_message (sender, + &msg->cid, + &msg->header); return; } if (off == path_length - 1) { /* We are the destination, create connection */ + struct CadetConnection *cc; struct CadetPeerPath *path; struct CadetPeer *origin; + cc = GNUNET_CONTAINER_multishortmap_get (connections, + &msg->cid.connection_of_tunnel); + if (NULL != cc) + { + /* Duplicate CREATE, likely our ACK got lost, retransmit the ACK! */ + GNUNET_break (0); // FIXME: not implemented! + return; + } + path = GCPP_get_path_from_route (path_length, pids); origin = GCP_get (&pids[0], @@ -327,35 +434,37 @@ handle_connection_create (void *cls, GCT_add_inbound_connection (GCT_create_tunnel (origin), &msg->cid, path); - return; } /* We are merely a hop on the way, check if we can support the route */ next = GCP_get (&pids[off + 1], GNUNET_NO); if ( (NULL == next) || - (NULL == GCP_get_mq (next)) ) + (GNUNET_NO == GCP_has_core_connection (next)) ) { /* unworkable, send back BROKEN notification */ - send_broken (sender, - &msg->cid, - &pids[off + 1], - &my_full_id); + struct GNUNET_MQ_Envelope *env; + struct GNUNET_CADET_ConnectionBrokenMessage *bm; + + env = GNUNET_MQ_msg (bm, + GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN); + bm->cid = msg->cid; + bm->peer1 = pids[off + 1]; + bm->peer2 = my_full_id; + GCP_send_ooo (sender, + env); return; } /* Workable route, create routing entry */ route = GNUNET_new (struct CadetRoute); route->cid = msg->cid; - route->prev_mqm = GCP_request_mq (sender, - &mqm_cr_destroy_prev, - route); - route->next_mqm = GCP_request_mq (next, - &mqm_cr_destroy_next, - route); - route->prev_hop = sender; - route->next_hop = next; - GNUNET_assert ((1|2) == route->up); + dir_init (&route->prev, + route, + sender); + dir_init (&route->next, + route, + next); GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multishortmap_put (routes, &route->cid.connection_of_tunnel, @@ -371,8 +480,8 @@ handle_connection_create (void *cls, * @param msg Message itself. */ static void -handle_connection_ack (void *cls, - const struct GNUNET_CADET_ConnectionCreateMessageAckMessage *msg) +handle_connection_create_ack (void *cls, + const struct GNUNET_CADET_ConnectionCreateMessageAckMessage *msg) { struct CadetPeer *peer = cls; struct CadetConnection *cc; @@ -734,7 +843,7 @@ GCO_init (const struct GNUNET_CONFIGURATION_Handle *c) GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE, struct GNUNET_CADET_ConnectionCreateMessage, NULL), - GNUNET_MQ_hd_fixed_size (connection_ack, + GNUNET_MQ_hd_fixed_size (connection_create_ack, GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE_ACK, struct GNUNET_CADET_ConnectionCreateMessageAckMessage, NULL), diff --git a/src/cadet/gnunet-service-cadet-new_paths.c b/src/cadet/gnunet-service-cadet-new_paths.c index 3f6edef39..3cfce337c 100644 --- a/src/cadet/gnunet-service-cadet-new_paths.c +++ b/src/cadet/gnunet-service-cadet-new_paths.c @@ -525,4 +525,33 @@ GCPP_get_peer_at_offset (struct CadetPeerPath *path, } +/** + * Convert a path to a human-readable string. + * + * @param path path to convert + * @return string, to be freed by caller (unlike other *_2s APIs!) + */ +char * +GCPP_2s (struct CadetPeerPath *path) +{ + char *s; + char *old; + + old = GNUNET_strdup (""); + for (unsigned int i = 0; + i < path->entries_length; + i++) + { + GNUNET_asprintf (&s, + "%s %s", + old, + GCP_2s (GCPP_get_peer_at_offset (path, + i))); + GNUNET_free_non_null (old); + old = s; + } + return old; +} + + /* end of gnunet-service-cadet-new_paths.c */ diff --git a/src/cadet/gnunet-service-cadet-new_paths.h b/src/cadet/gnunet-service-cadet-new_paths.h index 6a864e8ec..5714368c7 100644 --- a/src/cadet/gnunet-service-cadet-new_paths.h +++ b/src/cadet/gnunet-service-cadet-new_paths.h @@ -169,4 +169,14 @@ GCPP_get_peer_at_offset (struct CadetPeerPath *path, unsigned int off); +/** + * Convert a path to a human-readable string. + * + * @param path path to convert + * @return string, to be freed by caller (unlike other *_2s APIs!) + */ +char * +GCPP_2s (struct CadetPeerPath *p); + + #endif diff --git a/src/cadet/gnunet-service-cadet-new_peer.c b/src/cadet/gnunet-service-cadet-new_peer.c index c57622181..5b978ff77 100644 --- a/src/cadet/gnunet-service-cadet-new_peer.c +++ b/src/cadet/gnunet-service-cadet-new_peer.c @@ -86,6 +86,11 @@ struct GCP_MessageQueueManager */ struct CadetPeer *cp; + /** + * Envelope this manager would like to transmit once it is its turn. + */ + struct GNUNET_MQ_Envelope *env; + }; @@ -188,6 +193,11 @@ struct CadetPeer */ unsigned int num_paths; + /** + * Number of message queue managers of this peer that have a message in waiting. + */ + unsigned int mqm_ready_counter; + /** * Current length of the @e path_heads and @path_tails arrays. * The arrays should be grown as needed. @@ -265,50 +275,121 @@ destroy_peer (void *cls) /** - * Get the message queue for peer @a cp. + * Set the message queue to @a mq for peer @a cp and notify watchers. * * @param cp peer to modify - * @return message queue (can be NULL) + * @param mq message queue to set (can be NULL) */ -struct GNUNET_MQ_Handle * -GCP_get_mq (struct CadetPeer *cp) +void +GCP_set_mq (struct CadetPeer *cp, + struct GNUNET_MQ_Handle *mq) { - return cp->core_mq; + cp->core_mq = mq; + + for (struct GCP_MessageQueueManager *mqm = cp->mqm_head; + NULL != mqm; + mqm = mqm->next) + { + if (NULL == mq) + { + if (NULL != mqm->env) + { + GNUNET_MQ_discard (mqm->env); + mqm->env = NULL; + mqm->cb (mqm->cb_cls, + GNUNET_SYSERR); + } + else + { + mqm->cb (mqm->cb_cls, + GNUNET_NO); + } + } + else + { + GNUNET_assert (NULL == mqm->env); + mqm->cb (mqm->cb_cls, + GNUNET_YES); + } + } } /** - * Set the message queue to @a mq for peer @a cp and notify watchers. + * Transmit current envelope from this @a mqm. * - * @param cp peer to modify - * @param mq message queue to set (can be NULL) + * @param mqm mqm to transmit message for now */ -void -GCP_set_mq (struct CadetPeer *cp, - struct GNUNET_MQ_Handle *mq) +static void +mqm_execute (struct GCP_MessageQueueManager *mqm) { - cp->core_mq = mq; + struct CadetPeer *cp = mqm->cp; + + /* Move entry to the end of the DLL, to be fair. */ + if (mqm != cp->mqm_tail) + { + GNUNET_CONTAINER_DLL_remove (cp->mqm_head, + cp->mqm_tail, + mqm); + GNUNET_CONTAINER_DLL_insert_tail (cp->mqm_head, + cp->mqm_tail, + mqm); + } + GNUNET_MQ_send (cp->core_mq, + mqm->env); + mqm->env = NULL; + cp->mqm_ready_counter--; +} + + +/** + * Function called when CORE took one of the messages from + * a message queue manager and transmitted it. + * + * @param cls the `struct CadetPeeer` where we made progress + */ +static void +mqm_send_done (void *cls) +{ + struct CadetPeer *cp = cls; + + if (0 == cp->mqm_ready_counter) + return; /* nothing to do */ for (struct GCP_MessageQueueManager *mqm = cp->mqm_head; NULL != mqm; mqm = mqm->next) - mqm->cb (mqm->cb_cls, - mq); + { + if (NULL == mqm->env) + continue; + mqm_execute (mqm); + return; + } } /** * Send the message in @a env to @a cp. * - * @param cp the peer - * @param env envelope with the message to send + * @param mqm the message queue manager to use for transmission + * @param env envelope with the message to send; must NOT + * yet have a #GNUNET_MQ_notify_sent() callback attached to it */ void -GCP_send (struct CadetPeer *cp, +GCP_send (struct GCP_MessageQueueManager *mqm, struct GNUNET_MQ_Envelope *env) { + struct CadetPeer *cp = mqm->cp; + GNUNET_assert (NULL != cp->core_mq); - GNUNET_MQ_send (cp->core_mq, - env); + GNUNET_assert (NULL == mqm->env); + GNUNET_MQ_notify_sent (env, + &mqm_send_done, + cp); + mqm->env = env; + cp->mqm_ready_counter++; + if (0 != GNUNET_MQ_get_length (cp->core_mq)) + return; + mqm_execute (mqm); } @@ -864,6 +945,19 @@ GCP_drop_tunnel (struct CadetPeer *peer, } +/** + * Test if @a cp has a core-level connection + * + * @param cp peer to test + * @return #GNUNET_YES if @a cp has a core-level connection + */ +int +GCP_has_core_connection (struct CadetPeer *cp) +{ + return (NULL != cp->core_mq) ? GNUNET_YES : GNUNET_NO; +} + + /** * Start message queue change notifications. * @@ -888,7 +982,7 @@ GCP_request_mq (struct CadetPeer *cp, mqm); if (NULL != cp->core_mq) cb (cb_cls, - cp->core_mq); + GNUNET_YES); return mqm; } @@ -897,12 +991,24 @@ GCP_request_mq (struct CadetPeer *cp, * Stops message queue change notifications. * * @param mqm handle matching request to cancel + * @param last_env final message to transmit, or NULL */ void -GCP_request_mq_cancel (struct GCP_MessageQueueManager *mqm) +GCP_request_mq_cancel (struct GCP_MessageQueueManager *mqm, + struct GNUNET_MQ_Envelope *last_env) { struct CadetPeer *cp = mqm->cp; + if (NULL != mqm->env) + GNUNET_MQ_discard (mqm->env); + if (NULL != last_env) + { + if (NULL != cp->core_mq) + GNUNET_MQ_send (cp->core_mq, + last_env); + else + GNUNET_MQ_discard (last_env); + } GNUNET_CONTAINER_DLL_remove (cp->mqm_head, cp->mqm_tail, mqm); @@ -910,5 +1016,29 @@ GCP_request_mq_cancel (struct GCP_MessageQueueManager *mqm) } +/** + * Send the message in @a env to @a cp, overriding queueing logic. + * This function should only be used to send error messages outside + * of flow and congestion control, similar to ICMP. Note that + * the envelope may be silently discarded as well. + * + * @param cp peer to send the message to + * @param env envelope with the message to send + */ +void +GCP_send_ooo (struct CadetPeer *cp, + struct GNUNET_MQ_Envelope *env) +{ + if (NULL == cp->core_mq) + { + GNUNET_MQ_discard (env); + return; + } + GNUNET_MQ_send (cp->core_mq, + env); +} + + + /* end of gnunet-service-cadet-new_peer.c */ diff --git a/src/cadet/gnunet-service-cadet-new_peer.h b/src/cadet/gnunet-service-cadet-new_peer.h index 6b4ee1b56..c633f47e5 100644 --- a/src/cadet/gnunet-service-cadet-new_peer.h +++ b/src/cadet/gnunet-service-cadet-new_peer.h @@ -262,7 +262,12 @@ GCP_destroy_all_peers (void); /** * Data structure used to track whom we have to notify about changes - * to our message queue. + * in our ability to transmit to a given peer. + * + * All queue managers will be given equal chance for sending messages + * to @a cp. This construct this guarantees fairness for access to @a + * cp among the different message queues. Each connection or route + * will have its respective message queue managers for each direction. */ struct GCP_MessageQueueManager; @@ -271,15 +276,19 @@ struct GCP_MessageQueueManager; * Function to call with updated message queue object. * * @param cls closure - * @param mq NULL if MQ is gone, otherwise an active message queue + * @param available #GNUNET_YES if sending is now possible, + * #GNUNET_NO if sending is no longer possible + * #GNUNET_SYSERR if sending is no longer possible + * and the last envelope was discarded */ typedef void (*GCP_MessageQueueNotificationCallback)(void *cls, - struct GNUNET_MQ_Handle *mq); + int available); /** - * Start message queue change notifications. + * Start message queue change notifications. Will create a new slot + * to manage the message queue to the given @a cp. * * @param cp peer to notify for * @param cb function to call if mq becomes available or unavailable @@ -293,44 +302,67 @@ GCP_request_mq (struct CadetPeer *cp, /** - * Stops message queue change notifications. + * Test if @a cp has a core-level connection * - * @param mqm handle matching request to cancel + * @param cp peer to test + * @return #GNUNET_YES if @a cp has a core-level connection + */ +int +GCP_has_core_connection (struct CadetPeer *cp); + + +/** + * Send the message in @a env via a @a mqm. Must only be called at + * most once after the respective + * #GCP_MessageQueueNotificationCallback was called with `available` + * set to #GNUNET_YES, and not after the callback was called with + * `available` set to #GNUNET_NO or #GNUNET_SYSERR. + * + * @param mqm message queue manager for the transmission + * @param env envelope with the message to send; must NOT + * yet have a #GNUNET_MQ_notify_sent() callback attached to it */ void -GCP_request_mq_cancel (struct GCP_MessageQueueManager *mqm); +GCP_send (struct GCP_MessageQueueManager *mqm, + struct GNUNET_MQ_Envelope *env); /** - * Set the message queue to @a mq for peer @a cp and notify watchers. + * Send the message in @a env to @a cp, overriding queueing logic. + * This function should only be used to send error messages outside + * of flow and congestion control, similar to ICMP. Note that + * the envelope may be silently discarded as well. * - * @param cp peer to modify - * @param mq message queue to set (can be NULL) + * @param cp peer to send the message to + * @param env envelope with the message to send */ void -GCP_set_mq (struct CadetPeer *cp, - struct GNUNET_MQ_Handle *mq); +GCP_send_ooo (struct CadetPeer *cp, + struct GNUNET_MQ_Envelope *env); /** - * Get the message queue for peer @a cp. + * Stops message queue change notifications and sends a last message. + * In practice, this is implemented by sending that @a last_env + * message immediately (if any), ignoring queue order. * - * @param cp peer to modify - * @return message queue (can be NULL) + * @param mqm handle matching request to cancel + * @param last_env final message to transmit, or NULL */ -struct GNUNET_MQ_Handle * -GCP_get_mq (struct CadetPeer *cp); +void +GCP_request_mq_cancel (struct GCP_MessageQueueManager *mqm, + struct GNUNET_MQ_Envelope *last_env); /** - * Send the message in @a env to @a cp. + * Set the message queue to @a mq for peer @a cp and notify watchers. * - * @param cp the peer - * @param env envelope with the message to send + * @param cp peer to modify + * @param mq message queue to set (can be NULL) */ void -GCP_send (struct CadetPeer *cp, - struct GNUNET_MQ_Envelope *env); +GCP_set_mq (struct CadetPeer *cp, + struct GNUNET_MQ_Handle *mq); #endif diff --git a/src/cadet/gnunet-service-cadet-new_tunnels.c b/src/cadet/gnunet-service-cadet-new_tunnels.c index 9161c41f7..23b270b82 100644 --- a/src/cadet/gnunet-service-cadet-new_tunnels.c +++ b/src/cadet/gnunet-service-cadet-new_tunnels.c @@ -226,43 +226,6 @@ struct CadetTunnelAxolotl }; -/** - * Entry in list of connections used by tunnel, with metadata. - */ -struct CadetTConnection -{ - /** - * Next in DLL. - */ - struct CadetTConnection *next; - - /** - * Prev in DLL. - */ - struct CadetTConnection *prev; - - /** - * Connection handle. - */ - struct CadetConnection *cc; - - /** - * Tunnel this connection belongs to. - */ - struct CadetTunnel *t; - - /** - * Creation time, to keep oldest connection alive. - */ - struct GNUNET_TIME_Absolute created; - - /** - * Connection throughput, to keep fastest connection alive. - */ - uint32_t throughput; -}; - - /** * Struct used to save messages in a non-ready tunnel to send once connected. */ @@ -1418,18 +1381,27 @@ destroy_tunnel (void *cls) /** - * A connection is ready for transmission. Looks at our message queue - * and if there is a message, sends it out via the connection. + * A connection is @a is_ready for transmission. Looks at our message + * queue and if there is a message, sends it out via the connection. * - * @param cls the `struct CadetTConnection` that is ready + * @param cls the `struct CadetTConnection` that is @a is_ready + * @param is_ready #GNUNET_YES if connection are now ready, + * #GNUNET_NO if connection are no longer ready */ static void -connection_ready_cb (void *cls) +connection_ready_cb (void *cls, + int is_ready) { struct CadetTConnection *ct = cls; struct CadetTunnel *t = ct->t; struct CadetTunnelQueueEntry *tq = t->tq_head; + if (GNUNET_NO == ct->is_ready) + { + ct->is_ready = GNUNET_NO; + return; + } + ct->is_ready = GNUNET_YES; if (NULL == tq) return; /* no messages pending right now */ @@ -1440,6 +1412,7 @@ connection_ready_cb (void *cls) tq); if (NULL != tq->cid) *tq->cid = *GCC_get_id (ct->cc); + ct->is_ready = GNUNET_NO; GCC_transmit (ct->cc, tq->env); tq->cont (tq->cont_cls); @@ -1453,6 +1426,8 @@ connection_ready_cb (void *cls) * at our message queue and if there is a message, picks a connection * to send it on. * + * FIXME: yuck... Need better selection logic! + * * @param t tunnel to process messages on */ static void @@ -1465,11 +1440,14 @@ trigger_transmissions (struct CadetTunnel *t) for (ct = t->connection_head; NULL != ct; ct = ct->next) - if (GNUNET_YES == GCC_is_ready (ct->cc)) + if (GNUNET_YES == ct->is_ready) break; if (NULL == ct) return; /* no connections ready */ - connection_ready_cb (ct); + + /* FIXME: a bit hackish to do it like this... */ + connection_ready_cb (ct, + GNUNET_YES); } @@ -1567,7 +1545,7 @@ consider_path_cb (void *cls, path, ct, &connection_ready_cb, - t); + ct); /* FIXME: schedule job to kill connection (and path?) if it takes too long to get ready! (And track performance data on how long other connections took with the tunnel!) -- 2.25.1