optimize mqm_head scans by avoiding constantly scanning over definitively non-ready...
[oweals/gnunet.git] / src / cadet / gnunet-service-cadet-new_peer.c
index fe40d76b65c51d23ac55bd6580e8ba6f8e5d83a7..bcc69be53cb1a893b8f582879b256b1b7099a4d9 100644 (file)
@@ -29,7 +29,6 @@
  *   where we actually need it (i.e. not if we have a direct connection,
  *   or if we already have plenty of good short ones, or maybe even
  *   to take a break if we have some connections and have searched a lot (?))
- * - optimize MQM ready scans (O(n) -> O(1))
  */
 #include "platform.h"
 #include "gnunet_util_lib.h"
@@ -141,6 +140,11 @@ struct CadetPeer
    */
   struct GCP_MessageQueueManager *mqm_tail;
 
+  /**
+   * Pointer to first "ready" entry in @e mqm_head.
+   */
+  struct GCP_MessageQueueManager *mqm_ready_ptr;
+
   /**
    * MIN-heap of paths owned by this peer (they also end at this
    * peer).  Ordered by desirability.
@@ -295,6 +299,7 @@ destroy_peer (void *cls)
      notifications. Thus we can assert that mqm_head is empty at this
      point. */
   GNUNET_assert (NULL == cp->mqm_head);
+  GNUNET_assert (NULL == cp->mqm_ready_ptr);
   GNUNET_free (cp);
 }
 
@@ -502,6 +507,34 @@ GCP_set_mq (struct CadetPeer *cp,
 }
 
 
+/**
+ * Debug function should NEVER return true in production code, useful to
+ * simulate losses for testcases.
+ *
+ * @return #GNUNET_YES or #GNUNET_NO with the decision to drop.
+ */
+static int
+should_I_drop (void)
+{
+  if (0 == drop_percent)
+    return GNUNET_NO;
+  if (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+                                101) < drop_percent)
+    return GNUNET_YES;
+  return GNUNET_NO;
+}
+
+
+/**
+ * Function called when CORE took one of the messages from
+ * a message queue manager and transmitted it.
+ *
+ * @param cls the `struct CadetPeeer` where we made progress
+ */
+static void
+mqm_send_done (void *cls);
+
+
 /**
  * Transmit current envelope from this @a mqm.
  *
@@ -512,10 +545,10 @@ mqm_execute (struct GCP_MessageQueueManager *mqm)
 {
   struct CadetPeer *cp = mqm->cp;
 
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Sending to peer %s from MQM %p\n",
-       GCP_2s (cp),
-       mqm);
+  /* Move ready pointer to the next entry that might be ready. */
+  if ( (mqm == cp->mqm_ready_ptr) &&
+       (NULL != mqm->next) )
+    cp->mqm_ready_ptr = mqm->next;
   /* Move entry to the end of the DLL, to be fair. */
   if (mqm != cp->mqm_tail)
   {
@@ -526,15 +559,55 @@ mqm_execute (struct GCP_MessageQueueManager *mqm)
                                       cp->mqm_tail,
                                       mqm);
   }
-  GNUNET_MQ_send (cp->core_mq,
-                  mqm->env);
-  mqm->env = NULL;
   cp->mqm_ready_counter--;
+  if (GNUNET_YES == should_I_drop ())
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "DROPPING message to peer %s from MQM %p\n",
+         GCP_2s (cp),
+         mqm);
+    GNUNET_MQ_discard (mqm->env);
+    mqm->env = NULL;
+    mqm_send_done (cp);
+  }
+  else
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Sending to peer %s from MQM %p\n",
+         GCP_2s (cp),
+         mqm);
+    GNUNET_MQ_send (cp->core_mq,
+                    mqm->env);
+    mqm->env = NULL;
+  }
   mqm->cb (mqm->cb_cls,
            GNUNET_YES);
 }
 
 
+/**
+ * Find the next ready message in the queue (starting
+ * the search from the `cp->mqm_ready_ptr`) and if possible
+ * execute the transmission.
+ *
+ * @param cp peer to try to send the next ready message to
+ */
+static void
+send_next_ready (struct CadetPeer *cp)
+{
+  struct GCP_MessageQueueManager *mqm;
+
+  if (0 == cp->mqm_ready_counter)
+    return;
+  while ( (NULL != (mqm = cp->mqm_ready_ptr)) &&
+          (NULL == mqm->env) )
+    cp->mqm_ready_ptr = mqm->next;
+  if (NULL == mqm)
+    return; /* nothing to do */
+  mqm_execute (mqm);
+}
+
+
 /**
  * Function called when CORE took one of the messages from
  * a message queue manager and transmitted it.
@@ -549,17 +622,7 @@ mqm_send_done (void *cls)
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Sending to peer %s completed\n",
        GCP_2s (cp));
-  if (0 == cp->mqm_ready_counter)
-    return; /* nothing to do */
-  for (struct GCP_MessageQueueManager *mqm = cp->mqm_head;
-       NULL != mqm;
-       mqm = mqm->next)
-  {
-    if (NULL == mqm->env)
-      continue;
-    mqm_execute (mqm);
-    return;
-  }
+  send_next_ready (cp);
 }
 
 
@@ -576,6 +639,7 @@ GCP_send (struct GCP_MessageQueueManager *mqm,
 {
   struct CadetPeer *cp = mqm->cp;
 
+  GNUNET_assert (NULL != env);
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Queueing message to peer %s in MQM %p\n",
        GCP_2s (cp),
@@ -587,9 +651,13 @@ GCP_send (struct GCP_MessageQueueManager *mqm,
                          cp);
   mqm->env = env;
   cp->mqm_ready_counter++;
+  if (mqm != cp->mqm_ready_ptr)
+    cp->mqm_ready_ptr = cp->mqm_head;
+  if (1 == cp->mqm_ready_counter)
+    cp->mqm_ready_ptr = mqm;
   if (0 != GNUNET_MQ_get_length (cp->core_mq))
     return;
-  mqm_execute (mqm);
+  send_next_ready (cp);
 }
 
 
@@ -1244,6 +1312,8 @@ GCP_request_mq_cancel (struct GCP_MessageQueueManager *mqm,
     else
       GNUNET_MQ_discard (last_env);
   }
+  if (cp->mqm_ready_ptr == mqm)
+    cp->mqm_ready_ptr = mqm->next;
   GNUNET_CONTAINER_DLL_remove (cp->mqm_head,
                                cp->mqm_tail,
                                mqm);