Don't pass NULL to destroy_route
[oweals/gnunet.git] / src / cadet / gnunet-service-cadet-new_peer.c
index fa1cc9ed9e402e8ac7db9994007f8cc892caead6..cc13f6da612a70a7b694c6ef94b71d396801745a 100644 (file)
@@ -29,7 +29,6 @@
  *   where we actually need it (i.e. not if we have a direct connection,
  *   or if we already have plenty of good short ones, or maybe even
  *   to take a break if we have some connections and have searched a lot (?))
- * - optimize MQM ready scans (O(n) -> O(1))
  */
 #include "platform.h"
 #include "gnunet_util_lib.h"
@@ -40,7 +39,6 @@
 #include "gnunet_core_service.h"
 #include "gnunet_statistics_service.h"
 #include "cadet_protocol.h"
-#include "cadet_path.h"
 #include "gnunet-service-cadet-new.h"
 #include "gnunet-service-cadet-new_connection.h"
 #include "gnunet-service-cadet-new_dht.h"
@@ -116,9 +114,9 @@ struct CadetPeer
   struct GNUNET_PeerIdentity pid;
 
   /**
-   * Last time we heard from this peer
+   * Last time we heard from this peer (currently not used!)
    */
-  struct GNUNET_TIME_Absolute last_contact;
+  struct GNUNET_TIME_Absolute last_contactXXX;
 
   /**
    * Array of DLLs of paths traversing the peer, organized by the
@@ -142,6 +140,11 @@ struct CadetPeer
    */
   struct GCP_MessageQueueManager *mqm_tail;
 
+  /**
+   * Pointer to first "ready" entry in @e mqm_head.
+   */
+  struct GCP_MessageQueueManager *mqm_ready_ptr;
+
   /**
    * MIN-heap of paths owned by this peer (they also end at this
    * peer).  Ordered by desirability.
@@ -204,6 +207,12 @@ struct CadetPeer
    */
   unsigned int num_paths;
 
+  /**
+   * Sum over all of the offsets of all of the paths in the @a path_heads DLLs.
+   * Used to speed-up @GCP_get_desirability_of_path() calculation.
+   */
+  unsigned int off_sum;
+
   /**
    * Number of message queue managers of this peer that have a message in waiting.
    *
@@ -241,6 +250,67 @@ GCP_2s (const struct CadetPeer *cp)
 }
 
 
+/**
+ * Calculate how desirable a path is for @a cp if @a cp
+ * is at offset @a off.
+ *
+ * The 'desirability_table.c' program can be used to compute a list of
+ * sample outputs for different scenarios.  Basically, we score paths
+ * lower if there are many alternatives, and higher if they are
+ * shorter than average, and very high if they are much shorter than
+ * average and without many alternatives.
+ *
+ * @param cp a peer reachable via a path
+ * @param off offset of @a cp in the path
+ * @return score how useful a path is to reach @a cp,
+ *         positive scores mean path is more desirable
+ */
+double
+GCP_get_desirability_of_path (struct CadetPeer *cp,
+                              unsigned int off)
+{
+  unsigned int num_alts = cp->num_paths;
+  unsigned int off_sum;
+  double avg_sum;
+  double path_delta;
+  double weight_alts;
+
+  GNUNET_assert (num_alts >= 1); /* 'path' should be in there! */
+  GNUNET_assert (0 != cp->path_dll_length);
+
+  /* We maintain 'off_sum' in 'peer' and thereby
+     avoid the SLOW recalculation each time. Kept here
+     just to document what is going on. */
+#if SLOW
+  off_sum = 0;
+  for (unsigned int j=0;j<cp->path_dll_length;j++)
+    for (struct CadetPeerPathEntry *pe = cp->path_heads[j];
+         NULL != pe;
+         pe = pe->next)
+      off_sum += j;
+  GNUNET_assert (off_sum == cp->off_sum);
+#else
+  off_sum = cp->off_sum;
+#endif
+  avg_sum = off_sum * 1.0 / cp->path_dll_length;
+  path_delta = off - avg_sum;
+  /* path_delta positiv: path off of peer above average (bad path for peer),
+     path_delta negativ: path off of peer below average (good path for peer) */
+  if (path_delta <= - 1.0)
+    weight_alts = - num_alts / path_delta; /* discount alternative paths */
+  else if (path_delta >= 1.0)
+    weight_alts = num_alts * path_delta; /* overcount alternative paths */
+  else
+    weight_alts = num_alts; /* count alternative paths normally */
+
+
+  /* off+1: long paths are generally harder to find and thus count
+     a bit more as they get longer.  However, above-average paths
+     still need to count less, hence the squaring of that factor. */
+  return (off + 1.0) / (weight_alts * weight_alts);
+}
+
+
 /**
  * This peer is no longer be needed, clean it up now.
  *
@@ -257,7 +327,8 @@ destroy_peer (void *cls)
   cp->destroy_task = NULL;
   GNUNET_assert (NULL == cp->t);
   GNUNET_assert (NULL == cp->core_mq);
-  GNUNET_assert (0 == cp->path_dll_length);
+  for (unsigned int i=0;i<cp->path_dll_length;i++)
+    GNUNET_assert (NULL == cp->path_heads[i]);
   GNUNET_assert (0 == GNUNET_CONTAINER_multishortmap_size (cp->connections));
   GNUNET_assert (GNUNET_YES ==
                  GNUNET_CONTAINER_multipeermap_remove (peers,
@@ -284,13 +355,18 @@ destroy_peer (void *cls)
     cp->connectivity_suggestion = NULL;
   }
   GNUNET_CONTAINER_multishortmap_destroy (cp->connections);
-  GNUNET_CONTAINER_heap_destroy (cp->path_heap);
+  if (NULL != cp->path_heap)
+  {
+    GNUNET_CONTAINER_heap_destroy (cp->path_heap);
+    cp->path_heap = NULL;
+  }
   GNUNET_free_non_null (cp->hello);
   /* Peer should not be freed if paths exist; if there are no paths,
      there ought to be no connections, and without connections, no
      notifications. Thus we can assert that mqm_head is empty at this
      point. */
   GNUNET_assert (NULL == cp->mqm_head);
+  GNUNET_assert (NULL == cp->mqm_ready_ptr);
   GNUNET_free (cp);
 }
 
@@ -418,8 +494,9 @@ consider_peer_destroy (struct CadetPeer *cp)
                                                      cp);
     return;
   }
-  if (0 < cp->path_dll_length)
-    return; /* still relevant! */
+  for (unsigned int i=0;i<cp->path_dll_length;i++)
+    if (NULL != cp->path_heads[i])
+      return; /* still relevant! */
   if (NULL != cp->hello)
   {
     /* relevant only until HELLO expires */
@@ -497,6 +574,34 @@ GCP_set_mq (struct CadetPeer *cp,
 }
 
 
+/**
+ * Debug function should NEVER return true in production code, useful to
+ * simulate losses for testcases.
+ *
+ * @return #GNUNET_YES or #GNUNET_NO with the decision to drop.
+ */
+static int
+should_I_drop (void)
+{
+  if (0 == drop_percent)
+    return GNUNET_NO;
+  if (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+                                101) < drop_percent)
+    return GNUNET_YES;
+  return GNUNET_NO;
+}
+
+
+/**
+ * Function called when CORE took one of the messages from
+ * a message queue manager and transmitted it.
+ *
+ * @param cls the `struct CadetPeeer` where we made progress
+ */
+static void
+mqm_send_done (void *cls);
+
+
 /**
  * Transmit current envelope from this @a mqm.
  *
@@ -507,10 +612,10 @@ mqm_execute (struct GCP_MessageQueueManager *mqm)
 {
   struct CadetPeer *cp = mqm->cp;
 
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Sending to peer %s from MQM %p\n",
-       GCP_2s (cp),
-       mqm);
+  /* Move ready pointer to the next entry that might be ready. */
+  if ( (mqm == cp->mqm_ready_ptr) &&
+       (NULL != mqm->next) )
+    cp->mqm_ready_ptr = mqm->next;
   /* Move entry to the end of the DLL, to be fair. */
   if (mqm != cp->mqm_tail)
   {
@@ -521,10 +626,52 @@ mqm_execute (struct GCP_MessageQueueManager *mqm)
                                       cp->mqm_tail,
                                       mqm);
   }
-  GNUNET_MQ_send (cp->core_mq,
-                  mqm->env);
-  mqm->env = NULL;
   cp->mqm_ready_counter--;
+  if (GNUNET_YES == should_I_drop ())
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "DROPPING message to peer %s from MQM %p\n",
+         GCP_2s (cp),
+         mqm);
+    GNUNET_MQ_discard (mqm->env);
+    mqm->env = NULL;
+    mqm_send_done (cp);
+  }
+  else
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Sending to peer %s from MQM %p\n",
+         GCP_2s (cp),
+         mqm);
+    GNUNET_MQ_send (cp->core_mq,
+                    mqm->env);
+    mqm->env = NULL;
+  }
+  mqm->cb (mqm->cb_cls,
+           GNUNET_YES);
+}
+
+
+/**
+ * Find the next ready message in the queue (starting
+ * the search from the `cp->mqm_ready_ptr`) and if possible
+ * execute the transmission.
+ *
+ * @param cp peer to try to send the next ready message to
+ */
+static void
+send_next_ready (struct CadetPeer *cp)
+{
+  struct GCP_MessageQueueManager *mqm;
+
+  if (0 == cp->mqm_ready_counter)
+    return;
+  while ( (NULL != (mqm = cp->mqm_ready_ptr)) &&
+          (NULL == mqm->env) )
+    cp->mqm_ready_ptr = mqm->next;
+  if (NULL == mqm)
+    return; /* nothing to do */
+  mqm_execute (mqm);
 }
 
 
@@ -542,17 +689,7 @@ mqm_send_done (void *cls)
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Sending to peer %s completed\n",
        GCP_2s (cp));
-  if (0 == cp->mqm_ready_counter)
-    return; /* nothing to do */
-  for (struct GCP_MessageQueueManager *mqm = cp->mqm_head;
-       NULL != mqm;
-       mqm = mqm->next)
-  {
-    if (NULL == mqm->env)
-      continue;
-    mqm_execute (mqm);
-    return;
-  }
+  send_next_ready (cp);
 }
 
 
@@ -569,6 +706,7 @@ GCP_send (struct GCP_MessageQueueManager *mqm,
 {
   struct CadetPeer *cp = mqm->cp;
 
+  GNUNET_assert (NULL != env);
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Queueing message to peer %s in MQM %p\n",
        GCP_2s (cp),
@@ -580,9 +718,13 @@ GCP_send (struct GCP_MessageQueueManager *mqm,
                          cp);
   mqm->env = env;
   cp->mqm_ready_counter++;
+  if (mqm != cp->mqm_ready_ptr)
+    cp->mqm_ready_ptr = cp->mqm_head;
+  if (1 == cp->mqm_ready_counter)
+    cp->mqm_ready_ptr = mqm;
   if (0 != GNUNET_MQ_get_length (cp->core_mq))
     return;
-  mqm_execute (mqm);
+  send_next_ready (cp);
 }
 
 
@@ -627,6 +769,28 @@ GCP_destroy_all_peers ()
 }
 
 
+/**
+ * Drop all paths owned by this peer, and do not
+ * allow new ones to be added: We are shutting down.
+ *
+ * @param cp peer to drop paths to
+ */
+void
+GCP_drop_owned_paths (struct CadetPeer *cp)
+{
+  struct CadetPeerPath *path;
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Destroying all paths to %s\n",
+       GCP_2s (cp));
+  while (NULL != (path =
+                  GNUNET_CONTAINER_heap_remove_root (cp->path_heap)))
+    GCPP_release (path);
+  GNUNET_CONTAINER_heap_destroy (cp->path_heap);
+  cp->path_heap = NULL;
+}
+
+
 /**
  * Add an entry to the DLL of all of the paths that this peer is on.
  *
@@ -639,6 +803,8 @@ GCP_path_entry_add (struct CadetPeer *cp,
                     struct CadetPeerPathEntry *entry,
                     unsigned int off)
 {
+  GNUNET_assert (cp == GCPP_get_peer_at_offset (entry->path,
+                                                off));
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Discovered that peer %s is on path %s at offset %u\n",
        GCP_2s (cp),
@@ -658,6 +824,7 @@ GCP_path_entry_add (struct CadetPeer *cp,
   GNUNET_CONTAINER_DLL_insert (cp->path_heads[off],
                                cp->path_tails[off],
                                entry);
+  cp->off_sum += off;
   cp->num_paths++;
 
   /* If we have a tunnel to this peer, tell the tunnel that there is a
@@ -698,6 +865,7 @@ GCP_path_entry_remove (struct CadetPeer *cp,
                                cp->path_tails[off],
                                entry);
   GNUNET_assert (0 < cp->num_paths);
+  cp->off_sum -= off;
   cp->num_paths--;
   if ( (NULL == cp->core_mq) &&
        (NULL != cp->t) &&
@@ -730,6 +898,14 @@ GCP_attach_path (struct CadetPeer *cp,
   GNUNET_CONTAINER_HeapCostType root_desirability;
   struct GNUNET_CONTAINER_HeapNode *hn;
 
+  GNUNET_assert (cp == GCPP_get_peer_at_offset (path,
+                                                off));
+  if (NULL == cp->path_heap)
+  {
+    /* #GCP_drop_owned_paths() was already called, we cannot take new ones! */
+    GNUNET_assert (GNUNET_NO == force);
+    return NULL;
+  }
   desirability = GCPP_get_desirability (path);
   if (GNUNET_NO == force)
   {
@@ -1205,6 +1381,8 @@ GCP_request_mq_cancel (struct GCP_MessageQueueManager *mqm,
     else
       GNUNET_MQ_discard (last_env);
   }
+  if (cp->mqm_ready_ptr == mqm)
+    cp->mqm_ready_ptr = mqm->next;
   GNUNET_CONTAINER_DLL_remove (cp->mqm_head,
                                cp->mqm_tail,
                                mqm);