From 98fead2b759d2c7ac87e845d65b6b2f565ba12a3 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Mon, 30 Jan 2017 21:14:30 +0100 Subject: [PATCH] fair, global message buffer implemented --- src/cadet/TODO | 4 - src/cadet/gnunet-service-cadet-new_core.c | 241 +++++++++++++++++----- 2 files changed, 185 insertions(+), 60 deletions(-) diff --git a/src/cadet/TODO b/src/cadet/TODO index 0033e0f34..cbce04e2f 100644 --- a/src/cadet/TODO +++ b/src/cadet/TODO @@ -4,10 +4,6 @@ + get current RTT from connection; use that for initial retransmissions! + figure out flow control without ACKs (unreliable traffic!) -- HIGH: revisit message buffer, have global buffer instead per-route, but then - make sure it is shared fairly across routes and connections (CORE); - also, do not buffer if the connection is set to unbuffered! - - HIGH: revisit handling of 'unbuffered' traffic! (CHANNEL/TUNNEL) (need to push down through tunnel into connection selection); At Tunnel-level, try to create connections that match channel diff --git a/src/cadet/gnunet-service-cadet-new_core.c b/src/cadet/gnunet-service-cadet-new_core.c index fc81c1a3e..2c050af6d 100644 --- a/src/cadet/gnunet-service-cadet-new_core.c +++ b/src/cadet/gnunet-service-cadet-new_core.c @@ -27,7 +27,6 @@ * All functions in this file should use the prefix GCO (Gnunet Cadet cOre (bottom)) * * TODO: - * - properly implement GLOBAL message buffer, instead of per-route buffers * - do NOT use buffering if the route options say no buffer! * - Optimization: given BROKEN messages, destroy paths (?) */ @@ -83,14 +82,13 @@ struct Rung * Total number of route directions in this rung. */ unsigned int num_routes; -}; - -/** - * Number of messages we are willing to buffer per route. - * FIXME: have global buffer pool instead! - */ -#define ROUTE_BUFFER_SIZE 8 + /** + * Number of messages route directions at this rung have + * in their buffer. + */ + unsigned int rung_off; +}; /** @@ -139,21 +137,6 @@ struct RouteDirection */ struct GCP_MessageQueueManager *mqm; - /** - * Cyclic message buffer to @e hop. - */ - struct GNUNET_MQ_Envelope *out_buffer[ROUTE_BUFFER_SIZE]; - - /** - * Next write offset to use to append messages to @e out_buffer. - */ - unsigned int out_wpos; - - /** - * Next read offset to use to retrieve messages from @e out_buffer. - */ - unsigned int out_rpos; - /** * Is @e mqm currently ready for transmission? */ @@ -220,6 +203,22 @@ static struct GNUNET_CONTAINER_MultiShortmap *routes; */ static struct GNUNET_CONTAINER_Heap *route_heap; +/** + * Rung zero (always pointed to by #rung_head). + */ +static struct Rung rung_zero; + +/** + * DLL of rungs, with the head always point to a rung of + * route directions with no messages in the queue. + */ +static struct Rung *rung_head = &rung_zero; + +/** + * Tail of the #rung_head DLL. + */ +static struct Rung *rung_tail = &rung_zero; + /** * Maximum number of concurrent routes this peer will support. */ @@ -254,6 +253,91 @@ get_route (const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid) } +/** + * Lower the rung in which @a dir is by 1. + * + * @param dir direction to lower in rung. + */ +static void +lower_rung (struct RouteDirection *dir) +{ + struct Rung *rung = dir->rung; + struct Rung *prev; + + GNUNET_CONTAINER_DLL_remove (rung->rd_head, + rung->rd_tail, + dir); + prev = rung->prev; + GNUNET_assert (NULL != prev); + if (prev->rung_off != rung->rung_off - 1) + { + prev = GNUNET_new (struct Rung); + prev->rung_off = rung->rung_off - 1; + GNUNET_CONTAINER_DLL_insert_after (rung_head, + rung_tail, + prev, + rung); + } + else + { + rung = prev; + } + GNUNET_assert (NULL != rung); + GNUNET_CONTAINER_DLL_insert (rung->rd_head, + rung->rd_tail, + dir); + +} + + +/** + * Discard the buffer @a env from the route direction @a dir and + * move @a dir down a rung. + * + * @param dir direction that contains the @a env in the buffer + * @param env envelope to discard + */ +static void +discard_buffer (struct RouteDirection *dir, + struct GNUNET_MQ_Envelope *env) +{ + GNUNET_MQ_dll_remove (&dir->env_head, + &dir->env_tail, + env); + cur_buffers--; + GNUNET_MQ_discard (env); + lower_rung (dir); +} + + +/** + * Discard all messages from the highest rung, to make space. + */ +static void +discard_all_from_rung_tail () +{ + struct Rung *tail = rung_tail; + struct RouteDirection *dir; + + while (NULL != (dir = tail->rd_head)) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Queue full due new message %s on connection %s, dropping old message\n", + GNUNET_sh2s (&dir->my_route->cid.connection_of_tunnel)); + GNUNET_STATISTICS_update (stats, + "# messages dropped due to full buffer", + 1, + GNUNET_NO); + discard_buffer (dir, + dir->env_head); + } + GNUNET_CONTAINER_DLL_remove (rung_head, + rung_tail, + tail); + GNUNET_free (tail); +} + + /** * We message @a msg from @a prev. Find its route by @a cid and * forward to the next hop. Drop and signal broken route if we do not @@ -270,6 +354,8 @@ route_message (struct CadetPeer *prev, { struct CadetRoute *route; struct RouteDirection *dir; + struct Rung *rung; + struct Rung *nxt; struct GNUNET_MQ_Envelope *env; route = get_route (cid); @@ -308,26 +394,51 @@ route_message (struct CadetPeer *prev, GNUNET_MQ_msg_copy (msg)); return; } - env = dir->out_buffer[dir->out_wpos]; - if (NULL != env) + rung = dir->rung; + if (cur_buffers == max_buffers) { - /* Queue full, drop earliest message in queue */ - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Queue full due to new message of type %u from %s to %s on connection %s, dropping old message\n", - ntohs (msg->type), - GCP_2s (prev), - GNUNET_i2s (GCP_get_id (dir->hop)), - GNUNET_sh2s (&cid->connection_of_tunnel)); - GNUNET_STATISTICS_update (stats, - "# messages dropped due to full buffer", - 1, - GNUNET_NO); - GNUNET_assert (dir->out_rpos == dir->out_wpos); - GNUNET_MQ_discard (env); - dir->out_rpos++; - if (ROUTE_BUFFER_SIZE == dir->out_rpos) - dir->out_rpos = 0; + /* Need to make room. */ + if (NULL != rung->next) + { + /* Easy case, drop messages from route directions in highest rung */ + discard_all_from_rung_tail (); + } + else + { + /* We are in the highest rung, drop our own! */ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Queue full due new message %s on connection %s, dropping old message\n", + GNUNET_sh2s (&dir->my_route->cid.connection_of_tunnel)); + GNUNET_STATISTICS_update (stats, + "# messages dropped due to full buffer", + 1, + GNUNET_NO); + discard_buffer (dir, + dir->env_head); + rung = dir->rung; + } + } + /* remove 'dir' from current rung */ + GNUNET_CONTAINER_DLL_remove (rung->rd_head, + rung->rd_tail, + dir); + /* make 'nxt' point to the next higher rung, creat if necessary */ + nxt = rung->next; + if ( (NULL == nxt) || + (rung->rung_off + 1 != nxt->rung_off) ) + { + nxt = GNUNET_new (struct Rung); + nxt->rung_off = rung->rung_off + 1; + GNUNET_CONTAINER_DLL_insert_after (rung_head, + rung_tail, + rung, + nxt); } + /* insert 'dir' into next higher rung */ + GNUNET_CONTAINER_DLL_insert (nxt->rd_head, + nxt->rd_tail, + dir); + /* add message into 'dir' buffer */ LOG (GNUNET_ERROR_TYPE_DEBUG, "Queueing new message of type %u from %s to %s on connection %s\n", ntohs (msg->type), @@ -335,10 +446,19 @@ route_message (struct CadetPeer *prev, GNUNET_i2s (GCP_get_id (dir->hop)), GNUNET_sh2s (&cid->connection_of_tunnel)); env = GNUNET_MQ_msg_copy (msg); - dir->out_buffer[dir->out_wpos] = env; - dir->out_wpos++; - if (ROUTE_BUFFER_SIZE == dir->out_wpos) - dir->out_wpos = 0; + GNUNET_MQ_dll_insert_tail (&dir->env_head, + &dir->env_tail, + env); + cur_buffers++; + /* Clean up 'rung' if now empty (and not head) */ + if ( (NULL == rung->rd_head) && + (rung != rung_head) ) + { + GNUNET_CONTAINER_DLL_remove (rung_head, + rung_tail, + rung); + GNUNET_free (rung); + } } @@ -373,18 +493,26 @@ check_connection_create (void *cls, static void destroy_direction (struct RouteDirection *dir) { - for (unsigned int i=0;iout_buffer[i]) - { - GNUNET_MQ_discard (dir->out_buffer[i]); - dir->out_buffer[i] = NULL; - } + struct GNUNET_MQ_Envelope *env; + + while (NULL != (env = dir->env_head)) + { + GNUNET_STATISTICS_update (stats, + "# messages dropped due to route destruction", + 1, + GNUNET_NO); + discard_buffer (dir, + env); + } if (NULL != dir->mqm) { GCP_request_mq_cancel (dir->mqm, NULL); dir->mqm = NULL; } + GNUNET_CONTAINER_DLL_remove (rung_head->rd_head, + rung_head->rd_tail, + dir); } @@ -521,12 +649,13 @@ dir_ready_cb (void *cls, struct GNUNET_MQ_Envelope *env; dir->is_ready = GNUNET_YES; - if (NULL != (env = dir->out_buffer[dir->out_rpos])) + if (NULL != (env = dir->env_head)) { - dir->out_buffer[dir->out_rpos] = NULL; - dir->out_rpos++; - if (ROUTE_BUFFER_SIZE == dir->out_rpos) - dir->out_rpos = 0; + GNUNET_MQ_dll_remove (&dir->env_head, + &dir->env_tail, + env); + cur_buffers--; + lower_rung (dir); dir->is_ready = GNUNET_NO; GCP_send (dir->mqm, env); -- 2.25.1