fair, global message buffer implemented
authorChristian Grothoff <christian@grothoff.org>
Mon, 30 Jan 2017 20:14:30 +0000 (21:14 +0100)
committerChristian Grothoff <christian@grothoff.org>
Mon, 30 Jan 2017 20:14:30 +0000 (21:14 +0100)
src/cadet/TODO
src/cadet/gnunet-service-cadet-new_core.c

index 0033e0f3449225bb0e2604a251c6f8695bad3239..cbce04e2f1e8ba5fb09497d9e8c228873b8bc3e1 100644 (file)
@@ -4,10 +4,6 @@
   + get current RTT from connection; use that for initial retransmissions!
   + figure out flow control without ACKs (unreliable traffic!)
 
-- HIGH: revisit message buffer, have global buffer instead per-route, but then
-        make sure it is shared fairly across routes and connections (CORE);
-        also, do not buffer if the connection is set to unbuffered!
-
 - HIGH: revisit handling of 'unbuffered' traffic! (CHANNEL/TUNNEL)
         (need to push down through tunnel into connection selection);
         At Tunnel-level, try to create connections that match channel
index fc81c1a3e777d5668ed4efdcc7fb21da140441dc..2c050af6ddee92fc09739a3ac85ceac5fcb7270e 100644 (file)
@@ -27,7 +27,6 @@
  * All functions in this file should use the prefix GCO (Gnunet Cadet cOre (bottom))
  *
  * TODO:
- * - properly implement GLOBAL message buffer, instead of per-route buffers
  * - do NOT use buffering if the route options say no buffer!
  * - Optimization: given BROKEN messages, destroy paths (?)
  */
@@ -83,14 +82,13 @@ struct Rung
    * Total number of route directions in this rung.
    */
   unsigned int num_routes;
-};
-
 
-/**
- * Number of messages we are willing to buffer per route.
- * FIXME: have global buffer pool instead!
- */
-#define ROUTE_BUFFER_SIZE 8
+  /**
+   * Number of messages route directions at this rung have
+   * in their buffer.
+   */
+  unsigned int rung_off;
+};
 
 
 /**
@@ -139,21 +137,6 @@ struct RouteDirection
    */
   struct GCP_MessageQueueManager *mqm;
 
-  /**
-   * Cyclic message buffer to @e hop.
-   */
-  struct GNUNET_MQ_Envelope *out_buffer[ROUTE_BUFFER_SIZE];
-
-  /**
-   * Next write offset to use to append messages to @e out_buffer.
-   */
-  unsigned int out_wpos;
-
-  /**
-   * Next read offset to use to retrieve messages from @e out_buffer.
-   */
-  unsigned int out_rpos;
-
   /**
    * Is @e mqm currently ready for transmission?
    */
@@ -220,6 +203,22 @@ static struct GNUNET_CONTAINER_MultiShortmap *routes;
  */
 static struct GNUNET_CONTAINER_Heap *route_heap;
 
+/**
+ * Rung zero (always pointed to by #rung_head).
+ */
+static struct Rung rung_zero;
+
+/**
+ * DLL of rungs, with the head always point to a rung of
+ * route directions with no messages in the queue.
+ */
+static struct Rung *rung_head = &rung_zero;
+
+/**
+ * Tail of the #rung_head DLL.
+ */
+static struct Rung *rung_tail = &rung_zero;
+
 /**
  * Maximum number of concurrent routes this peer will support.
  */
@@ -254,6 +253,91 @@ get_route (const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid)
 }
 
 
+/**
+ * Lower the rung in which @a dir is by 1.
+ *
+ * @param dir direction to lower in rung.
+ */
+static void
+lower_rung (struct RouteDirection *dir)
+{
+  struct Rung *rung = dir->rung;
+  struct Rung *prev;
+
+  GNUNET_CONTAINER_DLL_remove (rung->rd_head,
+                               rung->rd_tail,
+                               dir);
+  prev = rung->prev;
+  GNUNET_assert (NULL != prev);
+  if (prev->rung_off != rung->rung_off - 1)
+  {
+    prev = GNUNET_new (struct Rung);
+    prev->rung_off = rung->rung_off - 1;
+    GNUNET_CONTAINER_DLL_insert_after (rung_head,
+                                       rung_tail,
+                                       prev,
+                                       rung);
+  }
+  else
+  {
+    rung = prev;
+  }
+  GNUNET_assert (NULL != rung);
+  GNUNET_CONTAINER_DLL_insert (rung->rd_head,
+                               rung->rd_tail,
+                               dir);
+
+}
+
+
+/**
+ * Discard the buffer @a env from the route direction @a dir and
+ * move @a dir down a rung.
+ *
+ * @param dir direction that contains the @a env in the buffer
+ * @param env envelope to discard
+ */
+static void
+discard_buffer (struct RouteDirection *dir,
+                struct GNUNET_MQ_Envelope *env)
+{
+  GNUNET_MQ_dll_remove (&dir->env_head,
+                        &dir->env_tail,
+                        env);
+  cur_buffers--;
+  GNUNET_MQ_discard (env);
+  lower_rung (dir);
+}
+
+
+/**
+ * Discard all messages from the highest rung, to make space.
+ */
+static void
+discard_all_from_rung_tail ()
+{
+  struct Rung *tail = rung_tail;
+  struct RouteDirection *dir;
+
+  while (NULL != (dir = tail->rd_head))
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Queue full due new message %s on connection %s, dropping old message\n",
+         GNUNET_sh2s (&dir->my_route->cid.connection_of_tunnel));
+    GNUNET_STATISTICS_update (stats,
+                              "# messages dropped due to full buffer",
+                              1,
+                              GNUNET_NO);
+    discard_buffer (dir,
+                    dir->env_head);
+  }
+  GNUNET_CONTAINER_DLL_remove (rung_head,
+                               rung_tail,
+                               tail);
+  GNUNET_free (tail);
+}
+
+
 /**
  * We message @a msg from @a prev.  Find its route by @a cid and
  * forward to the next hop.  Drop and signal broken route if we do not
@@ -270,6 +354,8 @@ route_message (struct CadetPeer *prev,
 {
   struct CadetRoute *route;
   struct RouteDirection *dir;
+  struct Rung *rung;
+  struct Rung *nxt;
   struct GNUNET_MQ_Envelope *env;
 
   route = get_route (cid);
@@ -308,26 +394,51 @@ route_message (struct CadetPeer *prev,
               GNUNET_MQ_msg_copy (msg));
     return;
   }
-  env = dir->out_buffer[dir->out_wpos];
-  if (NULL != env)
+  rung = dir->rung;
+  if (cur_buffers == max_buffers)
   {
-    /* Queue full, drop earliest message in queue */
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "Queue full due to new message of type %u from %s to %s on connection %s, dropping old message\n",
-         ntohs (msg->type),
-         GCP_2s (prev),
-         GNUNET_i2s (GCP_get_id (dir->hop)),
-         GNUNET_sh2s (&cid->connection_of_tunnel));
-    GNUNET_STATISTICS_update (stats,
-                              "# messages dropped due to full buffer",
-                              1,
-                              GNUNET_NO);
-  GNUNET_assert (dir->out_rpos == dir->out_wpos);
-    GNUNET_MQ_discard (env);
-    dir->out_rpos++;
-    if (ROUTE_BUFFER_SIZE == dir->out_rpos)
-      dir->out_rpos = 0;
+    /* Need to make room. */
+    if (NULL != rung->next)
+    {
+      /* Easy case, drop messages from route directions in highest rung */
+      discard_all_from_rung_tail ();
+    }
+    else
+    {
+      /* We are in the highest rung, drop our own! */
+      LOG (GNUNET_ERROR_TYPE_DEBUG,
+           "Queue full due new message %s on connection %s, dropping old message\n",
+           GNUNET_sh2s (&dir->my_route->cid.connection_of_tunnel));
+      GNUNET_STATISTICS_update (stats,
+                                "# messages dropped due to full buffer",
+                                1,
+                                GNUNET_NO);
+      discard_buffer (dir,
+                      dir->env_head);
+      rung = dir->rung;
+    }
+  }
+  /* remove 'dir' from current rung */
+  GNUNET_CONTAINER_DLL_remove (rung->rd_head,
+                               rung->rd_tail,
+                               dir);
+  /* make 'nxt' point to the next higher rung, creat if necessary */
+  nxt = rung->next;
+  if ( (NULL == nxt) ||
+       (rung->rung_off + 1 != nxt->rung_off) )
+  {
+    nxt = GNUNET_new (struct Rung);
+    nxt->rung_off = rung->rung_off + 1;
+    GNUNET_CONTAINER_DLL_insert_after (rung_head,
+                                       rung_tail,
+                                       rung,
+                                       nxt);
   }
+  /* insert 'dir' into next higher rung */
+  GNUNET_CONTAINER_DLL_insert (nxt->rd_head,
+                               nxt->rd_tail,
+                               dir);
+  /* add message into 'dir' buffer */
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Queueing new message of type %u from %s to %s on connection %s\n",
        ntohs (msg->type),
@@ -335,10 +446,19 @@ route_message (struct CadetPeer *prev,
        GNUNET_i2s (GCP_get_id (dir->hop)),
        GNUNET_sh2s (&cid->connection_of_tunnel));
   env = GNUNET_MQ_msg_copy (msg);
-  dir->out_buffer[dir->out_wpos] = env;
-  dir->out_wpos++;
-  if (ROUTE_BUFFER_SIZE == dir->out_wpos)
-    dir->out_wpos = 0;
+  GNUNET_MQ_dll_insert_tail (&dir->env_head,
+                             &dir->env_tail,
+                             env);
+  cur_buffers++;
+  /* Clean up 'rung' if now empty (and not head) */
+  if ( (NULL == rung->rd_head) &&
+       (rung != rung_head) )
+  {
+    GNUNET_CONTAINER_DLL_remove (rung_head,
+                                 rung_tail,
+                                 rung);
+    GNUNET_free (rung);
+  }
 }
 
 
@@ -373,18 +493,26 @@ check_connection_create (void *cls,
 static void
 destroy_direction (struct RouteDirection *dir)
 {
-  for (unsigned int i=0;i<ROUTE_BUFFER_SIZE;i++)
-    if (NULL != dir->out_buffer[i])
-    {
-      GNUNET_MQ_discard (dir->out_buffer[i]);
-      dir->out_buffer[i] = NULL;
-    }
+  struct GNUNET_MQ_Envelope *env;
+
+  while (NULL != (env = dir->env_head))
+  {
+    GNUNET_STATISTICS_update (stats,
+                              "# messages dropped due to route destruction",
+                              1,
+                              GNUNET_NO);
+    discard_buffer (dir,
+                    env);
+  }
   if (NULL != dir->mqm)
   {
     GCP_request_mq_cancel (dir->mqm,
                            NULL);
     dir->mqm = NULL;
   }
+  GNUNET_CONTAINER_DLL_remove (rung_head->rd_head,
+                               rung_head->rd_tail,
+                               dir);
 }
 
 
@@ -521,12 +649,13 @@ dir_ready_cb (void *cls,
     struct GNUNET_MQ_Envelope *env;
 
     dir->is_ready = GNUNET_YES;
-    if (NULL != (env = dir->out_buffer[dir->out_rpos]))
+    if (NULL != (env = dir->env_head))
     {
-      dir->out_buffer[dir->out_rpos] = NULL;
-      dir->out_rpos++;
-      if (ROUTE_BUFFER_SIZE == dir->out_rpos)
-        dir->out_rpos = 0;
+      GNUNET_MQ_dll_remove (&dir->env_head,
+                            &dir->env_tail,
+                            env);
+      cur_buffers--;
+      lower_rung (dir);
       dir->is_ready = GNUNET_NO;
       GCP_send (dir->mqm,
                 env);