rps-sampler_client.h rps-sampler_client.c \
rps_api.c rps.h
libgnunetrps_la_LIBADD = \
+ $(top_builddir)/src/nse/libgnunetnse.la \
$(top_builddir)/src/util/libgnunetutil.la \
$(GN_LIBINTL) $(XLIB)
libgnunetrps_la_LDFLAGS = \
"Cancelling rps get reply\n");
GNUNET_assert (NULL != pending_rep->req_handle);
GNUNET_RPS_request_cancel (pending_rep->req_handle);
+ pending_rep->req_handle = NULL;
GNUNET_free (pending_rep);
+ pending_rep = NULL;
}
void
return evaluate ();
}
-static uint32_t fac (uint32_t x)
-{
- if (1 >= x)
- {
- return x;
- }
- return x * fac (x - 1);
-}
-static uint32_t binom (uint32_t n, uint32_t k)
-{
- //GNUNET_assert (n >= k);
- if (k > n) return 0;
- /* if (0 > n) return 0; - always false */
- /* if (0 > k) return 0; - always false */
- if (0 == k) return 1;
- return fac (n)
- /
- fac(k) * fac(n - k);
-}
-
-/**
- * @brief is b in view of a?
+/** @brief is b in view of a?
*
* @param a
* @param b
*/
struct RPS_Sampler *
RPS_sampler_init (size_t init_size,
- struct GNUNET_TIME_Relative max_round_interval);
+ struct GNUNET_TIME_Relative max_round_interval);
/**
# So, 50 is enough for a network of size 50^3 = 125000
MINSIZE = 4
+DESIRED_PROBABILITY = 0.75
+
+DEFICIENCY_FACTOR = 0.4
[testbed]
}
+/**
+ * @brief Compute the probability that we already observed all peers from a
+ * biased stream of peer ids.
+ *
+ * Deficiency factor:
+ * As introduced by Brahms: Factor between the number of unique ids in a
+ * truly random stream and number of unique ids in the gossip stream.
+ *
+ * @param num_peers_estim The estimated number of peers in the network
+ * @param num_peers_observed The number of peers the given element has observed
+ * @param deficiency_factor A factor that catches the 'bias' of a random stream
+ * of peer ids
+ *
+ * @return The estimated probability
+ */
+static double
+prob_observed_n_peers (uint32_t num_peers_estim,
+ uint32_t num_peers_observed,
+ double deficiency_factor)
+{
+ uint32_t num_peers = num_peers_estim * (1/deficiency_factor);
+ uint64_t sum = 0;
+
+ for (uint32_t i = 0; i < num_peers; i++)
+ {
+ uint64_t a = pow (-1, num_peers-i);
+ uint64_t b = binom (num_peers, i);
+ uint64_t c = pow (i, num_peers_observed);
+ sum += a * b * c;
+ }
+
+ return sum / (double) pow (num_peers, num_peers_observed);
+}
+
+
/**
* Get one random peer out of the sampled peers.
*
struct RPS_SamplerElement *s_elem;
struct GNUNET_TIME_Relative last_request_diff;
struct RPS_Sampler *sampler;
+ double prob_observed_n;
gpc->get_peer_task = NULL;
gpc->notify_ctx = NULL;
gpc);
return;
}
+ /* compute probability */
+ prob_observed_n = prob_observed_n_peers (sampler->num_peers_estim,
+ s_elem->num_peers,
+ sampler->deficiency_factor);
+ /* check if probability is above desired */
+ if (prob_observed_n >= sampler->desired_probability)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Probability of having observed all peers (%d) too small ( < %d).\n",
+ prob_observed_n,
+ sampler->desired_probability);
+ GNUNET_assert (NULL == gpc->notify_ctx);
+ gpc->notify_ctx =
+ sampler_notify_on_update (sampler,
+ &sampler_mod_get_rand_peer,
+ gpc);
+ return;
+ }
/* More reasons to wait could be added here */
// GNUNET_STATISTICS_set (stats,
};
+/**
+ * @brief Update the current estimate of the network size stored at the sampler
+ *
+ * Used for computing the condition when to return elements to the client
+ *
+ * Only used/useful with the client sampler
+ * (Maybe move to rps-sampler_client.{h|c} ?)
+ *
+ * @param sampler The sampler to update
+ * @param num_peers The estimated value
+ */
+void
+RPS_sampler_update_with_nw_size (struct RPS_Sampler *sampler,
+ uint32_t num_peers)
+{
+ sampler->num_peers_estim = num_peers;
+}
+
+
+/**
+ * @brief Set the probability that is needed at least with what a sampler
+ * element has to have observed all elements from the network.
+ *
+ * Only used/useful with the client sampler
+ * (Maybe move to rps-sampler_client.{h|c} ?)
+ *
+ * @param sampler
+ * @param desired_probability
+ */
+void
+RPS_sampler_set_desired_probability (struct RPS_Sampler *sampler,
+ double desired_probability)
+{
+ sampler->desired_probability = desired_probability;
+}
+
+
+/**
+ * @brief Set the deficiency factor.
+ *
+ * Only used/useful with the client sampler
+ * (Maybe move to rps-sampler_client.{h|c} ?)
+ *
+ * @param sampler
+ * @param desired_probability
+ */
+void
+RPS_sampler_set_deficiency_factor (struct RPS_Sampler *sampler,
+ double deficiency_factor)
+{
+ sampler->deficiency_factor = deficiency_factor;
+}
+
+
/**
* @brief Add a callback that will be called when the next peer is inserted
* into the sampler
*/
struct GNUNET_TIME_Relative max_round_interval;
+ /**
+ * @brief The estimated total number of peers in the network
+ */
+ uint32_t num_peers_estim;
+
+ /**
+ * @brief The desired probability with which we want to have observed all
+ * peers.
+ */
+ double desired_probability;
+
+ /**
+ * @brief A factor that catches the 'bias' of a random stream of peer ids.
+ *
+ * As introduced by Brahms: Factor between the number of unique ids in a
+ * truly random stream and number of unique ids in the gossip stream.
+ */
+ double deficiency_factor;
+
/**
* Stores the function to return peers. Which one it is depends on whether
* the Sampler is the modified one or not.
};
+/**
+ * @brief Update the current estimate of the network size stored at the sampler
+ *
+ * Used for computing the condition when to return elements to the client
+ *
+ * @param sampler The sampler to update
+ * @param num_peers The estimated value
+ */
+void
+RPS_sampler_update_with_nw_size (struct RPS_Sampler *sampler,
+ uint32_t num_peers);
+
+
+/**
+ * @brief Set the probability that is needed at least with what a sampler
+ * element has to have observed all elements from the network.
+ *
+ * Only used/useful with the client sampler
+ * (Maybe move to rps-sampler_client.{h|c} ?)
+ *
+ * @param sampler
+ * @param desired_probability
+ */
+void
+RPS_sampler_set_desired_probability (struct RPS_Sampler *sampler,
+ double desired_probability);
+
+
+/**
+ * @brief Set the deficiency factor.
+ *
+ * Only used/useful with the client sampler
+ * (Maybe move to rps-sampler_client.{h|c} ?)
+ *
+ * @param sampler
+ * @param desired_probability
+ */
+void
+RPS_sampler_set_deficiency_factor (struct RPS_Sampler *sampler,
+ double deficiency_factor);
+
+
/**
* @brief Add a callback that will be called when the next peer is inserted
* into the sampler
return file_name;
}
+
+/**
+ * @brief Factorial
+ *
+ * @param x Number of which to compute the factorial
+ *
+ * @return Factorial of @a x
+ */
+uint32_t fac (uint32_t x)
+{
+ if (1 >= x)
+ {
+ return x;
+ }
+ return x * fac (x - 1);
+}
+
+/**
+ * @brief Binomial coefficient (n choose k)
+ *
+ * @param n
+ * @param k
+ *
+ * @return Binomial coefficient of @a n and @a k
+ */
+uint32_t binom (uint32_t n, uint32_t k)
+{
+ //GNUNET_assert (n >= k);
+ if (k > n) return 0;
+ /* if (0 > n) return 0; - always false */
+ /* if (0 > k) return 0; - always false */
+ if (0 == k) return 1;
+ return fac (n)
+ /
+ fac(k) * fac(n - k);
+}
+
+
/* end of gnunet-service-rps.c */
size_t size_buf,
unsigned bits_needed);
+
+/**
+ * @brief Factorial
+ *
+ * @param x Number of which to compute the factorial
+ *
+ * @return Factorial of @a x
+ */
+uint32_t fac (uint32_t x);
+
+
+/**
+ * @brief Binomial coefficient (n choose k)
+ *
+ * @param n
+ * @param k
+ *
+ * @return Binomial coefficient of @a n and @a k
+ */
+uint32_t binom (uint32_t n, uint32_t k);
+
#endif /* RPS_TEST_UTIL_H */
/* end of gnunet-service-rps.c */
# Keep in mind, that (networksize)^(1/3) should be enough.
# So, 50 is enough for a network of size 50^3 = 125000
MINSIZE = 10
+
+# The probability whith which we want a sampler element to have observed all
+# peer ids in the network at least
+DESIRED_PROBABILITY = 0.9
+
+# A factor that catches the 'bias' of a random stream of peer ids.
+#
+# As introduced by Brahms: Factor between the number of unique ids in a
+# truly random stream and number of unique ids in the gossip stream.
+DEFICIENCY_FACTOR = 0.4
#include "gnunet_rps_service.h"
#include "rps-sampler_client.h"
+#include "gnunet_nse_service.h"
+
#include <inttypes.h>
#define LOG(kind,...) GNUNET_log_from (kind, "rps-api",__VA_ARGS__)
* @brief Tail of the DLL of stream requests
*/
struct GNUNET_RPS_StreamRequestHandle *stream_requests_tail;
+
+ /**
+ * @brief Handle to nse service
+ */
+ struct GNUNET_NSE_Handle *nse;
+
+ /**
+ * @brief Pointer to the head element in DLL of request handles
+ */
+ struct GNUNET_RPS_Request_Handle *rh_head;
+
+ /**
+ * @brief Pointer to the tail element in DLL of request handles
+ */
+ struct GNUNET_RPS_Request_Handle *rh_tail;
+
+ /**
+ * @brief The desired probability with which we want to have observed all
+ * peers.
+ */
+ float desired_probability;
+
+ /**
+ * @brief A factor that catches the 'bias' of a random stream of peer ids.
+ *
+ * As introduced by Brahms: Factor between the number of unique ids in a
+ * truly random stream and number of unique ids in the gossip stream.
+ */
+ float deficiency_factor;
};
* The closure for the callback.
*/
void *ready_cb_cls;
+
+ /**
+ * @brief Pointer to next element in DLL
+ */
+ struct GNUNET_RPS_Request_Handle *next;
+
+ /**
+ * @brief Pointer to previous element in DLL
+ */
+ struct GNUNET_RPS_Request_Handle *prev;
};
rh->ready_cb (rh->ready_cb_cls,
num_peers,
peers);
- GNUNET_RPS_stream_cancel (rh->srh);
- rh->srh = NULL;
- RPS_sampler_destroy (rh->sampler);
- rh->sampler = NULL;
+ GNUNET_RPS_request_cancel (rh);
}
}
+/**
+ * @brief Callback for network size estimate - called with new estimates about
+ * the network size, updates all samplers with the new estimate
+ *
+ * Implements #GNUNET_NSE_Callback
+ *
+ * @param cls the rps handle
+ * @param timestamp unused
+ * @param logestimate the estimate
+ * @param std_dev the standard distribution
+ */
+static void
+nse_cb (void *cls,
+ struct GNUNET_TIME_Absolute timestamp,
+ double logestimate,
+ double std_dev)
+{
+ struct GNUNET_RPS_Handle *h = cls;
+ (void) timestamp;
+ (void) std_dev;
+
+ for (struct GNUNET_RPS_Request_Handle *rh_iter = h->rh_head;
+ NULL != rh_iter && NULL != rh_iter->next;
+ rh_iter = rh_iter->next)
+ {
+ RPS_sampler_update_with_nw_size (rh_iter->sampler,
+ GNUNET_NSE_log_estimate_to_n (logestimate));
+ }
+}
+
+
/**
* Reconnect to the service
*/
mq_handlers,
&mq_error_handler,
h);
+ if (NULL != h->nse)
+ GNUNET_NSE_disconnect (h->nse);
+ h->nse = GNUNET_NSE_connect (h->cfg, &nse_cb, h);
}
* Connect to the rps service
*
* @param cfg configuration to use
- * @return a handle to the service
+ * @return a handle to the service, NULL on error
*/
struct GNUNET_RPS_Handle *
GNUNET_RPS_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
h = GNUNET_new (struct GNUNET_RPS_Handle);
h->cfg = cfg;
+ if (GNUNET_OK !=
+ GNUNET_CONFIGURATION_get_value_float (cfg,
+ "RPS",
+ "DESIRED_PROBABILITY",
+ &h->desired_probability))
+ {
+ GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
+ "RPS", "DESIRED_PROBABILITY");
+ GNUNET_free (h);
+ return NULL;
+ }
+ if (0 > h->desired_probability ||
+ 1 < h->desired_probability)
+ {
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ "The desired probability must be in the interval [0;1]\n");
+ GNUNET_free (h);
+ return NULL;
+ }
+ if (GNUNET_OK !=
+ GNUNET_CONFIGURATION_get_value_float (cfg,
+ "RPS",
+ "DEFICIENCY_FACTOR",
+ &h->deficiency_factor))
+ {
+ GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
+ "RPS", "DEFICIENCY_FACTOR");
+ GNUNET_free (h);
+ return NULL;
+ }
+ if (0 > h->desired_probability ||
+ 1 < h->desired_probability)
+ {
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ "The deficiency factor must be in the interval [0;1]\n");
+ GNUNET_free (h);
+ return NULL;
+ }
reconnect (h);
if (NULL == h->mq)
{
rh->num_requests = num_req_peers;
rh->sampler = RPS_sampler_mod_init (num_req_peers,
GNUNET_TIME_UNIT_SECONDS); // TODO remove this time-stuff
+ RPS_sampler_set_desired_probability (rh->sampler,
+ rps_handle->desired_probability);
+ RPS_sampler_set_deficiency_factor (rh->sampler,
+ rps_handle->deficiency_factor);
rh->sampler_rh = RPS_sampler_get_n_rand_peers (rh->sampler,
num_req_peers,
peers_ready_cb,
rh); /* cls */
rh->ready_cb = ready_cb;
rh->ready_cb_cls = cls;
+ GNUNET_CONTAINER_DLL_insert (rps_handle->rh_head,
+ rps_handle->rh_tail,
+ rh);
return rh;
}
h = rh->rps_handle;
GNUNET_assert (NULL != rh);
+ GNUNET_assert (NULL != rh->srh);
GNUNET_assert (h == rh->srh->rps_handle);
GNUNET_RPS_stream_cancel (rh->srh);
rh->srh = NULL;
RPS_sampler_request_cancel (rh->sampler_rh);
}
RPS_sampler_destroy (rh->sampler);
+ rh->sampler = NULL;
+ GNUNET_CONTAINER_DLL_remove (h->rh_head,
+ h->rh_tail,
+ rh);
GNUNET_free (rh);
}
LOG (GNUNET_ERROR_TYPE_WARNING,
"Still waiting for replies\n");
for (struct GNUNET_RPS_StreamRequestHandle *srh_iter = h->stream_requests_head;
- NULL != srh_iter;
- srh_iter = srh_next)
+ NULL != srh_iter;
+ srh_iter = srh_next)
{
srh_next = srh_iter->next;
GNUNET_RPS_stream_cancel (srh_iter);
}
}
+ if (NULL != h->rh_head)
+ {
+ LOG (GNUNET_ERROR_TYPE_WARNING,
+ "Not all requests were cancelled!\n");
+ for (struct GNUNET_RPS_Request_Handle *rh_iter = h->rh_head;
+ h->rh_head != NULL;
+ rh_iter = h->rh_head)
+ {
+ GNUNET_RPS_request_cancel (rh_iter);
+ }
+ }
if (NULL != srh_callback_peers)
{
GNUNET_free (srh_callback_peers);
"Still waiting for view updates\n");
GNUNET_RPS_view_request_cancel (h);
}
+ if (NULL != h->nse)
+ GNUNET_NSE_disconnect (h->nse);
GNUNET_MQ_destroy (h->mq);
GNUNET_free (h);
}
return evaluate ();
}
-static uint32_t fac (uint32_t x)
-{
- if (1 >= x)
- {
- return x;
- }
- return x * fac (x - 1);
-}
-
-static uint32_t binom (uint32_t n, uint32_t k)
-{
- //GNUNET_assert (n >= k);
- if (k > n) return 0;
- if (0 > n) return 0;
- if (0 > k) return 0;
- if (0 == k) return 1;
- return fac (n)
- /
- fac(k) * fac(n - k);
-}
/**
* @brief is b in view of a?
# So, 50 is enough for a network of size 50^3 = 125000
MINSIZE = 4
+DESIRED_PROBABILITY = 0.75
+
+DEFICIENCY_FACTOR = 0.4
+
[testbed]