*/
static struct GNUNET_PeerIdentity *own_identity;
-/**
- * Closure to the callback cadet calls on each peer it passes to us
- */
-struct init_peer_cls
-{
- /**
- * The server handle to later listen to client requests
- */
- struct GNUNET_SERVER_Handle *server;
-
- /**
- * Counts how many peers cadet already passed to us
- */
- uint32_t i;
-};
-
struct GNUNET_PeerIdentity *
get_rand_peer (const struct GNUNET_PeerIdentity *peer_list, unsigned int size);
LIVING = 0x10
};
+
+/**
+ * Functions of this type can be used to be stored at a peer for later execution.
+ */
+typedef void (* PeerOp) (void *cls, const struct GNUNET_PeerIdentity *peer);
+
+/**
+ * Outstanding operation on peer consisting of callback and closure
+ */
+struct PeerOutstandingOp
+{
+ /**
+ * Callback
+ */
+ PeerOp op;
+
+ /**
+ * Closure
+ */
+ void *op_cls;
+};
+
+
/**
* Struct used to keep track of other peer's status
*
*/
struct GNUNET_CADET_Channel *recv_channel; // unneeded?
+ /**
+ * Array of outstanding operations on this peer.
+ */
+ struct PeerOutstandingOp *outstanding_ops;
+
+ /**
+ * Number of outstanding operations.
+ */
+ unsigned int num_outstanding_ops;
+ //size_t num_outstanding_ops;
+
/**
* This is pobably followed by 'statistical' data (when we first saw
* him, how did we get his ID, how many pushes (in a timeinterval),
static struct GNUNET_TIME_Relative request_rate;
+/**
+ * Number of history update tasks.
+ */
+uint32_t num_hist_update_tasks;
+
+
/***********************************************************************
* /Globals
***********************************************************************/
ctx->mq = NULL;
ctx->send_channel = NULL;
ctx->recv_channel = NULL;
+ ctx->outstanding_ops = NULL;
+ ctx->num_outstanding_ops = 0;
(void) GNUNET_CONTAINER_multipeermap_put (peer_map, peer, ctx,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
}
}
+/**
+ * Put random peer from sampler into the gossip list as history update.
+ */
+ void
+hist_update (void *cls, struct GNUNET_PeerIdentity *ids, uint32_t num_peers)
+{
+ GNUNET_assert (1 == num_peers);
+
+ if (gossip_list_size < sampler_size_est_need)
+ GNUNET_array_append (gossip_list, gossip_list_size, *ids);
+
+ if (0 < num_hist_update_tasks)
+ num_hist_update_tasks--;
+}
+
+
/**
* Callback that is called when a channel was effectively established.
* This is given to ntfy_tmt_rdy and called when the channel was
LOG (GNUNET_ERROR_TYPE_DEBUG, "Peer %s is live\n", GNUNET_i2s (peer));
+ if (0 != peer_ctx->num_outstanding_ops)
+ { /* Call outstanding operations */
+ unsigned int i;
+
+ for ( i = 0 ; i < peer_ctx->num_outstanding_ops ; i++ )
+ peer_ctx->outstanding_ops[i].op (peer_ctx->outstanding_ops[i].op_cls, peer);
+ GNUNET_array_grow (peer_ctx->outstanding_ops, peer_ctx->num_outstanding_ops, 0);
+ }
+
GNUNET_free (peer);
buf = NULL;
const struct GNUNET_PeerIdentity *peer)
{
struct PeerContext *ctx;
- //struct GNUNET_PeerIdentity *tmp_peer;
+ struct GNUNET_PeerIdentity *tmp_peer;
ctx = get_peer_ctx (peer_map, peer);
if (NULL == ctx->send_channel)
GNUNET_RPS_CADET_PORT,
GNUNET_CADET_OPTION_RELIABLE);
- //tmp_peer = GNUNET_new (struct GNUNET_PeerIdentity);
- //*tmp_peer = *peer;
- //(void) GNUNET_CADET_notify_transmit_ready (ctx->send_channel, GNUNET_NO,
- // GNUNET_TIME_UNIT_FOREVER_REL,
- // 0, peer_is_live, tmp_peer);
+ if (NULL == ctx->recv_channel)
+ {
+ tmp_peer = GNUNET_new (struct GNUNET_PeerIdentity);
+ *tmp_peer = *peer;
+ (void) GNUNET_CADET_notify_transmit_ready (ctx->send_channel, GNUNET_NO,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ 0, peer_is_live, tmp_peer);
+ }
// do I have to explicitly put it in the peer_map?
(void) GNUNET_CONTAINER_multipeermap_put (peer_map, peer, ctx,
struct GNUNET_TIME_Relative
T_relative_avg (const struct GNUNET_TIME_Relative *rel_array, uint32_t arr_size)
{
- return GNUNET_TIME_relative_divide (T_relative_sum (rel_array, arr_size), arr_size); // FIXME find a way to devide that by arr_size
+ return GNUNET_TIME_relative_divide (T_relative_sum (rel_array, arr_size), arr_size);
+}
+
+
+/**
+ * Insert PeerID in #pull_list
+ *
+ * Called once we know a peer is live.
+ */
+ void
+insert_in_pull_list (void *cls, const struct GNUNET_PeerIdentity *peer)
+{
+ if (GNUNET_NO == in_arr (pull_list, pull_list_size, peer))
+ GNUNET_array_append (pull_list, pull_list_size, *peer);
+}
+
+/**
+ * Check whether #insert_in_pull_list was already scheduled
+ */
+ int
+insert_in_pull_list_scheduled (const struct PeerContext *peer_ctx)
+{
+ unsigned int i;
+
+ for ( i = 0 ; i < peer_ctx->num_outstanding_ops ; i++ )
+ if (insert_in_pull_list == peer_ctx->outstanding_ops[i].op)
+ return GNUNET_YES;
+ return GNUNET_NO;
+}
+
+
+/**
+ * Insert PeerID in #gossip_list
+ *
+ * Called once we know a peer is live.
+ */
+ void
+insert_in_gossip_list (void *cls, const struct GNUNET_PeerIdentity *peer)
+{
+ if (GNUNET_NO == in_arr (gossip_list, gossip_list_size, peer))
+ GNUNET_array_append (gossip_list, gossip_list_size, *peer);
+}
+
+/**
+ * Check whether #insert_in_pull_list was already scheduled
+ */
+ int
+insert_in_gossip_list_scheduled (const struct PeerContext *peer_ctx)
+{
+ unsigned int i;
+
+ for ( i = 0 ; i < peer_ctx->num_outstanding_ops ; i++ )
+ if (insert_in_gossip_list == peer_ctx->outstanding_ops[i].op)
+ return GNUNET_YES;
+ return GNUNET_NO;
+}
+
+
+/**
+ * Update sampler with given PeerID.
+ */
+ void
+insert_in_sampler (void *cls, const struct GNUNET_PeerIdentity *peer)
+{
+ RPS_sampler_update_list (peer);
+}
+
+/**
+ * Check whether #insert_in_sampler was already scheduled
+ */
+ int
+insert_in_sampler_scheduled (const struct PeerContext *peer_ctx)
+{
+ unsigned int i;
+
+ for ( i = 0 ; i < peer_ctx->num_outstanding_ops ; i++ )
+ if (insert_in_sampler== peer_ctx->outstanding_ops[i].op)
+ return GNUNET_YES;
+ return GNUNET_NO;
}
-/***********************************************************************
- * /Util functions
-***********************************************************************/
/**
- * Wrapper around _sampler_resize()
+ * Wrapper around #RPS_sampler_resize()
*/
void
resize_wrapper ()
}
+/***********************************************************************
+ * /Util functions
+***********************************************************************/
+
/**
* Function called by NSE.
*
LOG (GNUNET_ERROR_TYPE_DEBUG, "Client requested %" PRIX32 " random peer(s).\n", num_peers);
- RPS_sampler_get_n_rand_peers (client_respond, client, num_peers);
+ RPS_sampler_get_n_rand_peers (client_respond, client, num_peers, GNUNET_YES);
GNUNET_SERVER_receive_done (client,
GNUNET_OK);
struct GNUNET_RPS_P2P_PullReplyMessage *in_msg;
struct GNUNET_PeerIdentity *peers;
+ struct PeerContext *peer_ctx;
+ struct PeerOutstandingOp out_op;
uint32_t i;
if (sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) > ntohs (msg->size))
peers = (struct GNUNET_PeerIdentity *) &msg[1];
for ( i = 0 ; i < ntohl (in_msg->num_peers) ; i++ )
{
- if (GNUNET_NO == in_arr (pull_list, pull_list_size, &peers[i]))
- GNUNET_array_append (pull_list, pull_list_size, peers[i]);
+ peer_ctx = get_peer_ctx (peer_map, &peers[i]);
+
+ if ((0 != (peer_ctx->peer_flags && LIVING)) ||
+ NULL != peer_ctx->recv_channel)
+ {
+ if (GNUNET_NO == in_arr (pull_list, pull_list_size, &peers[i]))
+ GNUNET_array_append (pull_list, pull_list_size, peers[i]);
+ }
+ else if (GNUNET_NO == insert_in_pull_list_scheduled (peer_ctx))
+ {
+ out_op.op = insert_in_pull_list;
+ GNUNET_array_append (peer_ctx->outstanding_ops, peer_ctx->num_outstanding_ops, out_op);
+ }
}
// TODO check that id is valid - whether it is reachable
/* Send PUSHes */
//n_arr = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG, (unsigned int) gossip_list_size);
- n_peers = round (alpha * gossip_list_size);
- if (0 == n_peers)
- n_peers = 1;
- LOG (GNUNET_ERROR_TYPE_DEBUG, "Going to send pushes to %u (%f * %u) peers.\n",
- n_peers, alpha, gossip_list_size);
- for ( i = 0 ; i < n_peers ; i++ )
+ if (0 != gossip_list_size)
{
- peer = get_rand_peer (gossip_list, gossip_list_size);
- if (own_identity != peer)
- { // FIXME if this fails schedule/loop this for later
- LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending PUSH to peer %s of gossiped list.\n", GNUNET_i2s (peer));
-
- ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PUSH);
- // FIXME sometimes it returns a pointer to a freed mq
- mq = get_mq (peer_map, peer);
- GNUNET_MQ_send (mq, ev);
+ n_peers = round (alpha * gossip_list_size);
+ if (0 == n_peers)
+ n_peers = 1;
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Going to send pushes to %u (%f * %u) peers.\n",
+ n_peers, alpha, gossip_list_size);
+ for ( i = 0 ; i < n_peers ; i++ )
+ {
+ peer = get_rand_peer (gossip_list, gossip_list_size);
+ if (own_identity != peer)
+ { // FIXME if this fails schedule/loop this for later
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending PUSH to peer %s of gossiped list.\n", GNUNET_i2s (peer));
+
+ ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PUSH);
+ // FIXME sometimes it returns a pointer to a freed mq
+ mq = get_mq (peer_map, peer);
+ GNUNET_MQ_send (mq, ev);
+ }
}
}
/* Send PULL requests */
//n_arr = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG, (unsigned int) sampler_list->size);
- n_peers = round (beta * gossip_list_size);
- if (0 == n_peers)
- n_peers = 1;
- LOG (GNUNET_ERROR_TYPE_DEBUG, "Going to send pulls to %u (%f * %u) peers.\n",
- n_peers, beta, gossip_list_size);
- for ( i = 0 ; i < n_peers ; i++ )
+ if (0 != gossip_list_size)
{
- peer = get_rand_peer (gossip_list, gossip_list_size);
- if (own_identity != peer)
- { // FIXME if this fails schedule/loop this for later
- LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending PULL request to peer %s of gossiped list.\n", GNUNET_i2s (peer));
-
- ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST);
- //pull_msg = NULL;
- mq = get_mq (peer_map, peer);
- GNUNET_MQ_send (mq, ev);
+ n_peers = round (beta * gossip_list_size);
+ if (0 == n_peers)
+ n_peers = 1;
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Going to send pulls to %u (%f * %u) peers.\n",
+ n_peers, beta, gossip_list_size);
+ for ( i = 0 ; i < n_peers ; i++ )
+ {
+ peer = get_rand_peer (gossip_list, gossip_list_size);
+ if (own_identity != peer)
+ { // FIXME if this fails schedule/loop this for later
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending PULL request to peer %s of gossiped list.\n", GNUNET_i2s (peer));
+
+ ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST);
+ //pull_msg = NULL;
+ mq = get_mq (peer_map, peer);
+ GNUNET_MQ_send (mq, ev);
+ }
}
}
push_list_size != 0 &&
pull_list_size != 0 )
{
- LOG (GNUNET_ERROR_TYPE_DEBUG, "Update of the gossip list. ()\n");
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Update of the gossip list.\n");
uint32_t first_border;
uint32_t second_border;
- GNUNET_array_grow (gossip_list, gossip_list_size, sampler_size_est_need);
+ first_border = round (alpha * sampler_size_est_need);
+ second_border = first_border + round (beta * sampler_size_est_need);
+
+ GNUNET_array_grow (gossip_list, gossip_list_size, second_border);
- first_border = round (alpha * gossip_list_size);
for ( i = 0 ; i < first_border ; i++ )
{ // TODO use RPS_sampler_get_n_rand_peers
/* Update gossip list with peers received through PUSHes */
// TODO change the peer_flags accordingly
}
- second_border = first_border + round (beta * gossip_list_size);
for ( i = first_border ; i < second_border ; i++ )
{
/* Update gossip list with peers received through PULLs */
for ( i = second_border ; i < gossip_list_size ; i++ )
{
/* Update gossip list with peers from history */
- peer = RPS_sampler_get_n_rand_peers_ (1);
- gossip_list[i] = *peer;
+ RPS_sampler_get_n_rand_peers (hist_update, NULL, 1, GNUNET_NO);
+ num_hist_update_tasks++;
// TODO change the peer_flags accordingly
}
unsigned int best_path) // "How long is the best path?
// (0 = unknown, 1 = ourselves, 2 = neighbor)"
{
- struct init_peer_cls *ipc;
+ struct GNUNET_SERVER_Handle *server;
+ struct PeerOutstandingOp out_op;
+ struct PeerContext *peer_ctx;
- ipc = (struct init_peer_cls *) cls;
+ server = (struct GNUNET_SERVER_Handle *) cls;
if ( NULL != peer )
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Got %" PRIX32 ". peer %s (at %p) from CADET (gossip_list_size: %u)\n",
- ipc->i, GNUNET_i2s (peer), peer, gossip_list_size);
- RPS_sampler_update_list (peer);
- (void) get_peer_ctx (peer_map, peer); // unneeded? -> insertCB
+ "Got peer %s (at %p) from CADET (gossip_list_size: %u)\n",
+ GNUNET_i2s (peer), peer, gossip_list_size);
- if (ipc->i < gossip_list_size)
+ // maybe create a function for that
+ peer_ctx = get_peer_ctx (peer_map, peer);
+ if (GNUNET_NO == insert_in_sampler_scheduled (peer_ctx))
{
- gossip_list[ipc->i] = *peer; // FIXME sometimes we're writing to invalid space here
- // not sure whether fixed
- ipc->i++;
+ out_op.op = insert_in_sampler;
+ GNUNET_array_append (peer_ctx->outstanding_ops, peer_ctx->num_outstanding_ops, out_op);
}
- // send push/pull to each of those peers?
- }
- else
- {
- if (ipc->i < gossip_list_size)
+ if (GNUNET_NO == insert_in_gossip_list_scheduled (peer_ctx))
{
- memcpy (&gossip_list[ipc->i],
- RPS_sampler_get_n_rand_peers_ (1),
- (gossip_list_size - ipc->i) * sizeof (struct GNUNET_PeerIdentity));
+ out_op.op = insert_in_gossip_list;
+ GNUNET_array_append (peer_ctx->outstanding_ops, peer_ctx->num_outstanding_ops, out_op);
}
- rps_start (ipc->server);
- GNUNET_free (ipc);
+
+ /* Issue livelyness test on peer */
+ (void) get_channel (peer_map, peer);
+
+ // send push/pull to each of those peers?
}
+ else
+ rps_start (server);
}
(void) peer_remove_cb (peer, peer, peer_ctx);
}
+
/**
* Actually start the service.
*/
LOG (GNUNET_ERROR_TYPE_DEBUG, "Ready to receive requests from clients\n");
+ num_hist_update_tasks = 0;
+
do_round_task = GNUNET_SCHEDULER_add_now (&do_round, NULL);
LOG (GNUNET_ERROR_TYPE_DEBUG, "Scheduled first round\n");
LOG (GNUNET_ERROR_TYPE_DEBUG, "RPS started\n");
- struct init_peer_cls *ipc;
cfg = c;
}
LOG (GNUNET_ERROR_TYPE_DEBUG, "INITSIZE is %" PRIu64 "\n", sampler_size_est_need);
- //gossip_list_size = sampler_size; // TODO rename sampler_size
gossip_list = NULL;
- GNUNET_array_grow (gossip_list, gossip_list_size, sampler_size_est_need);
/* connect to NSE */
nse = GNUNET_NSE_connect (cfg, nse_callback, NULL);
// TODO check whether that was successful
- // TODO disconnect on shutdown
LOG (GNUNET_ERROR_TYPE_DEBUG, "Connected to NSE\n");
half_round_interval = GNUNET_TIME_relative_multiply (round_interval, .5);
max_round_interval = GNUNET_TIME_relative_add (round_interval, half_round_interval);
- RPS_sampler_init (sampler_size_est_need, own_identity, max_round_interval,
+ RPS_sampler_init (sampler_size_est_need, max_round_interval,
insertCB, NULL, removeCB, NULL);
sampler_size = sampler_size_est_need;
pull_list_size = 0;
- ipc = GNUNET_new (struct init_peer_cls);
- ipc->server = server;
- ipc->i = 0;
LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting peers from CADET\n");
- GNUNET_CADET_get_peers (cadet_handle, &init_peer_cb, ipc);
+ GNUNET_CADET_get_peers (cadet_handle, &init_peer_cb, server);
// TODO send push/pull to each of those peers?
}
/**
* Closure to _get_n_rand_peers_ready_cb()
*/
-struct RPS_GetNRandPeersReadyCls
+struct NRandPeersReadyCls
{
/**
* Number of peers we are waiting for.
* give those back.
*/
void
-RPS_sampler_get_n_rand_peers_ready_cb (void *cls,
+check_n_peers_ready (void *cls,
const struct GNUNET_PeerIdentity *id)
{
- struct RPS_GetNRandPeersReadyCls *n_peers_cls;
+ struct NRandPeersReadyCls *n_peers_cls;
- n_peers_cls = (struct RPS_GetNRandPeersReadyCls *) cls;
+ n_peers_cls = (struct NRandPeersReadyCls *) cls;
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "SAMPLER: Got %" PRIX32 "th of %" PRIX32 " peers\n",
+ "SAMPLER: Got %" PRIX32 ". of %" PRIX32 " peers\n",
n_peers_cls->cur_num_peers, n_peers_cls->num_peers);
if (n_peers_cls->num_peers - 1 == n_peers_cls->cur_num_peers)
sampler_el->last_client_request = GNUNET_TIME_UNIT_FOREVER_ABS;
- /* We might want to keep the previous peer */
-
- //GNUNET_CRYPTO_hmac(&sampler_el->auth_key, sampler_el->peer_id,
- // sizeof(struct GNUNET_PeerIdentity),
- // &sampler_el->peer_id_hash);
-
sampler_el->birth = GNUNET_TIME_absolute_get ();
sampler_el->num_peers = 0;
sampler_el->num_change = 0;
* Initialise a tuple of sampler elements.
*
* @param init_size the size the sampler is initialised with
- * @param id with which all newly created sampler elements are initialised
* @param ins_cb the callback that will be called on every PeerID that is
* newly inserted into a sampler element
* @param ins_cls the closure given to #ins_cb
*/
void
RPS_sampler_init (size_t init_size,
- const struct GNUNET_PeerIdentity *id,
struct GNUNET_TIME_Relative max_round_interval,
RPS_sampler_insert_cb ins_cb, void *ins_cls,
RPS_sampler_remove_cb rem_cb, void *rem_cls)
//sampler->sampler_elements = GNUNET_new_array(init_size, struct GNUNET_PeerIdentity);
//GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, min_size);
RPS_sampler_resize (init_size);
- RPS_sampler_update_list (id); // no super nice desing but ok for the moment
client_get_index = 0;
* corrsponding peer to the client.
* Only used internally
*/
- const struct GNUNET_PeerIdentity *
-RPS_sampler_get_rand_peer_ ()
+ void
+RPS_sampler_get_rand_peer_ (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
+ struct GetPeerCls *gpc;
uint32_t r_index;
- const struct GNUNET_PeerIdentity *peer; // do we have to malloc that?
+ struct GNUNET_HashCode *hash;
- // TODO implement extra logic
+ gpc = (struct GetPeerCls *) cls;
/**;
* Choose the r_index of the peer we want to return
r_index = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG,
sampler->sampler_size);
- //if ( EMPTY == sampler->sampler_elements[r_index]->is_empty )
- // // TODO schedule for later
- // peer = NULL;
- //else
- peer = &(sampler->sampler_elements[r_index]->peer_id);
- //sampler->sampler_elements[r_index]->last_client_request = GNUNET_TIME_absolute_get();
- LOG (GNUNET_ERROR_TYPE_DEBUG, "Sgrp: Returning PeerID %s\n", GNUNET_i2s(peer));
-
- return peer;
-}
-
-
-/**
- * Get n random peers out of the sampled peers.
- *
- * We might want to reinitialise this sampler after giving the
- * corrsponding peer to the client.
- * Random with or without consumption?
- * Only used internally
- */
- const struct GNUNET_PeerIdentity *
-RPS_sampler_get_n_rand_peers_ (uint32_t n)
-{
- if ( 0 == sampler->sampler_size )
+ if ( EMPTY == sampler->sampler_elements[r_index]->is_empty )
{
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Sgrp: List empty - Returning NULL\n");
- return NULL;
+ gpc->get_peer_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply(
+ GNUNET_TIME_UNIT_SECONDS,
+ .1),
+ &RPS_sampler_get_rand_peer_,
+ cls);
+ return;
}
- else
- {
- // TODO check if we have too much (distinct) sampled peers
- // If we are not ready yet maybe schedule for later
- struct GNUNET_PeerIdentity *peers;
- uint32_t i;
- peers = GNUNET_malloc (n * sizeof(struct GNUNET_PeerIdentity));
+ *gpc->id = sampler->sampler_elements[r_index]->peer_id;
- for ( i = 0 ; i < n ; i++ ) {
- //peers[i] = RPS_sampler_get_rand_peer_(sampler->sampler_elements);
- memcpy (&peers[i], RPS_sampler_get_rand_peer_ (), sizeof (struct GNUNET_PeerIdentity));
- }
- return peers;
- }
+ hash = GNUNET_new (struct GNUNET_HashCode);
+ GNUNET_CRYPTO_hash (&gpc->get_peer_task, sizeof (struct GNUNET_SCHEDULER_Task *), hash);
+ if (GNUNET_NO == GNUNET_CONTAINER_multihashmap_remove (get_peer_tasks, hash, &gpc->get_peer_task))
+ LOG (GNUNET_ERROR_TYPE_WARNING, "SAMPLER: Key to remove is not in the hashmap\n");
+ GNUNET_free (gpc->get_peer_task);
+
+ gpc->cb (gpc->cb_cls, gpc->id);
}
* @return a random PeerID of the PeerIDs previously put into the sampler.
*/
void
-//RPS_sampler_get_rand_peer (RPS_sampler_rand_peer_ready_cb cb,
-// void *cls, struct GNUNET_PeerIdentity *id)
RPS_sampler_get_rand_peer (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct GetPeerCls *gpc;
+ struct GNUNET_PeerIdentity tmp_id;
struct RPS_SamplerElement *s_elem;
struct GNUNET_TIME_Relative last_request_diff;
struct GNUNET_HashCode *hash;
uint32_t tmp_client_get_index;
- //struct GNUNET_TIME_Relative inv_last_request_diff;
LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Single peer was requested\n");
gpc = (struct GetPeerCls *) cls;
hash = GNUNET_new (struct GNUNET_HashCode);
+
+ /* Store the next #client_get_index to check whether we cycled over the whole list */
if (0 < client_get_index)
tmp_client_get_index = client_get_index - 1;
else
tmp_client_get_index = sampler->sampler_size - 1;
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "SAMPLER: scheduling for later if index reaches %" PRIX32 " (sampler size: %" PRIX32 ".\n",
+ "SAMPLER: scheduling for later if index reaches %" PRIX32 " (sampler size: %" PRIX32 ").\n",
tmp_client_get_index, sampler->sampler_size);
do
return;
}
- *gpc->id = sampler->sampler_elements[client_get_index]->peer_id;
-
+ tmp_id = sampler->sampler_elements[client_get_index]->peer_id;
RPS_sampler_elem_reinit (sampler->sampler_elements[client_get_index]);
+ RPS_sampler_elem_next (sampler->sampler_elements[client_get_index], &tmp_id,
+ NULL, NULL, NULL, NULL);
+
+ /* Cycle the #client_get_index one step further */
if ( client_get_index == sampler->sampler_size - 1 )
client_get_index = 0;
else
client_get_index++;
+
LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: incremented index to %" PRIX32 ".\n", client_get_index);
} while (EMPTY == sampler->sampler_elements[client_get_index]->is_empty);
s_elem = sampler->sampler_elements[client_get_index];
+ *gpc->id = s_elem->peer_id;
/* Check whether we may use this sampler to give it back to the client */
if (GNUNET_TIME_UNIT_FOREVER_ABS.abs_value_us != s_elem->last_client_request.abs_value_us)
*
* @param cb callback that will be called once the ids are ready.
* @param cls closure given to @a cb
+ * @param for_client #GNUNET_YES if result is used for client,
+ * #GNUNET_NO if used internally
* @param num_peers the number of peers requested
*/
void
RPS_sampler_get_n_rand_peers (RPS_sampler_n_rand_peers_ready_cb cb,
- void *cls, uint32_t num_peers)
+ void *cls, uint32_t num_peers, int for_client)
{
- if ( 0 == sampler->sampler_size )
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Sgrp: List empty - Returning NULL\n");
- cb (cls, NULL, 0);
- }
- else
- {
- // TODO check if we have too much (distinct) sampled peers
- // If we are not ready yet maybe schedule for later
- uint32_t i;
- struct RPS_GetNRandPeersReadyCls *cb_cls;
- struct GetPeerCls *gpc;
- struct GNUNET_HashCode *hash;
-
- hash = GNUNET_new (struct GNUNET_HashCode);
+ GNUNET_assert (0 != sampler->sampler_size);
- cb_cls = GNUNET_new (struct RPS_GetNRandPeersReadyCls);
- cb_cls->num_peers = num_peers;
- cb_cls->cur_num_peers = 0;
- cb_cls->ids = GNUNET_new_array (num_peers, struct GNUNET_PeerIdentity);
- cb_cls->callback = cb;
- cb_cls->cls = cls;
+ // TODO check if we have too much (distinct) sampled peers
+ uint32_t i;
+ struct NRandPeersReadyCls *cb_cls;
+ struct GetPeerCls *gpc;
+ struct GNUNET_HashCode *hash;
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "SAMPLER: Scheduling requests for %" PRIX32 " peers\n", num_peers);
+ hash = GNUNET_new (struct GNUNET_HashCode);
- for ( i = 0 ; i < num_peers ; i++ )
- {
- gpc = GNUNET_new (struct GetPeerCls);
- gpc->cb = RPS_sampler_get_n_rand_peers_ready_cb;
- gpc->cb_cls = cb_cls;
- gpc->id = &cb_cls->ids[i];
+ cb_cls = GNUNET_new (struct NRandPeersReadyCls);
+ cb_cls->num_peers = num_peers;
+ cb_cls->cur_num_peers = 0;
+ cb_cls->ids = GNUNET_new_array (num_peers, struct GNUNET_PeerIdentity);
+ cb_cls->callback = cb;
+ cb_cls->cls = cls;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "SAMPLER: Scheduling requests for %" PRIX32 " peers\n", num_peers);
- // maybe add a little delay
+ for ( i = 0 ; i < num_peers ; i++ )
+ {
+ gpc = GNUNET_new (struct GetPeerCls);
+ gpc->cb = check_n_peers_ready;
+ gpc->cb_cls = cb_cls;
+ gpc->id = &cb_cls->ids[i];
+
+ // maybe add a little delay
+ if (GNUNET_YES == for_client)
gpc->get_peer_task = GNUNET_SCHEDULER_add_now (&RPS_sampler_get_rand_peer, gpc);
- GNUNET_CRYPTO_hash (&gpc->get_peer_task, sizeof (struct GNUNET_SCHEDULER_Task *), hash);
- (void) GNUNET_CONTAINER_multihashmap_put (get_peer_tasks, hash, &gpc->get_peer_task,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
- //RPS_sampler_get_rand_peer (RPS_sampler_get_n_rand_peers_ready_cb,
- // cb_cls, &peers[i]);
- }
+ else if (GNUNET_NO == for_client)
+ gpc->get_peer_task = GNUNET_SCHEDULER_add_now (&RPS_sampler_get_rand_peer_, gpc);
+ GNUNET_CRYPTO_hash (&gpc->get_peer_task, sizeof (struct GNUNET_SCHEDULER_Task *), hash);
+ (void) GNUNET_CONTAINER_multihashmap_put (get_peer_tasks, hash, &gpc->get_peer_task,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
}
}