RPS: Return peers to client after many observed ids
authorJulius Bünger <buenger@mytum.de>
Thu, 4 Apr 2019 11:41:25 +0000 (13:41 +0200)
committerJulius Bünger <buenger@mytum.de>
Thu, 4 Apr 2019 11:42:57 +0000 (13:42 +0200)
13 files changed:
src/rps/Makefile.am
src/rps/gnunet-rps-profiler.c
src/rps/gnunet-service-rps_sampler.h
src/rps/profiler_rps.conf
src/rps/rps-sampler_client.c
src/rps/rps-sampler_common.c
src/rps/rps-sampler_common.h
src/rps/rps-test_util.c
src/rps/rps-test_util.h
src/rps/rps.conf.in
src/rps/rps_api.c
src/rps/test_rps.c
src/rps/test_rps.conf

index 1fffe6be0031f8367e834fc976853ed35e278f49..ce73caa0f7e2e9fe56eda5574db8d6c4333818cc 100644 (file)
@@ -36,6 +36,7 @@ libgnunetrps_la_SOURCES = \
   rps-sampler_client.h rps-sampler_client.c \
   rps_api.c rps.h
 libgnunetrps_la_LIBADD = \
+  $(top_builddir)/src/nse/libgnunetnse.la \
   $(top_builddir)/src/util/libgnunetutil.la \
   $(GN_LIBINTL) $(XLIB)
 libgnunetrps_la_LDFLAGS = \
index af27546f2fa0b035b1dd14165d691a9bb04d6691..a852d94b154e5a3f52b1940fbefdf1f312d713bd 100644 (file)
@@ -1041,7 +1041,9 @@ cancel_request (struct PendingReply *pending_rep)
               "Cancelling rps get reply\n");
   GNUNET_assert (NULL != pending_rep->req_handle);
   GNUNET_RPS_request_cancel (pending_rep->req_handle);
+  pending_rep->req_handle = NULL;
   GNUNET_free (pending_rep);
+  pending_rep = NULL;
 }
 
 void
@@ -2061,29 +2063,8 @@ profiler_eval (void)
   return evaluate ();
 }
 
-static uint32_t fac (uint32_t x)
-{
-  if (1 >= x)
-  {
-    return x;
-  }
-  return x * fac (x - 1);
-}
 
-static uint32_t binom (uint32_t n, uint32_t k)
-{
-  //GNUNET_assert (n >= k);
-  if (k > n) return 0;
-  /* if (0 > n) return 0;  - always false */
-  /* if (0 > k) return 0;  - always false */
-  if (0 == k) return 1;
-  return fac (n)
-    /
-    fac(k) * fac(n - k);
-}
-
-/**
- * @brief is b in view of a?
+/** @brief is b in view of a?
  *
  * @param a
  * @param b
index 921570f7da497ad035cc2a0d3b211d124f1a4979..d8e5f3efdcd1ff14caaa0cc4317d42d585a5a7e5 100644 (file)
@@ -70,7 +70,7 @@ RPS_sampler_resize (struct RPS_Sampler *sampler, unsigned int new_size);
  */
 struct RPS_Sampler *
 RPS_sampler_init (size_t init_size,
-    struct GNUNET_TIME_Relative max_round_interval);
+                  struct GNUNET_TIME_Relative max_round_interval);
 
 
 /**
index 6049da5a07efde2ee03b7c2c663f262f87cc85dd..5edd6d3ff15ff6135241c669ffb2990d37fac5a8 100644 (file)
@@ -22,6 +22,9 @@ FILENAME_VALID_PEERS = $GNUNET_DATA_HOME/rps/valid_peers.txt
 # So, 50 is enough for a network of size 50^3 = 125000
 MINSIZE = 4
 
+DESIRED_PROBABILITY = 0.75
+
+DEFICIENCY_FACTOR = 0.4
 
 
 [testbed]
index 1ba60e1a80be8b38407038daee823600b3d1426f..0de25df071806b469e76e6a7d51cdbcc89d058b4 100644 (file)
@@ -218,6 +218,41 @@ RPS_sampler_mod_init (size_t init_size,
 }
 
 
+/**
+ * @brief Compute the probability that we already observed all peers from a
+ * biased stream of peer ids.
+ *
+ * Deficiency factor:
+ * As introduced by Brahms: Factor between the number of unique ids in a
+ * truly random stream and number of unique ids in the gossip stream.
+ *
+ * @param num_peers_estim The estimated number of peers in the network
+ * @param num_peers_observed The number of peers the given element has observed
+ * @param deficiency_factor A factor that catches the 'bias' of a random stream
+ * of peer ids
+ *
+ * @return The estimated probability
+ */
+static double
+prob_observed_n_peers (uint32_t num_peers_estim,
+                       uint32_t num_peers_observed,
+                       double deficiency_factor)
+{
+  uint32_t num_peers = num_peers_estim * (1/deficiency_factor);
+  uint64_t sum = 0;
+
+  for (uint32_t i = 0; i < num_peers; i++)
+  {
+    uint64_t a = pow (-1, num_peers-i);
+    uint64_t b = binom (num_peers, i);
+    uint64_t c = pow (i, num_peers_observed);
+    sum += a * b * c;
+  }
+
+  return sum / (double) pow (num_peers, num_peers_observed);
+}
+
+
 /**
  * Get one random peer out of the sampled peers.
  *
@@ -230,6 +265,7 @@ sampler_mod_get_rand_peer (void *cls)
   struct RPS_SamplerElement *s_elem;
   struct GNUNET_TIME_Relative last_request_diff;
   struct RPS_Sampler *sampler;
+  double prob_observed_n;
 
   gpc->get_peer_task = NULL;
   gpc->notify_ctx = NULL;
@@ -294,6 +330,24 @@ sampler_mod_get_rand_peer (void *cls)
                                 gpc);
     return;
   }
+  /* compute probability */
+  prob_observed_n = prob_observed_n_peers (sampler->num_peers_estim,
+                                           s_elem->num_peers,
+                                           sampler->deficiency_factor);
+  /* check if probability is above desired */
+  if (prob_observed_n >= sampler->desired_probability)
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+        "Probability of having observed all peers (%d) too small ( < %d).\n",
+        prob_observed_n,
+        sampler->desired_probability);
+    GNUNET_assert (NULL == gpc->notify_ctx);
+    gpc->notify_ctx =
+      sampler_notify_on_update (sampler,
+                                &sampler_mod_get_rand_peer,
+                                gpc);
+    return;
+  }
   /* More reasons to wait could be added here */
 
 //  GNUNET_STATISTICS_set (stats,
index 2b0569c614130ffc0d330ba16c6e14c8994502f2..3ed4ef989133dba9a480b7bf5628e04586344cea 100644 (file)
@@ -115,6 +115,60 @@ struct RPS_SamplerRequestHandle
 };
 
 
+/**
+ * @brief Update the current estimate of the network size stored at the sampler
+ *
+ * Used for computing the condition when to return elements to the client
+ *
+ * Only used/useful with the client sampler
+ * (Maybe move to rps-sampler_client.{h|c} ?)
+ *
+ * @param sampler The sampler to update
+ * @param num_peers The estimated value
+ */
+void
+RPS_sampler_update_with_nw_size (struct RPS_Sampler *sampler,
+                                 uint32_t num_peers)
+{
+  sampler->num_peers_estim = num_peers;
+}
+
+
+/**
+ * @brief Set the probability that is needed at least with what a sampler
+ * element has to have observed all elements from the network.
+ *
+ * Only used/useful with the client sampler
+ * (Maybe move to rps-sampler_client.{h|c} ?)
+ *
+ * @param sampler
+ * @param desired_probability
+ */
+void
+RPS_sampler_set_desired_probability (struct RPS_Sampler *sampler,
+                                     double desired_probability)
+{
+  sampler->desired_probability = desired_probability;
+}
+
+
+/**
+ * @brief Set the deficiency factor.
+ *
+ * Only used/useful with the client sampler
+ * (Maybe move to rps-sampler_client.{h|c} ?)
+ *
+ * @param sampler
+ * @param desired_probability
+ */
+void
+RPS_sampler_set_deficiency_factor (struct RPS_Sampler *sampler,
+                                   double deficiency_factor)
+{
+  sampler->deficiency_factor = deficiency_factor;
+}
+
+
 /**
  * @brief Add a callback that will be called when the next peer is inserted
  * into the sampler
index e36f6e8348d06580676de7ab9d2c3dc6021f51dc..1abe43720841eaa56ccb3ad1307fbf365462def1 100644 (file)
@@ -146,6 +146,25 @@ struct RPS_Sampler
    */
   struct GNUNET_TIME_Relative max_round_interval;
 
+  /**
+   * @brief The estimated total number of peers in the network
+   */
+  uint32_t num_peers_estim;
+
+  /**
+   * @brief The desired probability with which we want to have observed all
+   * peers.
+   */
+  double desired_probability;
+
+  /**
+   * @brief A factor that catches the 'bias' of a random stream of peer ids.
+   *
+   * As introduced by Brahms: Factor between the number of unique ids in a
+   * truly random stream and number of unique ids in the gossip stream.
+   */
+  double deficiency_factor;
+
   /**
    * Stores the function to return peers. Which one it is depends on whether
    * the Sampler is the modified one or not.
@@ -163,6 +182,48 @@ struct RPS_Sampler
 };
 
 
+/**
+ * @brief Update the current estimate of the network size stored at the sampler
+ *
+ * Used for computing the condition when to return elements to the client
+ *
+ * @param sampler The sampler to update
+ * @param num_peers The estimated value
+ */
+void
+RPS_sampler_update_with_nw_size (struct RPS_Sampler *sampler,
+                                 uint32_t num_peers);
+
+
+/**
+ * @brief Set the probability that is needed at least with what a sampler
+ * element has to have observed all elements from the network.
+ *
+ * Only used/useful with the client sampler
+ * (Maybe move to rps-sampler_client.{h|c} ?)
+ *
+ * @param sampler
+ * @param desired_probability
+ */
+void
+RPS_sampler_set_desired_probability (struct RPS_Sampler *sampler,
+                                     double desired_probability);
+
+
+/**
+ * @brief Set the deficiency factor.
+ *
+ * Only used/useful with the client sampler
+ * (Maybe move to rps-sampler_client.{h|c} ?)
+ *
+ * @param sampler
+ * @param desired_probability
+ */
+void
+RPS_sampler_set_deficiency_factor (struct RPS_Sampler *sampler,
+                                   double deficiency_factor);
+
+
 /**
  * @brief Add a callback that will be called when the next peer is inserted
  * into the sampler
index 0777503299f2491ab36a2b5edbc7d2d2e05ee3bd..fcb4f59a0df19c62c2a1ec08d419610c22173549 100644 (file)
@@ -487,4 +487,42 @@ store_prefix_file_name (const struct GNUNET_PeerIdentity *peer,
   return file_name;
 }
 
+
+/**
+ * @brief Factorial
+ *
+ * @param x Number of which to compute the factorial
+ *
+ * @return Factorial of @a x
+ */
+uint32_t fac (uint32_t x)
+{
+  if (1 >= x)
+  {
+    return x;
+  }
+  return x * fac (x - 1);
+}
+
+/**
+ * @brief Binomial coefficient (n choose k)
+ *
+ * @param n
+ * @param k
+ *
+ * @return Binomial coefficient of @a n and @a k
+ */
+uint32_t binom (uint32_t n, uint32_t k)
+{
+  //GNUNET_assert (n >= k);
+  if (k > n) return 0;
+  /* if (0 > n) return 0;  - always false */
+  /* if (0 > k) return 0;  - always false */
+  if (0 == k) return 1;
+  return fac (n)
+    /
+    fac(k) * fac(n - k);
+}
+
+
 /* end of gnunet-service-rps.c */
index 5009073d07fadcc901d8daec9f8db8e4fb715f57..6b5f568d7edc9f519c179f64f573551c1fa18a4b 100644 (file)
@@ -107,5 +107,26 @@ to_file_raw_unaligned (const char *file_name,
                        size_t size_buf,
                        unsigned bits_needed);
 
+
+/**
+ * @brief Factorial
+ *
+ * @param x Number of which to compute the factorial
+ *
+ * @return Factorial of @a x
+ */
+uint32_t fac (uint32_t x);
+
+
+/**
+ * @brief Binomial coefficient (n choose k)
+ *
+ * @param n
+ * @param k
+ *
+ * @return Binomial coefficient of @a n and @a k
+ */
+uint32_t binom (uint32_t n, uint32_t k);
+
 #endif /* RPS_TEST_UTIL_H */
 /* end of gnunet-service-rps.c */
index ff701e3713a0d1edd78f16c46d61f52bca536cf0..9619c98892f2c397c13bc6c038ba4cad52b96aef 100644 (file)
@@ -26,3 +26,13 @@ FILENAME_VALID_PEERS = $GNUNET_DATA_HOME/rps/valid_peers.txt
 # Keep in mind, that (networksize)^(1/3) should be enough.
 # So, 50 is enough for a network of size 50^3 = 125000
 MINSIZE = 10
+
+# The probability whith which we want a sampler element to have observed all
+# peer ids in the network at least
+DESIRED_PROBABILITY = 0.9
+
+# A factor that catches the 'bias' of a random stream of peer ids.
+#
+# As introduced by Brahms: Factor between the number of unique ids in a
+# truly random stream and number of unique ids in the gossip stream.
+DEFICIENCY_FACTOR = 0.4
index d0b241a2bda5928cc00357861a2a2cf1bd515d1e..7a3adfa944f8f3374e9bee219190f1365e133ac9 100644 (file)
@@ -29,6 +29,8 @@
 #include "gnunet_rps_service.h"
 #include "rps-sampler_client.h"
 
+#include "gnunet_nse_service.h"
+
 #include <inttypes.h>
 
 #define LOG(kind,...) GNUNET_log_from (kind, "rps-api",__VA_ARGS__)
@@ -109,6 +111,35 @@ struct GNUNET_RPS_Handle
    * @brief Tail of the DLL of stream requests
    */
   struct GNUNET_RPS_StreamRequestHandle *stream_requests_tail;
+
+  /**
+   * @brief Handle to nse service
+   */
+  struct GNUNET_NSE_Handle *nse;
+
+  /**
+   * @brief Pointer to the head element in DLL of request handles
+   */
+  struct GNUNET_RPS_Request_Handle *rh_head;
+
+  /**
+   * @brief Pointer to the tail element in DLL of request handles
+   */
+  struct GNUNET_RPS_Request_Handle *rh_tail;
+
+  /**
+   * @brief The desired probability with which we want to have observed all
+   * peers.
+   */
+  float desired_probability;
+
+  /**
+   * @brief A factor that catches the 'bias' of a random stream of peer ids.
+   *
+   * As introduced by Brahms: Factor between the number of unique ids in a
+   * truly random stream and number of unique ids in the gossip stream.
+   */
+  float deficiency_factor;
 };
 
 
@@ -152,6 +183,16 @@ struct GNUNET_RPS_Request_Handle
    * The closure for the callback.
    */
   void *ready_cb_cls;
+
+  /**
+   * @brief Pointer to next element in DLL
+   */
+  struct GNUNET_RPS_Request_Handle *next;
+
+  /**
+   * @brief Pointer to previous element in DLL
+   */
+  struct GNUNET_RPS_Request_Handle *prev;
 };
 
 
@@ -263,10 +304,7 @@ peers_ready_cb (const struct GNUNET_PeerIdentity *peers,
   rh->ready_cb (rh->ready_cb_cls,
                 num_peers,
                 peers);
-  GNUNET_RPS_stream_cancel (rh->srh);
-  rh->srh = NULL;
-  RPS_sampler_destroy (rh->sampler);
-  rh->sampler = NULL;
+  GNUNET_RPS_request_cancel (rh);
 }
 
 
@@ -606,6 +644,37 @@ hash_from_share_val (const char *share_val,
 }
 
 
+/**
+ * @brief Callback for network size estimate - called with new estimates about
+ * the network size, updates all samplers with the new estimate
+ *
+ * Implements #GNUNET_NSE_Callback
+ *
+ * @param cls the rps handle
+ * @param timestamp unused
+ * @param logestimate the estimate
+ * @param std_dev the standard distribution
+ */
+static void
+nse_cb (void *cls,
+        struct GNUNET_TIME_Absolute timestamp,
+        double logestimate,
+        double std_dev)
+{
+  struct GNUNET_RPS_Handle *h = cls;
+  (void) timestamp;
+  (void) std_dev;
+
+  for (struct GNUNET_RPS_Request_Handle *rh_iter = h->rh_head;
+       NULL != rh_iter && NULL != rh_iter->next;
+       rh_iter = rh_iter->next)
+  {
+    RPS_sampler_update_with_nw_size (rh_iter->sampler,
+                                     GNUNET_NSE_log_estimate_to_n (logestimate));
+  }
+}
+
+
 /**
  * Reconnect to the service
  */
@@ -631,6 +700,9 @@ reconnect (struct GNUNET_RPS_Handle *h)
                                  mq_handlers,
                                  &mq_error_handler,
                                  h);
+  if (NULL != h->nse)
+    GNUNET_NSE_disconnect (h->nse);
+  h->nse = GNUNET_NSE_connect (h->cfg, &nse_cb, h);
 }
 
 
@@ -638,7 +710,7 @@ reconnect (struct GNUNET_RPS_Handle *h)
  * Connect to the rps service
  *
  * @param cfg configuration to use
- * @return a handle to the service
+ * @return a handle to the service, NULL on error
  */
 struct GNUNET_RPS_Handle *
 GNUNET_RPS_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
@@ -647,6 +719,44 @@ GNUNET_RPS_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
 
   h = GNUNET_new (struct GNUNET_RPS_Handle);
   h->cfg = cfg;
+  if (GNUNET_OK !=
+      GNUNET_CONFIGURATION_get_value_float (cfg,
+                                           "RPS",
+                                           "DESIRED_PROBABILITY",
+                                           &h->desired_probability))
+  {
+    GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
+                               "RPS", "DESIRED_PROBABILITY");
+    GNUNET_free (h);
+    return NULL;
+  }
+  if (0 > h->desired_probability ||
+      1 < h->desired_probability)
+  {
+    LOG (GNUNET_ERROR_TYPE_ERROR,
+        "The desired probability must be in the interval [0;1]\n");
+    GNUNET_free (h);
+    return NULL;
+  }
+  if (GNUNET_OK !=
+      GNUNET_CONFIGURATION_get_value_float (cfg,
+                                           "RPS",
+                                           "DEFICIENCY_FACTOR",
+                                           &h->deficiency_factor))
+  {
+    GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
+                               "RPS", "DEFICIENCY_FACTOR");
+    GNUNET_free (h);
+    return NULL;
+  }
+  if (0 > h->desired_probability ||
+      1 < h->desired_probability)
+  {
+    LOG (GNUNET_ERROR_TYPE_ERROR,
+        "The deficiency factor must be in the interval [0;1]\n");
+    GNUNET_free (h);
+    return NULL;
+  }
   reconnect (h);
   if (NULL == h->mq)
   {
@@ -725,6 +835,10 @@ GNUNET_RPS_request_peers (struct GNUNET_RPS_Handle *rps_handle,
   rh->num_requests = num_req_peers;
   rh->sampler = RPS_sampler_mod_init (num_req_peers,
                                       GNUNET_TIME_UNIT_SECONDS); // TODO remove this time-stuff
+  RPS_sampler_set_desired_probability (rh->sampler,
+                                       rps_handle->desired_probability);
+  RPS_sampler_set_deficiency_factor (rh->sampler,
+                                     rps_handle->deficiency_factor);
   rh->sampler_rh = RPS_sampler_get_n_rand_peers (rh->sampler,
                                                  num_req_peers,
                                                  peers_ready_cb,
@@ -734,6 +848,9 @@ GNUNET_RPS_request_peers (struct GNUNET_RPS_Handle *rps_handle,
                                        rh); /* cls */
   rh->ready_cb = ready_cb;
   rh->ready_cb_cls = cls;
+  GNUNET_CONTAINER_DLL_insert (rps_handle->rh_head,
+                               rps_handle->rh_tail,
+                               rh);
 
   return rh;
 }
@@ -911,6 +1028,7 @@ GNUNET_RPS_request_cancel (struct GNUNET_RPS_Request_Handle *rh)
 
   h = rh->rps_handle;
   GNUNET_assert (NULL != rh);
+  GNUNET_assert (NULL != rh->srh);
   GNUNET_assert (h == rh->srh->rps_handle);
   GNUNET_RPS_stream_cancel (rh->srh);
   rh->srh = NULL;
@@ -920,6 +1038,10 @@ GNUNET_RPS_request_cancel (struct GNUNET_RPS_Request_Handle *rh)
     RPS_sampler_request_cancel (rh->sampler_rh);
   }
   RPS_sampler_destroy (rh->sampler);
+  rh->sampler = NULL;
+  GNUNET_CONTAINER_DLL_remove (h->rh_head,
+                               h->rh_tail,
+                               rh);
   GNUNET_free (rh);
 }
 
@@ -939,13 +1061,24 @@ GNUNET_RPS_disconnect (struct GNUNET_RPS_Handle *h)
     LOG (GNUNET_ERROR_TYPE_WARNING,
         "Still waiting for replies\n");
     for (struct GNUNET_RPS_StreamRequestHandle *srh_iter = h->stream_requests_head;
-        NULL != srh_iter;
-        srh_iter = srh_next)
+         NULL != srh_iter;
+         srh_iter = srh_next)
     {
       srh_next = srh_iter->next;
       GNUNET_RPS_stream_cancel (srh_iter);
     }
   }
+  if (NULL != h->rh_head)
+  {
+    LOG (GNUNET_ERROR_TYPE_WARNING,
+         "Not all requests were cancelled!\n");
+    for (struct GNUNET_RPS_Request_Handle *rh_iter = h->rh_head;
+         h->rh_head != NULL;
+         rh_iter = h->rh_head)
+    {
+      GNUNET_RPS_request_cancel (rh_iter);
+    }
+  }
   if (NULL != srh_callback_peers)
   {
     GNUNET_free (srh_callback_peers);
@@ -957,6 +1090,8 @@ GNUNET_RPS_disconnect (struct GNUNET_RPS_Handle *h)
         "Still waiting for view updates\n");
     GNUNET_RPS_view_request_cancel (h);
   }
+  if (NULL != h->nse)
+    GNUNET_NSE_disconnect (h->nse);
   GNUNET_MQ_destroy (h->mq);
   GNUNET_free (h);
 }
index 26066bf1078518b11ee21678ef8e3176c7729aff..7fc91743b35d762395c855cd54e37f8e34637a25 100644 (file)
@@ -1964,26 +1964,6 @@ profiler_eval (void)
   return evaluate ();
 }
 
-static uint32_t fac (uint32_t x)
-{
-  if (1 >= x)
-  {
-    return x;
-  }
-  return x * fac (x - 1);
-}
-
-static uint32_t binom (uint32_t n, uint32_t k)
-{
-  //GNUNET_assert (n >= k);
-  if (k > n) return 0;
-  if (0 > n) return 0;
-  if (0 > k) return 0;
-  if (0 == k) return 1;
-  return fac (n)
-    /
-    fac(k) * fac(n - k);
-}
 
 /**
  * @brief is b in view of a?
index c22113af55ff796208db89cd06e705c08d1a35bd..68f3982ec4ee60df82f0cc5aba445b31e73d15a9 100644 (file)
@@ -22,6 +22,10 @@ FILENAME_VALID_PEERS = $GNUNET_DATA_HOME/rps/valid_peers.txt
 # So, 50 is enough for a network of size 50^3 = 125000
 MINSIZE = 4
 
+DESIRED_PROBABILITY = 0.75
+
+DEFICIENCY_FACTOR = 0.4
+
 
 
 [testbed]