fair, global message buffer implemented
[oweals/gnunet.git] / src / cadet / gnunet-service-cadet-new_core.c
index fda4a7b84231801e0b4448403961d62374711565..2c050af6ddee92fc09739a3ac85ceac5fcb7270e 100644 (file)
@@ -27,7 +27,6 @@
  * All functions in this file should use the prefix GCO (Gnunet Cadet cOre (bottom))
  *
  * TODO:
- * - properly implement GLOBAL message buffer, instead of per-route buffers
  * - do NOT use buffering if the route options say no buffer!
  * - Optimization: given BROKEN messages, destroy paths (?)
  */
 
 #define LOG(level, ...) GNUNET_log_from(level,"cadet-cor",__VA_ARGS__)
 
+/**
+ * Information we keep per direction for a route.
+ */
+struct RouteDirection;
+
 
 /**
- * Number of messages we are willing to buffer per route.
- * FIXME: have global buffer pool instead!
+ * Set of CadetRoutes that have exactly the same number of messages
+ * in their buffer.  Used so we can efficiently find all of those
+ * routes that have the current maximum of messages in the buffer (in
+ * case we have to purge).
  */
-#define ROUTE_BUFFER_SIZE 8
+struct Rung
+{
+
+  /**
+   * Rung of RouteDirections with one more buffer entry each.
+   */
+  struct Rung *next;
+
+  /**
+   * Rung of RouteDirections with one less buffer entry each.
+   */
+  struct Rung *prev;
+
+  /**
+   * DLL of route directions with a number of buffer entries matching this rung.
+   */
+  struct RouteDirection *rd_head;
+
+  /**
+   * DLL of route directions with a number of buffer entries matching this rung.
+   */
+  struct RouteDirection *rd_tail;
+
+  /**
+   * Total number of route directions in this rung.
+   */
+  unsigned int num_routes;
+
+  /**
+   * Number of messages route directions at this rung have
+   * in their buffer.
+   */
+  unsigned int rung_off;
+};
 
 
 /**
  */
 struct RouteDirection
 {
+
   /**
-   * Target peer.
+   * DLL of other route directions within the same `struct Rung`.
    */
-  struct CadetPeer *hop;
+  struct RouteDirection *prev;
 
   /**
-   * Route this direction is part of.
+   * DLL of other route directions within the same `struct Rung`.
    */
-  struct CadetRoute *my_route;
+  struct RouteDirection *next;
 
   /**
-   * Message queue manager for @e hop.
+   * Rung of this route direction (matches length of the buffer DLL).
    */
-  struct GCP_MessageQueueManager *mqm;
+  struct Rung *rung;
+
+  /**
+   * Head of DLL of envelopes we have in the buffer for this direction.
+   */
+  struct GNUNET_MQ_Envelope *env_head;
 
   /**
-   * Cyclic message buffer to @e hop.
+   * Tail of DLL of envelopes we have in the buffer for this direction.
    */
-  struct GNUNET_MQ_Envelope *out_buffer[ROUTE_BUFFER_SIZE];
+  struct GNUNET_MQ_Envelope *env_tail;
 
   /**
-   * Next write offset to use to append messages to @e out_buffer.
+   * Target peer.
    */
-  unsigned int out_wpos;
+  struct CadetPeer *hop;
 
   /**
-   * Next read offset to use to retrieve messages from @e out_buffer.
+   * Route this direction is part of.
    */
-  unsigned int out_rpos;
+  struct CadetRoute *my_route;
+
+  /**
+   * Message queue manager for @e hop.
+   */
+  struct GCP_MessageQueueManager *mqm;
 
   /**
    * Is @e mqm currently ready for transmission?
@@ -153,11 +203,37 @@ static struct GNUNET_CONTAINER_MultiShortmap *routes;
  */
 static struct GNUNET_CONTAINER_Heap *route_heap;
 
+/**
+ * Rung zero (always pointed to by #rung_head).
+ */
+static struct Rung rung_zero;
+
+/**
+ * DLL of rungs, with the head always point to a rung of
+ * route directions with no messages in the queue.
+ */
+static struct Rung *rung_head = &rung_zero;
+
+/**
+ * Tail of the #rung_head DLL.
+ */
+static struct Rung *rung_tail = &rung_zero;
+
 /**
  * Maximum number of concurrent routes this peer will support.
  */
 static unsigned long long max_routes;
 
+/**
+ * Maximum number of envelopes we will buffer at this peer.
+ */
+static unsigned long long max_buffers;
+
+/**
+ * Current number of envelopes we have buffered at this peer.
+ */
+static unsigned long long cur_buffers;
+
 /**
  * Task to timeout routes.
  */
@@ -177,6 +253,91 @@ get_route (const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid)
 }
 
 
+/**
+ * Lower the rung in which @a dir is by 1.
+ *
+ * @param dir direction to lower in rung.
+ */
+static void
+lower_rung (struct RouteDirection *dir)
+{
+  struct Rung *rung = dir->rung;
+  struct Rung *prev;
+
+  GNUNET_CONTAINER_DLL_remove (rung->rd_head,
+                               rung->rd_tail,
+                               dir);
+  prev = rung->prev;
+  GNUNET_assert (NULL != prev);
+  if (prev->rung_off != rung->rung_off - 1)
+  {
+    prev = GNUNET_new (struct Rung);
+    prev->rung_off = rung->rung_off - 1;
+    GNUNET_CONTAINER_DLL_insert_after (rung_head,
+                                       rung_tail,
+                                       prev,
+                                       rung);
+  }
+  else
+  {
+    rung = prev;
+  }
+  GNUNET_assert (NULL != rung);
+  GNUNET_CONTAINER_DLL_insert (rung->rd_head,
+                               rung->rd_tail,
+                               dir);
+
+}
+
+
+/**
+ * Discard the buffer @a env from the route direction @a dir and
+ * move @a dir down a rung.
+ *
+ * @param dir direction that contains the @a env in the buffer
+ * @param env envelope to discard
+ */
+static void
+discard_buffer (struct RouteDirection *dir,
+                struct GNUNET_MQ_Envelope *env)
+{
+  GNUNET_MQ_dll_remove (&dir->env_head,
+                        &dir->env_tail,
+                        env);
+  cur_buffers--;
+  GNUNET_MQ_discard (env);
+  lower_rung (dir);
+}
+
+
+/**
+ * Discard all messages from the highest rung, to make space.
+ */
+static void
+discard_all_from_rung_tail ()
+{
+  struct Rung *tail = rung_tail;
+  struct RouteDirection *dir;
+
+  while (NULL != (dir = tail->rd_head))
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Queue full due new message %s on connection %s, dropping old message\n",
+         GNUNET_sh2s (&dir->my_route->cid.connection_of_tunnel));
+    GNUNET_STATISTICS_update (stats,
+                              "# messages dropped due to full buffer",
+                              1,
+                              GNUNET_NO);
+    discard_buffer (dir,
+                    dir->env_head);
+  }
+  GNUNET_CONTAINER_DLL_remove (rung_head,
+                               rung_tail,
+                               tail);
+  GNUNET_free (tail);
+}
+
+
 /**
  * We message @a msg from @a prev.  Find its route by @a cid and
  * forward to the next hop.  Drop and signal broken route if we do not
@@ -193,6 +354,8 @@ route_message (struct CadetPeer *prev,
 {
   struct CadetRoute *route;
   struct RouteDirection *dir;
+  struct Rung *rung;
+  struct Rung *nxt;
   struct GNUNET_MQ_Envelope *env;
 
   route = get_route (cid);
@@ -231,26 +394,51 @@ route_message (struct CadetPeer *prev,
               GNUNET_MQ_msg_copy (msg));
     return;
   }
-  env = dir->out_buffer[dir->out_wpos];
-  if (NULL != env)
+  rung = dir->rung;
+  if (cur_buffers == max_buffers)
   {
-    /* Queue full, drop earliest message in queue */
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "Queue full due to new message of type %u from %s to %s on connection %s, dropping old message\n",
-         ntohs (msg->type),
-         GCP_2s (prev),
-         GNUNET_i2s (GCP_get_id (dir->hop)),
-         GNUNET_sh2s (&cid->connection_of_tunnel));
-    GNUNET_STATISTICS_update (stats,
-                              "# messages dropped due to full buffer",
-                              1,
-                              GNUNET_NO);
-  GNUNET_assert (dir->out_rpos == dir->out_wpos);
-    GNUNET_MQ_discard (env);
-    dir->out_rpos++;
-    if (ROUTE_BUFFER_SIZE == dir->out_rpos)
-      dir->out_rpos = 0;
+    /* Need to make room. */
+    if (NULL != rung->next)
+    {
+      /* Easy case, drop messages from route directions in highest rung */
+      discard_all_from_rung_tail ();
+    }
+    else
+    {
+      /* We are in the highest rung, drop our own! */
+      LOG (GNUNET_ERROR_TYPE_DEBUG,
+           "Queue full due new message %s on connection %s, dropping old message\n",
+           GNUNET_sh2s (&dir->my_route->cid.connection_of_tunnel));
+      GNUNET_STATISTICS_update (stats,
+                                "# messages dropped due to full buffer",
+                                1,
+                                GNUNET_NO);
+      discard_buffer (dir,
+                      dir->env_head);
+      rung = dir->rung;
+    }
+  }
+  /* remove 'dir' from current rung */
+  GNUNET_CONTAINER_DLL_remove (rung->rd_head,
+                               rung->rd_tail,
+                               dir);
+  /* make 'nxt' point to the next higher rung, creat if necessary */
+  nxt = rung->next;
+  if ( (NULL == nxt) ||
+       (rung->rung_off + 1 != nxt->rung_off) )
+  {
+    nxt = GNUNET_new (struct Rung);
+    nxt->rung_off = rung->rung_off + 1;
+    GNUNET_CONTAINER_DLL_insert_after (rung_head,
+                                       rung_tail,
+                                       rung,
+                                       nxt);
   }
+  /* insert 'dir' into next higher rung */
+  GNUNET_CONTAINER_DLL_insert (nxt->rd_head,
+                               nxt->rd_tail,
+                               dir);
+  /* add message into 'dir' buffer */
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Queueing new message of type %u from %s to %s on connection %s\n",
        ntohs (msg->type),
@@ -258,10 +446,19 @@ route_message (struct CadetPeer *prev,
        GNUNET_i2s (GCP_get_id (dir->hop)),
        GNUNET_sh2s (&cid->connection_of_tunnel));
   env = GNUNET_MQ_msg_copy (msg);
-  dir->out_buffer[dir->out_wpos] = env;
-  dir->out_wpos++;
-  if (ROUTE_BUFFER_SIZE == dir->out_wpos)
-    dir->out_wpos = 0;
+  GNUNET_MQ_dll_insert_tail (&dir->env_head,
+                             &dir->env_tail,
+                             env);
+  cur_buffers++;
+  /* Clean up 'rung' if now empty (and not head) */
+  if ( (NULL == rung->rd_head) &&
+       (rung != rung_head) )
+  {
+    GNUNET_CONTAINER_DLL_remove (rung_head,
+                                 rung_tail,
+                                 rung);
+    GNUNET_free (rung);
+  }
 }
 
 
@@ -296,18 +493,26 @@ check_connection_create (void *cls,
 static void
 destroy_direction (struct RouteDirection *dir)
 {
-  for (unsigned int i=0;i<ROUTE_BUFFER_SIZE;i++)
-    if (NULL != dir->out_buffer[i])
-    {
-      GNUNET_MQ_discard (dir->out_buffer[i]);
-      dir->out_buffer[i] = NULL;
-    }
+  struct GNUNET_MQ_Envelope *env;
+
+  while (NULL != (env = dir->env_head))
+  {
+    GNUNET_STATISTICS_update (stats,
+                              "# messages dropped due to route destruction",
+                              1,
+                              GNUNET_NO);
+    discard_buffer (dir,
+                    env);
+  }
   if (NULL != dir->mqm)
   {
     GCP_request_mq_cancel (dir->mqm,
                            NULL);
     dir->mqm = NULL;
   }
+  GNUNET_CONTAINER_DLL_remove (rung_head->rd_head,
+                               rung_head->rd_tail,
+                               dir);
 }
 
 
@@ -444,12 +649,13 @@ dir_ready_cb (void *cls,
     struct GNUNET_MQ_Envelope *env;
 
     dir->is_ready = GNUNET_YES;
-    if (NULL != (env = dir->out_buffer[dir->out_rpos]))
+    if (NULL != (env = dir->env_head))
     {
-      dir->out_buffer[dir->out_rpos] = NULL;
-      dir->out_rpos++;
-      if (ROUTE_BUFFER_SIZE == dir->out_rpos)
-        dir->out_rpos = 0;
+      GNUNET_MQ_dll_remove (&dir->env_head,
+                            &dir->env_tail,
+                            env);
+      cur_buffers--;
+      lower_rung (dir);
       dir->is_ready = GNUNET_NO;
       GCP_send (dir->mqm,
                 env);
@@ -762,11 +968,12 @@ handle_connection_broken (void *cls,
   }
 
   /* We're just an intermediary peer, route the message along its path */
-  route = get_route (&msg->cid);
   route_message (peer,
                  &msg->cid,
                  &msg->header);
-  destroy_route (route);
+  route = get_route (&msg->cid);
+  if (NULL != route)
+    destroy_route (route);
   /* FIXME: also destroy paths we MAY have up to the specified link! */
 }
 
@@ -813,11 +1020,12 @@ handle_connection_destroy (void *cls,
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Received CONNECTION_DESTROY for connection %s. Destroying route.\n",
        GNUNET_sh2s (&msg->cid.connection_of_tunnel));
-  route = get_route (&msg->cid);
   route_message (peer,
                  &msg->cid,
                  &msg->header);
-  destroy_route (route);
+  route = get_route (&msg->cid);
+  if (NULL != route)
+    destroy_route (route);
 }
 
 
@@ -1077,7 +1285,13 @@ GCO_init (const struct GNUNET_CONFIGURATION_Handle *c)
                                              "CADET",
                                              "MAX_ROUTES",
                                              &max_routes))
-    max_routes = 10000;
+    max_routes = 5000;
+  if (GNUNET_OK !=
+      GNUNET_CONFIGURATION_get_value_number (c,
+                                             "CADET",
+                                             "MAX_MSGS_QUEUE",
+                                             &max_buffers))
+    max_buffers = 10000;
   routes = GNUNET_CONTAINER_multishortmap_create (1024,
                                                   GNUNET_NO);
   route_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);