From: Julius Bünger Date: Thu, 4 Apr 2019 11:41:25 +0000 (+0200) Subject: RPS: Return peers to client after many observed ids X-Git-Tag: v0.11.3~23^2~2^2 X-Git-Url: https://git.librecmc.org/?a=commitdiff_plain;h=8c3d9fc59cd5617c4f5b7ea621971bdff25f5353;p=oweals%2Fgnunet.git RPS: Return peers to client after many observed ids --- diff --git a/src/rps/Makefile.am b/src/rps/Makefile.am index 1fffe6be0..ce73caa0f 100644 --- a/src/rps/Makefile.am +++ b/src/rps/Makefile.am @@ -36,6 +36,7 @@ libgnunetrps_la_SOURCES = \ rps-sampler_client.h rps-sampler_client.c \ rps_api.c rps.h libgnunetrps_la_LIBADD = \ + $(top_builddir)/src/nse/libgnunetnse.la \ $(top_builddir)/src/util/libgnunetutil.la \ $(GN_LIBINTL) $(XLIB) libgnunetrps_la_LDFLAGS = \ diff --git a/src/rps/gnunet-rps-profiler.c b/src/rps/gnunet-rps-profiler.c index af27546f2..a852d94b1 100644 --- a/src/rps/gnunet-rps-profiler.c +++ b/src/rps/gnunet-rps-profiler.c @@ -1041,7 +1041,9 @@ cancel_request (struct PendingReply *pending_rep) "Cancelling rps get reply\n"); GNUNET_assert (NULL != pending_rep->req_handle); GNUNET_RPS_request_cancel (pending_rep->req_handle); + pending_rep->req_handle = NULL; GNUNET_free (pending_rep); + pending_rep = NULL; } void @@ -2061,29 +2063,8 @@ profiler_eval (void) return evaluate (); } -static uint32_t fac (uint32_t x) -{ - if (1 >= x) - { - return x; - } - return x * fac (x - 1); -} -static uint32_t binom (uint32_t n, uint32_t k) -{ - //GNUNET_assert (n >= k); - if (k > n) return 0; - /* if (0 > n) return 0; - always false */ - /* if (0 > k) return 0; - always false */ - if (0 == k) return 1; - return fac (n) - / - fac(k) * fac(n - k); -} - -/** - * @brief is b in view of a? +/** @brief is b in view of a? * * @param a * @param b diff --git a/src/rps/gnunet-service-rps_sampler.h b/src/rps/gnunet-service-rps_sampler.h index 921570f7d..d8e5f3efd 100644 --- a/src/rps/gnunet-service-rps_sampler.h +++ b/src/rps/gnunet-service-rps_sampler.h @@ -70,7 +70,7 @@ RPS_sampler_resize (struct RPS_Sampler *sampler, unsigned int new_size); */ struct RPS_Sampler * RPS_sampler_init (size_t init_size, - struct GNUNET_TIME_Relative max_round_interval); + struct GNUNET_TIME_Relative max_round_interval); /** diff --git a/src/rps/profiler_rps.conf b/src/rps/profiler_rps.conf index 6049da5a0..5edd6d3ff 100644 --- a/src/rps/profiler_rps.conf +++ b/src/rps/profiler_rps.conf @@ -22,6 +22,9 @@ FILENAME_VALID_PEERS = $GNUNET_DATA_HOME/rps/valid_peers.txt # So, 50 is enough for a network of size 50^3 = 125000 MINSIZE = 4 +DESIRED_PROBABILITY = 0.75 + +DEFICIENCY_FACTOR = 0.4 [testbed] diff --git a/src/rps/rps-sampler_client.c b/src/rps/rps-sampler_client.c index 1ba60e1a8..0de25df07 100644 --- a/src/rps/rps-sampler_client.c +++ b/src/rps/rps-sampler_client.c @@ -218,6 +218,41 @@ RPS_sampler_mod_init (size_t init_size, } +/** + * @brief Compute the probability that we already observed all peers from a + * biased stream of peer ids. + * + * Deficiency factor: + * As introduced by Brahms: Factor between the number of unique ids in a + * truly random stream and number of unique ids in the gossip stream. + * + * @param num_peers_estim The estimated number of peers in the network + * @param num_peers_observed The number of peers the given element has observed + * @param deficiency_factor A factor that catches the 'bias' of a random stream + * of peer ids + * + * @return The estimated probability + */ +static double +prob_observed_n_peers (uint32_t num_peers_estim, + uint32_t num_peers_observed, + double deficiency_factor) +{ + uint32_t num_peers = num_peers_estim * (1/deficiency_factor); + uint64_t sum = 0; + + for (uint32_t i = 0; i < num_peers; i++) + { + uint64_t a = pow (-1, num_peers-i); + uint64_t b = binom (num_peers, i); + uint64_t c = pow (i, num_peers_observed); + sum += a * b * c; + } + + return sum / (double) pow (num_peers, num_peers_observed); +} + + /** * Get one random peer out of the sampled peers. * @@ -230,6 +265,7 @@ sampler_mod_get_rand_peer (void *cls) struct RPS_SamplerElement *s_elem; struct GNUNET_TIME_Relative last_request_diff; struct RPS_Sampler *sampler; + double prob_observed_n; gpc->get_peer_task = NULL; gpc->notify_ctx = NULL; @@ -294,6 +330,24 @@ sampler_mod_get_rand_peer (void *cls) gpc); return; } + /* compute probability */ + prob_observed_n = prob_observed_n_peers (sampler->num_peers_estim, + s_elem->num_peers, + sampler->deficiency_factor); + /* check if probability is above desired */ + if (prob_observed_n >= sampler->desired_probability) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Probability of having observed all peers (%d) too small ( < %d).\n", + prob_observed_n, + sampler->desired_probability); + GNUNET_assert (NULL == gpc->notify_ctx); + gpc->notify_ctx = + sampler_notify_on_update (sampler, + &sampler_mod_get_rand_peer, + gpc); + return; + } /* More reasons to wait could be added here */ // GNUNET_STATISTICS_set (stats, diff --git a/src/rps/rps-sampler_common.c b/src/rps/rps-sampler_common.c index 2b0569c61..3ed4ef989 100644 --- a/src/rps/rps-sampler_common.c +++ b/src/rps/rps-sampler_common.c @@ -115,6 +115,60 @@ struct RPS_SamplerRequestHandle }; +/** + * @brief Update the current estimate of the network size stored at the sampler + * + * Used for computing the condition when to return elements to the client + * + * Only used/useful with the client sampler + * (Maybe move to rps-sampler_client.{h|c} ?) + * + * @param sampler The sampler to update + * @param num_peers The estimated value + */ +void +RPS_sampler_update_with_nw_size (struct RPS_Sampler *sampler, + uint32_t num_peers) +{ + sampler->num_peers_estim = num_peers; +} + + +/** + * @brief Set the probability that is needed at least with what a sampler + * element has to have observed all elements from the network. + * + * Only used/useful with the client sampler + * (Maybe move to rps-sampler_client.{h|c} ?) + * + * @param sampler + * @param desired_probability + */ +void +RPS_sampler_set_desired_probability (struct RPS_Sampler *sampler, + double desired_probability) +{ + sampler->desired_probability = desired_probability; +} + + +/** + * @brief Set the deficiency factor. + * + * Only used/useful with the client sampler + * (Maybe move to rps-sampler_client.{h|c} ?) + * + * @param sampler + * @param desired_probability + */ +void +RPS_sampler_set_deficiency_factor (struct RPS_Sampler *sampler, + double deficiency_factor) +{ + sampler->deficiency_factor = deficiency_factor; +} + + /** * @brief Add a callback that will be called when the next peer is inserted * into the sampler diff --git a/src/rps/rps-sampler_common.h b/src/rps/rps-sampler_common.h index e36f6e834..1abe43720 100644 --- a/src/rps/rps-sampler_common.h +++ b/src/rps/rps-sampler_common.h @@ -146,6 +146,25 @@ struct RPS_Sampler */ struct GNUNET_TIME_Relative max_round_interval; + /** + * @brief The estimated total number of peers in the network + */ + uint32_t num_peers_estim; + + /** + * @brief The desired probability with which we want to have observed all + * peers. + */ + double desired_probability; + + /** + * @brief A factor that catches the 'bias' of a random stream of peer ids. + * + * As introduced by Brahms: Factor between the number of unique ids in a + * truly random stream and number of unique ids in the gossip stream. + */ + double deficiency_factor; + /** * Stores the function to return peers. Which one it is depends on whether * the Sampler is the modified one or not. @@ -163,6 +182,48 @@ struct RPS_Sampler }; +/** + * @brief Update the current estimate of the network size stored at the sampler + * + * Used for computing the condition when to return elements to the client + * + * @param sampler The sampler to update + * @param num_peers The estimated value + */ +void +RPS_sampler_update_with_nw_size (struct RPS_Sampler *sampler, + uint32_t num_peers); + + +/** + * @brief Set the probability that is needed at least with what a sampler + * element has to have observed all elements from the network. + * + * Only used/useful with the client sampler + * (Maybe move to rps-sampler_client.{h|c} ?) + * + * @param sampler + * @param desired_probability + */ +void +RPS_sampler_set_desired_probability (struct RPS_Sampler *sampler, + double desired_probability); + + +/** + * @brief Set the deficiency factor. + * + * Only used/useful with the client sampler + * (Maybe move to rps-sampler_client.{h|c} ?) + * + * @param sampler + * @param desired_probability + */ +void +RPS_sampler_set_deficiency_factor (struct RPS_Sampler *sampler, + double deficiency_factor); + + /** * @brief Add a callback that will be called when the next peer is inserted * into the sampler diff --git a/src/rps/rps-test_util.c b/src/rps/rps-test_util.c index 077750329..fcb4f59a0 100644 --- a/src/rps/rps-test_util.c +++ b/src/rps/rps-test_util.c @@ -487,4 +487,42 @@ store_prefix_file_name (const struct GNUNET_PeerIdentity *peer, return file_name; } + +/** + * @brief Factorial + * + * @param x Number of which to compute the factorial + * + * @return Factorial of @a x + */ +uint32_t fac (uint32_t x) +{ + if (1 >= x) + { + return x; + } + return x * fac (x - 1); +} + +/** + * @brief Binomial coefficient (n choose k) + * + * @param n + * @param k + * + * @return Binomial coefficient of @a n and @a k + */ +uint32_t binom (uint32_t n, uint32_t k) +{ + //GNUNET_assert (n >= k); + if (k > n) return 0; + /* if (0 > n) return 0; - always false */ + /* if (0 > k) return 0; - always false */ + if (0 == k) return 1; + return fac (n) + / + fac(k) * fac(n - k); +} + + /* end of gnunet-service-rps.c */ diff --git a/src/rps/rps-test_util.h b/src/rps/rps-test_util.h index 5009073d0..6b5f568d7 100644 --- a/src/rps/rps-test_util.h +++ b/src/rps/rps-test_util.h @@ -107,5 +107,26 @@ to_file_raw_unaligned (const char *file_name, size_t size_buf, unsigned bits_needed); + +/** + * @brief Factorial + * + * @param x Number of which to compute the factorial + * + * @return Factorial of @a x + */ +uint32_t fac (uint32_t x); + + +/** + * @brief Binomial coefficient (n choose k) + * + * @param n + * @param k + * + * @return Binomial coefficient of @a n and @a k + */ +uint32_t binom (uint32_t n, uint32_t k); + #endif /* RPS_TEST_UTIL_H */ /* end of gnunet-service-rps.c */ diff --git a/src/rps/rps.conf.in b/src/rps/rps.conf.in index ff701e371..9619c9889 100644 --- a/src/rps/rps.conf.in +++ b/src/rps/rps.conf.in @@ -26,3 +26,13 @@ FILENAME_VALID_PEERS = $GNUNET_DATA_HOME/rps/valid_peers.txt # Keep in mind, that (networksize)^(1/3) should be enough. # So, 50 is enough for a network of size 50^3 = 125000 MINSIZE = 10 + +# The probability whith which we want a sampler element to have observed all +# peer ids in the network at least +DESIRED_PROBABILITY = 0.9 + +# A factor that catches the 'bias' of a random stream of peer ids. +# +# As introduced by Brahms: Factor between the number of unique ids in a +# truly random stream and number of unique ids in the gossip stream. +DEFICIENCY_FACTOR = 0.4 diff --git a/src/rps/rps_api.c b/src/rps/rps_api.c index d0b241a2b..7a3adfa94 100644 --- a/src/rps/rps_api.c +++ b/src/rps/rps_api.c @@ -29,6 +29,8 @@ #include "gnunet_rps_service.h" #include "rps-sampler_client.h" +#include "gnunet_nse_service.h" + #include #define LOG(kind,...) GNUNET_log_from (kind, "rps-api",__VA_ARGS__) @@ -109,6 +111,35 @@ struct GNUNET_RPS_Handle * @brief Tail of the DLL of stream requests */ struct GNUNET_RPS_StreamRequestHandle *stream_requests_tail; + + /** + * @brief Handle to nse service + */ + struct GNUNET_NSE_Handle *nse; + + /** + * @brief Pointer to the head element in DLL of request handles + */ + struct GNUNET_RPS_Request_Handle *rh_head; + + /** + * @brief Pointer to the tail element in DLL of request handles + */ + struct GNUNET_RPS_Request_Handle *rh_tail; + + /** + * @brief The desired probability with which we want to have observed all + * peers. + */ + float desired_probability; + + /** + * @brief A factor that catches the 'bias' of a random stream of peer ids. + * + * As introduced by Brahms: Factor between the number of unique ids in a + * truly random stream and number of unique ids in the gossip stream. + */ + float deficiency_factor; }; @@ -152,6 +183,16 @@ struct GNUNET_RPS_Request_Handle * The closure for the callback. */ void *ready_cb_cls; + + /** + * @brief Pointer to next element in DLL + */ + struct GNUNET_RPS_Request_Handle *next; + + /** + * @brief Pointer to previous element in DLL + */ + struct GNUNET_RPS_Request_Handle *prev; }; @@ -263,10 +304,7 @@ peers_ready_cb (const struct GNUNET_PeerIdentity *peers, rh->ready_cb (rh->ready_cb_cls, num_peers, peers); - GNUNET_RPS_stream_cancel (rh->srh); - rh->srh = NULL; - RPS_sampler_destroy (rh->sampler); - rh->sampler = NULL; + GNUNET_RPS_request_cancel (rh); } @@ -606,6 +644,37 @@ hash_from_share_val (const char *share_val, } +/** + * @brief Callback for network size estimate - called with new estimates about + * the network size, updates all samplers with the new estimate + * + * Implements #GNUNET_NSE_Callback + * + * @param cls the rps handle + * @param timestamp unused + * @param logestimate the estimate + * @param std_dev the standard distribution + */ +static void +nse_cb (void *cls, + struct GNUNET_TIME_Absolute timestamp, + double logestimate, + double std_dev) +{ + struct GNUNET_RPS_Handle *h = cls; + (void) timestamp; + (void) std_dev; + + for (struct GNUNET_RPS_Request_Handle *rh_iter = h->rh_head; + NULL != rh_iter && NULL != rh_iter->next; + rh_iter = rh_iter->next) + { + RPS_sampler_update_with_nw_size (rh_iter->sampler, + GNUNET_NSE_log_estimate_to_n (logestimate)); + } +} + + /** * Reconnect to the service */ @@ -631,6 +700,9 @@ reconnect (struct GNUNET_RPS_Handle *h) mq_handlers, &mq_error_handler, h); + if (NULL != h->nse) + GNUNET_NSE_disconnect (h->nse); + h->nse = GNUNET_NSE_connect (h->cfg, &nse_cb, h); } @@ -638,7 +710,7 @@ reconnect (struct GNUNET_RPS_Handle *h) * Connect to the rps service * * @param cfg configuration to use - * @return a handle to the service + * @return a handle to the service, NULL on error */ struct GNUNET_RPS_Handle * GNUNET_RPS_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) @@ -647,6 +719,44 @@ GNUNET_RPS_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) h = GNUNET_new (struct GNUNET_RPS_Handle); h->cfg = cfg; + if (GNUNET_OK != + GNUNET_CONFIGURATION_get_value_float (cfg, + "RPS", + "DESIRED_PROBABILITY", + &h->desired_probability)) + { + GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR, + "RPS", "DESIRED_PROBABILITY"); + GNUNET_free (h); + return NULL; + } + if (0 > h->desired_probability || + 1 < h->desired_probability) + { + LOG (GNUNET_ERROR_TYPE_ERROR, + "The desired probability must be in the interval [0;1]\n"); + GNUNET_free (h); + return NULL; + } + if (GNUNET_OK != + GNUNET_CONFIGURATION_get_value_float (cfg, + "RPS", + "DEFICIENCY_FACTOR", + &h->deficiency_factor)) + { + GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR, + "RPS", "DEFICIENCY_FACTOR"); + GNUNET_free (h); + return NULL; + } + if (0 > h->desired_probability || + 1 < h->desired_probability) + { + LOG (GNUNET_ERROR_TYPE_ERROR, + "The deficiency factor must be in the interval [0;1]\n"); + GNUNET_free (h); + return NULL; + } reconnect (h); if (NULL == h->mq) { @@ -725,6 +835,10 @@ GNUNET_RPS_request_peers (struct GNUNET_RPS_Handle *rps_handle, rh->num_requests = num_req_peers; rh->sampler = RPS_sampler_mod_init (num_req_peers, GNUNET_TIME_UNIT_SECONDS); // TODO remove this time-stuff + RPS_sampler_set_desired_probability (rh->sampler, + rps_handle->desired_probability); + RPS_sampler_set_deficiency_factor (rh->sampler, + rps_handle->deficiency_factor); rh->sampler_rh = RPS_sampler_get_n_rand_peers (rh->sampler, num_req_peers, peers_ready_cb, @@ -734,6 +848,9 @@ GNUNET_RPS_request_peers (struct GNUNET_RPS_Handle *rps_handle, rh); /* cls */ rh->ready_cb = ready_cb; rh->ready_cb_cls = cls; + GNUNET_CONTAINER_DLL_insert (rps_handle->rh_head, + rps_handle->rh_tail, + rh); return rh; } @@ -911,6 +1028,7 @@ GNUNET_RPS_request_cancel (struct GNUNET_RPS_Request_Handle *rh) h = rh->rps_handle; GNUNET_assert (NULL != rh); + GNUNET_assert (NULL != rh->srh); GNUNET_assert (h == rh->srh->rps_handle); GNUNET_RPS_stream_cancel (rh->srh); rh->srh = NULL; @@ -920,6 +1038,10 @@ GNUNET_RPS_request_cancel (struct GNUNET_RPS_Request_Handle *rh) RPS_sampler_request_cancel (rh->sampler_rh); } RPS_sampler_destroy (rh->sampler); + rh->sampler = NULL; + GNUNET_CONTAINER_DLL_remove (h->rh_head, + h->rh_tail, + rh); GNUNET_free (rh); } @@ -939,13 +1061,24 @@ GNUNET_RPS_disconnect (struct GNUNET_RPS_Handle *h) LOG (GNUNET_ERROR_TYPE_WARNING, "Still waiting for replies\n"); for (struct GNUNET_RPS_StreamRequestHandle *srh_iter = h->stream_requests_head; - NULL != srh_iter; - srh_iter = srh_next) + NULL != srh_iter; + srh_iter = srh_next) { srh_next = srh_iter->next; GNUNET_RPS_stream_cancel (srh_iter); } } + if (NULL != h->rh_head) + { + LOG (GNUNET_ERROR_TYPE_WARNING, + "Not all requests were cancelled!\n"); + for (struct GNUNET_RPS_Request_Handle *rh_iter = h->rh_head; + h->rh_head != NULL; + rh_iter = h->rh_head) + { + GNUNET_RPS_request_cancel (rh_iter); + } + } if (NULL != srh_callback_peers) { GNUNET_free (srh_callback_peers); @@ -957,6 +1090,8 @@ GNUNET_RPS_disconnect (struct GNUNET_RPS_Handle *h) "Still waiting for view updates\n"); GNUNET_RPS_view_request_cancel (h); } + if (NULL != h->nse) + GNUNET_NSE_disconnect (h->nse); GNUNET_MQ_destroy (h->mq); GNUNET_free (h); } diff --git a/src/rps/test_rps.c b/src/rps/test_rps.c index 26066bf10..7fc91743b 100644 --- a/src/rps/test_rps.c +++ b/src/rps/test_rps.c @@ -1964,26 +1964,6 @@ profiler_eval (void) return evaluate (); } -static uint32_t fac (uint32_t x) -{ - if (1 >= x) - { - return x; - } - return x * fac (x - 1); -} - -static uint32_t binom (uint32_t n, uint32_t k) -{ - //GNUNET_assert (n >= k); - if (k > n) return 0; - if (0 > n) return 0; - if (0 > k) return 0; - if (0 == k) return 1; - return fac (n) - / - fac(k) * fac(n - k); -} /** * @brief is b in view of a? diff --git a/src/rps/test_rps.conf b/src/rps/test_rps.conf index c22113af5..68f3982ec 100644 --- a/src/rps/test_rps.conf +++ b/src/rps/test_rps.conf @@ -22,6 +22,10 @@ FILENAME_VALID_PEERS = $GNUNET_DATA_HOME/rps/valid_peers.txt # So, 50 is enough for a network of size 50^3 = 125000 MINSIZE = 4 +DESIRED_PROBABILITY = 0.75 + +DEFICIENCY_FACTOR = 0.4 + [testbed]