/*
This file is part of GNUnet.
- (C)
+ Copyright (C)
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
* Globals
***********************************************************************/
+/**
+ * Sampler used for the Brahms protocol itself.
+ */
+static struct RPS_Sampler *prot_sampler;
+
+/**
+ * Sampler used for the clients.
+ */
+static struct RPS_Sampler *client_sampler;
+
/**
* Set of all peers to keep track of them.
*/
rem_from_list (tmp_peer_list, &tmp_size, peer);
if (0 == tmp_size)
+ {
+ GNUNET_free (peer);
return NULL;
+ }
/**;
* Choose the r_index of the peer we want to return
void
insert_in_sampler (void *cls, const struct GNUNET_PeerIdentity *peer)
{
- RPS_sampler_update_list (peer);
+ RPS_sampler_update (prot_sampler, peer);
+ RPS_sampler_update (client_sampler, peer);
}
/**
* Check whether #insert_in_sampler was already scheduled
*/
- int
+static int
insert_in_sampler_scheduled (const struct PeerContext *peer_ctx)
{
unsigned int i;
/**
* Wrapper around #RPS_sampler_resize()
+ *
+ * If we do not have enough sampler elements, double current sampler size
+ * If we have more than enough sampler elements, halv current sampler size
*/
- void
-resize_wrapper ()
+static void
+resize_wrapper (struct RPS_Sampler *sampler, uint32_t new_size)
+{
+ unsigned int sampler_size;
+
+ // TODO statistics
+ // TODO respect the min, max
+ sampler_size = RPS_sampler_get_size (sampler);
+ if (sampler_size > new_size * 4)
+ { /* Shrinking */
+ RPS_sampler_resize (sampler, sampler_size / 2);
+ }
+ else if (sampler_size < new_size)
+ { /* Growing */
+ RPS_sampler_resize (sampler, sampler_size * 2);
+ }
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "sampler_size is now %u\n", sampler_size);
+}
+
+
+/**
+ * Wrapper around #RPS_sampler_resize() resizing the client sampler
+ */
+static void
+client_resize_wrapper ()
{
uint32_t bigger_size;
unsigned int sampler_size;
// TODO statistics
+ sampler_size = RPS_sampler_get_size (client_sampler);
+
if (sampler_size_est_need > sampler_size_client_need)
bigger_size = sampler_size_est_need;
else
bigger_size = sampler_size_client_need;
// TODO respect the min, max
- sampler_size = RPS_sampler_get_size ();
- if (sampler_size > bigger_size * 4)
- { /* Shrinking */
- RPS_sampler_resize (sampler_size / 2);
- }
- else if (sampler_size < bigger_size)
- { /* Growing */
- RPS_sampler_resize (sampler_size * 2);
- }
+ resize_wrapper (client_sampler, bigger_size);
LOG (GNUNET_ERROR_TYPE_DEBUG, "sampler_size is now %u\n", sampler_size);
}
max_round_duration.rel_value_us / request_rate.rel_value_us;
/* Resize the sampler */
- resize_wrapper ();
+ client_resize_wrapper ();
}
last_request = GNUNET_TIME_absolute_get ();
}
* /Util functions
***********************************************************************/
+
+
+
+
/**
* Function called by NSE.
*
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Received a ns estimate - logest: %f, std_dev: %f (old_size: %u)\n",
- logestimate, std_dev, RPS_sampler_get_size ());
+ logestimate, std_dev, RPS_sampler_get_size (prot_sampler));
//scale = .01;
estimate = GNUNET_NSE_log_estimate_to_n (logestimate);
// GNUNET_NSE_log_estimate_to_n (logestimate);
LOG (GNUNET_ERROR_TYPE_DEBUG, "Not using estimate %f\n", estimate);
/* If the NSE has changed adapt the lists accordingly */
- resize_wrapper ();
+ resize_wrapper (prot_sampler, sampler_size_est_need);
+ client_resize_wrapper ();
}
LOG (GNUNET_ERROR_TYPE_DEBUG, "Client requested %" PRIX32 " random peer(s).\n", num_peers);
- RPS_sampler_get_n_rand_peers (client_respond, client, num_peers, GNUNET_YES);
+ RPS_sampler_get_n_rand_peers (client_sampler, client_respond,
+ client, num_peers, GNUNET_YES);
GNUNET_SERVER_receive_done (client,
GNUNET_OK);
peers = (struct GNUNET_PeerIdentity *) &message[1];
for ( i = 0 ; i < ntohl (in_msg->num_peers) ; i++ )
- RPS_sampler_update_list (&peers[i]);
+ RPS_sampler_update (prot_sampler, &peers[i]);
+ RPS_sampler_update (client_sampler, &peers[i]);
GNUNET_SERVER_receive_done (client,
GNUNET_OK);
{
peer = *tmp_peer;
GNUNET_free (tmp_peer);
+
GNUNET_array_append (pending_pull_reply_list, pending_pull_reply_list_size, peer);
if (0 != GNUNET_CRYPTO_cmp_peer_identity (&own_identity, &peer))
for (i = second_border ; i < sampler_size_est_need ; i++)
{
/* Update gossip list with peers from history */
- RPS_sampler_get_n_rand_peers (hist_update, NULL, 1, GNUNET_NO);
+ RPS_sampler_get_n_rand_peers (prot_sampler, hist_update, NULL, 1, GNUNET_NO);
num_hist_update_tasks++;
// TODO change the peer_flags accordingly
}
}
else
{
- LOG (GNUNET_ERROR_TYPE_DEBUG, "No update of the gossip list. ()\n");
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "No update of the gossip list.\n");
}
// TODO independent of that also get some peers from CADET_get_peers()?
for ( i = 0 ; i < push_list_size ; i++ )
{
- RPS_sampler_update_list (&push_list[i]);
+ RPS_sampler_update (prot_sampler, &push_list[i]);
+ RPS_sampler_update (client_sampler, &push_list[i]);
// TODO set in_flag?
}
for ( i = 0 ; i < pull_list_size ; i++ )
{
- RPS_sampler_update_list (&pull_list[i]);
+ RPS_sampler_update (prot_sampler, &push_list[i]);
+ RPS_sampler_update (client_sampler, &push_list[i]);
// TODO set in_flag?
}
* Open a connection to given peer and store channel and mq.
*/
void
-insertCB (void *cls, const struct GNUNET_PeerIdentity *id)
+insertCB (void *cls, struct RPS_Sampler *sampler,
+ const struct GNUNET_PeerIdentity *id)
{
// We open a channel to be notified when this peer goes down.
(void) get_channel (peer_map, id);
* Close the connection to given peer and delete channel and mq.
*/
void
-removeCB (void *cls, const struct GNUNET_PeerIdentity *id)
+removeCB (void *cls, struct RPS_Sampler *sampler,
+ const struct GNUNET_PeerIdentity *id)
{
size_t s;
struct PeerContext *ctx;
- s = RPS_sampler_count_id (id);
+ s = RPS_sampler_count_id (sampler, id);
if ( 1 >= s )
{
if (GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains (peer_map, id))
peer_ctx = (struct PeerContext *) value;
+ if (0 != peer_ctx->num_outstanding_ops)
+ GNUNET_array_grow (peer_ctx->outstanding_ops, peer_ctx->num_outstanding_ops, 0);
+
if (NULL != peer_ctx->mq)
GNUNET_MQ_destroy (peer_ctx->mq);
GNUNET_NSE_disconnect (nse);
GNUNET_CADET_disconnect (cadet_handle);
- RPS_sampler_destroy ();
+ RPS_sampler_destroy (prot_sampler);
+ RPS_sampler_destroy (client_sampler);
GNUNET_break (0 == GNUNET_CONTAINER_multipeermap_size (peer_map));
GNUNET_CONTAINER_multipeermap_destroy (peer_map);
GNUNET_array_grow (gossip_list, gossip_list_size, 0);
LOG (GNUNET_ERROR_TYPE_DEBUG, "Cleaning up channel to peer %s\n",
GNUNET_i2s (peer));
- RPS_sampler_reinitialise_by_value (peer);
+ RPS_sampler_reinitialise_by_value (prot_sampler, peer);
+ RPS_sampler_reinitialise_by_value (client_sampler, peer);
if (GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains (peer_map, peer))
{
half_round_interval = GNUNET_TIME_relative_multiply (round_interval, .5);
max_round_interval = GNUNET_TIME_relative_add (round_interval, half_round_interval);
- RPS_sampler_init (sampler_size_est_need, max_round_interval,
+ prot_sampler = RPS_sampler_init (sampler_size_est_need, max_round_interval,
+ insertCB, NULL, removeCB, NULL);
+ client_sampler = RPS_sampler_init (sampler_size_est_need, max_round_interval,
insertCB, NULL, removeCB, NULL);
/* Initialise push and pull maps */